Window + Reduce produces more than 1 output per window


Franmoti
Hello everybody! First of all, thanks for reading :D

I am currently working on my bachelor's final project, which is a comparison between Spark Streaming and Flink. Now, let's focus on the problem:

  - THE PROBLEM: my program writes to Kafka more than once per window. It creates 2-3 or more lines per window, whereas it should create only 1 line per window, since the reduce function leaves a single element. I have the same code written in Spark and it works perfectly. I have been trying to find information about this issue, but I haven't found anything :(. I have also tried changing the parallelism of some functions, among other things, and nothing worked; I can't figure out where the problem could be.

  - MY CLUSTER: I am using Flink 1.2.0 and OpenJDK 8. I have 3 computers: 1 JobManager and 2 TaskManagers (each TaskManager has 4 cores, 2 GB of RAM and 4 task slots).

  - INPUT DATA: lines produced by a single Java producer into a Kafka topic with 24 partitions. Each line has two elements, an incrementing value and its creation timestamp:
1 1497790546981
2 1497790546982
3 1497790546983
4 1497790546984
..................

  - MY JAVA APPLICATION:
+ It reads from a Kafka topic with 24 partitions (Kafka runs on the same machine as the JobManager).
+ The filter functions, together with the union, serve no functional purpose; I use them only to measure their latency.
+ Basically, it adds a "1" to each line. Then there is a tumbling window every 2 seconds, and the reduce function sums all these 1's and all the timestamps. The next map function divides the sum of the timestamps by the sum of the 1's, which gives the average timestamp. Finally, the last map function appends the current time to each reduced line, together with the difference between that time and the average timestamp.
+ This line is written to Kafka (to a topic with 2 partitions).

######################### - CODE - ####################################

public static void main(String[] args) throws Exception {
    //FLINK CONFIGURATION
    final StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    //env.setParallelism(2);

    //KAFKA CONSUMER CONFIGURATION
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
    FlinkKafkaConsumer010<String> myConsumer =
            new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);

    //KAFKA PRODUCER
    Properties producerConfig = new Properties();
    producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
    producerConfig.setProperty("acks", "0");
    producerConfig.setProperty("linger.ms", "0");

    //MAIN PROGRAM
    //Read from Kafka
    DataStream<String> line = env.addSource(myConsumer);

    //Add 1 to each line
    DataStream<Tuple2<String, Integer>> line_Num = line.map(new NumberAdder());

    //Filter odd numbers
    DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());

    //Filter even numbers
    DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());

    //Union of odd and even
    DataStream<Tuple2<String, Integer>> line_Num_U = line_Num_Odd.union(line_Num_Even);

    //Tumbling windows every 2 seconds
    AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedLine_Num_U = line_Num_U
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));

    //Reduce to one line with the sum
    DataStream<Tuple2<String, Integer>> wL_Num_U_Reduced = windowedLine_Num_U.reduce(new Reducer());

    //Calculate the average of the elements summed
    DataStream<String> wL_Average = wL_Num_U_Reduced.map(new AverageCalculator());

    //Add timestamp and calculate the difference with the average
    DataStream<String> averageTS = wL_Average.map(new TimestampAdder());

    //Send the result to Kafka
    FlinkKafkaProducer010Configuration<String> myProducerConfig = (FlinkKafkaProducer010Configuration<String>) FlinkKafkaProducer010
            .writeToKafkaWithTimestamps(averageTS, "testRes", new SimpleStringSchema(), producerConfig);

    myProducerConfig.setWriteTimestampToKafka(true);

    env.execute("TimestampLongKafka");
}


//Functions used in the program implementation:

public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isOdd = (Long.valueOf(line._1.split(" ")[0]) % 2) != 0;
        return isOdd;
    }
};


public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isEven = (Long.valueOf(line._1.split(" ")[0]) % 2) == 0;
        return isEven;
    }
};


public static class NumberAdder implements MapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    public Tuple2<String, Integer> map(String line) {
        Tuple2<String, Integer> newLine = new Tuple2<String, Integer>(line, 1);
        return newLine;
    }
};


public static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> line1, Tuple2<String, Integer> line2) throws Exception {
        Long sum = Long.valueOf(line1._1.split(" ")[0]) + Long.valueOf(line2._1.split(" ")[0]);
        Long sumTS = Long.valueOf(line1._1.split(" ")[1]) + Long.valueOf(line2._1.split(" ")[1]);
        Tuple2<String, Integer> newLine = new Tuple2<String, Integer>(String.valueOf(sum) + " " + String.valueOf(sumTS),
                line1._2 + line2._2);
        return newLine;
    }
};


public static class AverageCalculator implements MapFunction<Tuple2<String, Integer>, String> {
    private static final long serialVersionUID = 1L;

    public String map(Tuple2<String, Integer> line) throws Exception {
        Long average = Long.valueOf(line._1.split(" ")[1]) / line._2;
        String result = String.valueOf(line._2) + " " + String.valueOf(average);
        return result;
    }
};


public static final class TimestampAdder implements MapFunction<String, String> {
    private static final long serialVersionUID = 1L;

    public String map(String line) throws Exception {
        Long currentTime = System.currentTimeMillis();
        String totalTime = String.valueOf(currentTime - Long.valueOf(line.split(" ")[1]));
        String newLine = line.concat(" " + String.valueOf(currentTime) + " " + totalTime);

        return newLine;
    }
};

######################### - CODE - ####################################
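A side note on the Reducer above: it re-parses the string fields on every call, and the summed first field is never used afterwards. The sketch below is plain Java with no Flink dependency; it shows the same count / sum-of-timestamps reduction using a small typed accumulator. The names `Acc` and `TypedReduceSketch` are hypothetical and come from neither the original code nor Flink. Because `combine` is associative and commutative, any merge order gives the same result, which is the property a window reduce relies on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical typed accumulator: number of records and sum of their creation timestamps.
final class Acc {
    final long count;
    final long sumTs;

    Acc(long count, long sumTs) {
        this.count = count;
        this.sumTs = sumTs;
    }

    // Parse one input line of the form "<value> <timestampMillis>" into a one-record accumulator.
    static Acc fromLine(String line) {
        String[] parts = line.trim().split(" ");
        return new Acc(1L, Long.parseLong(parts[1]));
    }

    // Associative and commutative combine: the role the Reducer plays in the job.
    static Acc combine(Acc a, Acc b) {
        return new Acc(a.count + b.count, a.sumTs + b.sumTs);
    }

    // Average creation timestamp, using integer division like the original AverageCalculator.
    long averageTs() {
        return sumTs / count;
    }
}

class TypedReduceSketch {
    // Fold the lines left to right; returns null for an empty list (an empty window emits nothing).
    static Acc reduceAll(List<String> lines) {
        Acc acc = null;
        for (String line : lines) {
            Acc next = Acc.fromLine(line);
            acc = (acc == null) ? next : Acc.combine(acc, next);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1 1497790546981", "2 1497790546982", "3 1497790546983", "4 1497790546984");
        Acc inOrder = reduceAll(lines);

        // Reduce the same lines in reverse order to show the result does not depend on order.
        List<String> reversedLines = new ArrayList<>(lines);
        Collections.reverse(reversedLines);
        Acc reversed = reduceAll(reversedLines);

        System.out.println(inOrder.count + " " + inOrder.averageTs());
        System.out.println(reversed.count + " " + reversed.averageTs());
    }
}
```

Summing raw millisecond timestamps in a `long` overflows only after roughly six million records per window at current epoch values, so it is safe at the rates described in the post.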

  - SOME OUTPUT DATA: this output was written to the 2-partition topic, with a production rate below 1000 records/second (in this case it creates 3 output lines per window):

(note that every 3 lines correspond to the same 2-second window)
1969 1497791240910 1497791241999 1089 1497791242001 1091
1973 1497791240971 1497791241999 1028 1497791242002 1031
1970 1497791240937 1497791242094 1157 1497791242198 1261
1917 1497791242912 1497791243999 1087 1497791244051 1139
1905 1497791242971 1497791243999 1028 1497791244051 1080
1916 1497791242939 1497791244096 1157 1497791244199 1260
1994 1497791244915 1497791245999 1084 1497791246002 1087
1993 1497791244966 1497791245999 1033 1497791246004 1038
1990 1497791244939 1497791246097 1158 1497791246201 1262
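One way to check the "3 lines per window" observation mechanically is to group the posted lines by the 2-second window that contains their second field. This plain-Java sketch assumes that the second field is the average creation timestamp produced by AverageCalculator (the posted output has more fields than the code above emits, so that mapping is an assumption), and that windows are aligned to multiples of 2000 ms since the epoch, which is how Flink's tumbling windows are aligned when no offset is set. `OutputWindowCheck` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class OutputWindowCheck {
    static final long WINDOW_MS = 2000L;

    // The output lines posted above, copied verbatim.
    static final List<String> SAMPLE_OUTPUT = Arrays.asList(
            "1969 1497791240910 1497791241999 1089 1497791242001 1091",
            "1973 1497791240971 1497791241999 1028 1497791242002 1031",
            "1970 1497791240937 1497791242094 1157 1497791242198 1261",
            "1917 1497791242912 1497791243999 1087 1497791244051 1139",
            "1905 1497791242971 1497791243999 1028 1497791244051 1080",
            "1916 1497791242939 1497791244096 1157 1497791244199 1260",
            "1994 1497791244915 1497791245999 1084 1497791246002 1087",
            "1993 1497791244966 1497791245999 1033 1497791246004 1038",
            "1990 1497791244939 1497791246097 1158 1497791246201 1262");

    // Start of the epoch-aligned 2-second tumbling window containing ts (assumes ts >= 0, no offset).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Number of output lines per window, keyed by the window of the second field.
    // Assumption: the second field is the average creation timestamp from AverageCalculator.
    static Map<Long, Integer> linesPerWindow(List<String> outputLines) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (String line : outputLines) {
            long avgTs = Long.parseLong(line.trim().split(" ")[1]);
            counts.merge(windowStart(avgTs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // A job emitting exactly one result per window would report 1 for every window.
        linesPerWindow(SAMPLE_OUTPUT).forEach((start, n) ->
                System.out.println("window starting at " + start + ": " + n + " line(s)"));
    }
}
```

With the sample above, each of the three windows reports 3 lines, which matches the observation in the post.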

Thanks again in advance!

---
FRANCISCO BORJA ROBLES MARTÍN
Escuela Técnica Superior de Ingeniería Informática
Campus Montegancedo s/n
Universidad Politécnica de Madrid (Technical University of Madrid)

Re: Window + Reduce produces more than 1 output per window

Piotr Nowojski
Hi,

It is difficult for me to respond fully to your question. First of all, it would be really useful if you could strip your example down to a minimal version that shows the problem. Unfortunately, I was unable to reproduce your issue: I was getting only one output line per window (as expected). Could you try printing the output to the console (or using some other data sink) instead of writing it back to Kafka? Maybe the problem is there. Also, please try removing parts of the code bit by bit, so that you can find what’s causing the problem.

As a side note, I have a couple of concerns about your timestamp/watermark/window definitions. First, you set the time characteristic to EventTime:

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

But I don’t see where you are actually assigning the timestamps/watermarks. Didn’t you want to use “.assignTimestampsAndWatermarks(…)” on your input DataStream, based on its content? However, later you define the window using ProcessingTime:

>           .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));

This defines the windows independently of the content of those events. Maybe switching properly to EventTime will solve your problem?
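To make the difference concrete: with processing-time windows, an element is assigned to a window from the wall-clock time at which the window operator sees it, so the creation timestamp inside the line is ignored; with event-time windows, the element's own timestamp decides. The plain-Java sketch below (no Flink dependency) uses the epoch-aligned formula `ts - ts % size`, which is what Flink's tumbling assigners compute when no offset is configured. The arrival time in the example is hypothetical, and `WindowAssignmentSketch` is an illustrative name. In the actual job, switching to event time would mean extracting the second field of each line as the timestamp via `assignTimestampsAndWatermarks(...)` and windowing with `TumblingEventTimeWindows.of(Time.seconds(2))`.

```java
class WindowAssignmentSketch {
    static final long SIZE_MS = 2000L;

    // Epoch-aligned tumbling window start (assumes zero offset and non-negative timestamps).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % SIZE_MS);
    }

    // Event time: the window is chosen from the creation timestamp embedded in the line.
    static long eventTimeWindow(String line) {
        long createdAt = Long.parseLong(line.trim().split(" ")[1]);
        return windowStart(createdAt);
    }

    // Processing time: the window is chosen from the operator's clock when the line arrives.
    static long processingTimeWindow(long arrivalMs) {
        return windowStart(arrivalMs);
    }

    public static void main(String[] args) {
        String line = "1 1497790546981";           // sample input line from the post
        long hypotheticalArrival = 1497790548050L; // assumed: seen by the window operator ~1.07 s later
        System.out.println("event-time window start:      " + eventTimeWindow(line));
        System.out.println("processing-time window start: " + processingTimeWindow(hypotheticalArrival));
    }
}
```

The same line lands in two different 2-second windows depending on which notion of time is used, which is why mixing an EventTime characteristic with a processing-time window assigner is worth cleaning up.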

Thanks, Piotrek

> On Jun 18, 2017, at 6:12 PM, FRANCISCO BORJA ROBLES MARTIN <[hidden email]> wrote:

Re: Window + Reduce produces more than 1 output per window

Piotr Nowojski
One more thing: please try to minimize your program by removing the union and the odd/even filters at the beginning, and check whether you get the same results.
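The reasoning behind this suggestion: FilterOdd and FilterEven are complements, so their union should contain exactly the elements of the unfiltered stream (the order may differ, since Flink interleaves union inputs), and removing them should not change the per-window result. A plain-Java sketch of that property over the sample input lines; it models the two filters as predicates and is not Flink code. `OddEvenUnionSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

class OddEvenUnionSketch {
    // Same tests as FilterOdd and FilterEven: parity of the first field of the line.
    static final Predicate<String> IS_ODD =
            line -> Long.parseLong(line.trim().split(" ")[0]) % 2 != 0;
    static final Predicate<String> IS_EVEN =
            line -> Long.parseLong(line.trim().split(" ")[0]) % 2 == 0;

    // Models line_Num_Odd.union(line_Num_Even): odd elements followed by even elements.
    static List<String> splitAndUnion(List<String> input) {
        List<String> result = new ArrayList<>();
        for (String s : input) {
            if (IS_ODD.test(s)) result.add(s);
        }
        for (String s : input) {
            if (IS_EVEN.test(s)) result.add(s);
        }
        return result;
    }

    // True if both lists hold the same elements with the same multiplicities (order ignored).
    static boolean sameMultiset(List<String> a, List<String> b) {
        List<String> x = new ArrayList<>(a);
        List<String> y = new ArrayList<>(b);
        Collections.sort(x);
        Collections.sort(y);
        return x.equals(y);
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "1 1497790546981", "2 1497790546982", "3 1497790546983", "4 1497790546984");
        List<String> output = splitAndUnion(input);
        System.out.println(output);
        System.out.println("same elements: " + sameMultiset(input, output));
    }
}
```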

Piotrek

> On Jun 19, 2017, at 2:43 PM, Piotr Nowojski <[hidden email]> wrote:

Re: Window + Reduce produces more than 1 output per window

Franmoti
Hello Piotrek!

Thanks for answering! Yes, I have already changed the "TimeCharacteristic" to "ProcessingTime". I need it for the ".setWriteTimestampToKafka(true)" option, as I use the timestamp in the Kafka consumer that reads this app's output. I have also changed the code a bit to use KeyedStreams, so that I can use parallelism in the window/reduce functions.

About the problem: yesterday I noticed that it was getting worse the more times I submitted the job. It was producing 3x the output (with small differences between the duplicated lines, as you can see in my first message), whereas before it was producing only 2x. Finally, I stopped the cluster (stop-cluster.sh) and started it again (start-cluster.sh), and the problem was solved. I have been trying to reproduce the problem by submitting the app several times, but I haven't managed to today. If it happens again, I will try to reproduce it with the smallest possible code to find where the bug might be (something seems to go wrong when submitting several times).
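One thing to keep in mind with the switch to KeyedStreams: a keyed window emits one result per key per window, so a keyed window/reduce with several keys will legitimately produce several lines per 2-second window, and a single global figure then needs another aggregation step downstream (for example, a second windowAll stage). The plain-Java sketch below models that with a hypothetical key (`value % numKeys`); it only counts results and does not use the Flink API. `KeyedWindowSketch` is an illustrative name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class KeyedWindowSketch {
    static final long SIZE_MS = 2000L;

    // Epoch-aligned tumbling window start (assumes zero offset and non-negative timestamps).
    static long windowStart(long ts) {
        return ts - (ts % SIZE_MS);
    }

    // Keyed window + reduce: one partial count per (window, key) pair.
    // The key (value % numKeys) is a hypothetical choice for illustration.
    static Map<Long, Map<Long, Long>> keyedCounts(List<String> lines, long numKeys) {
        Map<Long, Map<Long, Long>> perWindow = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split(" ");
            long key = Long.parseLong(parts[0]) % numKeys;
            long window = windowStart(Long.parseLong(parts[1]));
            perWindow.computeIfAbsent(window, w -> new TreeMap<>()).merge(key, 1L, Long::sum);
        }
        return perWindow;
    }

    // Second stage: merge the per-key partial counts into one total per window.
    static Map<Long, Long> mergePerWindow(Map<Long, Map<Long, Long>> keyed) {
        Map<Long, Long> totals = new TreeMap<>();
        keyed.forEach((window, byKey) ->
                totals.put(window, byKey.values().stream().mapToLong(Long::longValue).sum()));
        return totals;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1 1497790546981", "2 1497790546982", "3 1497790546983", "4 1497790546984");
        Map<Long, Map<Long, Long>> keyed = keyedCounts(lines, 2);
        System.out.println("results emitted by the keyed stage: " + keyed);
        System.out.println("after merging per window:          " + mergePerWindow(keyed));
    }
}
```

With two keys, the single window containing the sample lines yields two partial results, and only the merge step brings it back to one line per window.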

Kind regards!
Fran.


On 2017-06-19 14:43, Piotr Nowojski wrote:

Re: Window + Reduce produces more than 1 output per window

Piotr Nowojski
No problem. Make sure that your application wasn’t running three times in the background, thus producing 3x the expected output.

Piotrek

> On Jun 19, 2017, at 5:25 PM, FRANCISCO BORJA ROBLES MARTIN <[hidden email]> wrote: