Window Function on AllWindowed Stream - Combining Kafka Topics


G.S.Vijay Raajaa
Hi,

I am trying to combine two Kafka topics using a single Kafka consumer subscribed to a list of topics, and then convert the JSON strings in the stream to POJOs. I then want to join them via keyBy (on the event time field) and merge each pair into a single fat JSON; for that I was planning to apply a window function on the windowed stream. The assumption is that Topic-A and Topic-B can be joined on event time and that only one pair (Topic-A JSON, Topic-B JSON) will be present for the same eventTime. Hence I was planning to use a countWindow(2) after the keyBy on eventTime.

I have a couple of questions:

1. Is this approach fine for merging the topics and creating a single JSON?
2. The window function on the AllWindowedStream doesn't seem to work; any pointers will be greatly appreciated.

Code snippet:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
logger.info("Flink Stream Window Charger has started");

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
properties.setProperty("group.id", "group-0011");
properties.setProperty("auto.offset.reset", "smallest");

// A single Kafka 0.8 consumer subscribed to both topics
List<String> names = new ArrayList<>();
names.add("Topic-A");
names.add("Topic-B");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));

// Parse each JSON string into a TopicPojo and key the stream by its event time
DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy(value -> value.getEventTime());

// Turn the POJOs back into JSON strings and collect them two at a time
AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);

DataStream<String> data_charging = data_window.apply(new MyWindowFunction());

data_charging.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) throws Exception {
        // Yet to be implemented - merge two POJOs into one
    }
});

try {
    env.execute();
} catch (Exception e) {
    // Do not swallow job failures silently
    logger.error("Flink job failed", e);
}
}
}

class Tokenizer implements FlatMapFunction<TopicPojo, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        out.collect(mapper.writeValueAsString(value));
    }
}

class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
    @Override
    public void apply(String key, GlobalWindow window, Iterable<TopicPojo> input, Collector<String> out)
            throws Exception {
        int count = 0;
        for (TopicPojo in : input) {
            count++;
        }
        // Test result - to be modified
        out.collect("Window: " + window + " count: " + count);
    }
}

class Deserializer implements MapFunction<String, TopicPojo> {
    private static final long serialVersionUID = 1L;

    @Override
    public TopicPojo map(String value) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        System.out.println(value);
        try {
            return mapper.readValue(value, TopicPojo.class);
        } catch (IOException e) {
            // JsonParseException and JsonMappingException are both IOExceptions; keep the cause
            throw new IOException("Failed to deserialize JSON object.", e);
        }
    }
}


I am getting the following compile error: The method apply(AllWindowFunction<String,R,GlobalWindow>) in the type AllWindowedStream<String,GlobalWindow> is not applicable for the arguments (MyWindowFunction).

Kindly give your input.

Regards,
Vijay Raajaa GS 


Re: Window Function on AllWindowed Stream - Combining Kafka Topics

Aljoscha Krettek
Hi,
An AllWindow operator requires an AllWindowFunction instead of a WindowFunction. In your case, the keyBy() seems to be in the wrong place; to get a keyed window you have to write something akin to:

inputStream
  .keyBy(…)
  .window(…)
  .apply(…) // or reduce()

In your case, you key the stream and then the keying is “lost” again because you apply a flatMap(). That’s why you have an all-window and not a keyed window.
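The difference between the two shapes can be sketched in plain Java (no Flink dependency; `CountWindowModel`, `keyedCountWindow` and `countWindowAll` are hypothetical names). It models a purging count trigger of size 2: the keyed variant keeps one buffer per key, like `keyBy(...).countWindow(2)`, while the non-keyed variant keeps a single buffer for the whole stream, like `countWindowAll(2)`. Each event is modelled as an `{eventTime, payload}` pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of a purging count trigger of size `size`.
// Each event is a {key, payload} pair; the key plays the role of eventTime.
class CountWindowModel {

    // Like keyBy(eventTime).countWindow(size): one buffer per key,
    // and a key's window fires only when that key has collected `size` elements.
    static List<List<String>> keyedCountWindow(List<String[]> events, int size) {
        Map<String, List<String>> buffers = new HashMap<>();
        List<List<String>> fired = new ArrayList<>();
        for (String[] e : events) {
            List<String> buf = buffers.computeIfAbsent(e[0], k -> new ArrayList<>());
            buf.add(e[1]);
            if (buf.size() == size) {
                fired.add(new ArrayList<>(buf));
                buf.clear(); // purge the window contents after firing
            }
        }
        return fired;
    }

    // Like countWindowAll(size): a single buffer for the whole stream,
    // so elements are grouped purely by arrival order, ignoring the key.
    static List<List<String>> countWindowAll(List<String[]> events, int size) {
        List<String> buf = new ArrayList<>();
        List<List<String>> fired = new ArrayList<>();
        for (String[] e : events) {
            buf.add(e[1]);
            if (buf.size() == size) {
                fired.add(new ArrayList<>(buf));
                buf.clear();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Topic-A and Topic-B records interleaved; t1/t2 are the eventTime keys.
        List<String[]> events = Arrays.asList(
                new String[] {"t1", "A1"},
                new String[] {"t2", "A2"},
                new String[] {"t1", "B1"},
                new String[] {"t2", "B2"});
        System.out.println(keyedCountWindow(events, 2)); // [[A1, B1], [A2, B2]]
        System.out.println(countWindowAll(events, 2));   // [[A1, A2], [B1, B2]]
    }
}
```

With the keyed form, records sharing an eventTime end up in the same window; with the all-window form, pairs are formed by arrival order only, so unrelated Topic-A and Topic-B records could be paired.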

Best,
Aljoscha
On 2. May 2017, at 09:20, G.S.Vijay Raajaa <[hidden email]> wrote:


Re: Window Function on AllWindowed Stream - Combining Kafka Topics

G.S.Vijay Raajaa
Sure, thanks for the pointer; let me reorder the operators. Any comments on the approach I followed for merging the topics and creating a single JSON?

Regards,
Vijay Raajaa G S

On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <[hidden email]> wrote:

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

Aljoscha Krettek
The approach could work, but if it can happen that an event from stream A is not matched by an event in stream B you will have lingering state that never goes away. For such cases it might be better to write a custom CoProcessFunction as sketched here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html.

The idea is to keep events from each side in state and emit a result when you get the event from the other side. You also set a cleanup timer in case no other event arrives to make sure that state eventually goes away.
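The pattern described above can be modelled in plain Java as a sketch (no Flink dependency; `PairJoinModel`, `onA`, `onB`, `advanceTime` and `merge` are hypothetical names). In a real job the two maps would be keyed state inside a `CoProcessFunction` applied to the connected and keyed Topic-A/Topic-B streams, `onA`/`onB` would correspond to `processElement1`/`processElement2`, and `advanceTime` would correspond to registered timers firing `onTimer`. The merge just nests the two JSON strings and is an assumption about what the "fat JSON" should look like.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink code) of a two-input join: buffer one side, emit on match,
// and clear unmatched state when a cleanup timer fires.
class PairJoinModel {
    private final long cleanupDelay;
    private final Map<String, String> pendingA = new HashMap<>();       // ~ keyed state for Topic-A
    private final Map<String, String> pendingB = new HashMap<>();       // ~ keyed state for Topic-B
    private final Map<String, Long> deadlines = new HashMap<>();        // latest cleanup time per key
    private final TreeMap<Long, List<String>> timers = new TreeMap<>(); // ~ timer service
    final List<String> output = new ArrayList<>();

    PairJoinModel(long cleanupDelay) {
        this.cleanupDelay = cleanupDelay;
    }

    // ~ processElement1: an event from Topic-A arrives for `key` at time `now`.
    void onA(String key, String json, long now) {
        String b = pendingB.remove(key);
        if (b != null) {
            output.add(merge(json, b));
            deadlines.remove(key);
        } else {
            pendingA.put(key, json); // a second unmatched A for the same key overwrites the first
            registerTimer(key, now + cleanupDelay);
        }
    }

    // ~ processElement2: the symmetric case for Topic-B.
    void onB(String key, String json, long now) {
        String a = pendingA.remove(key);
        if (a != null) {
            output.add(merge(a, json));
            deadlines.remove(key);
        } else {
            pendingB.put(key, json);
            registerTimer(key, now + cleanupDelay);
        }
    }

    private void registerTimer(String key, long time) {
        deadlines.put(key, time);
        timers.computeIfAbsent(time, t -> new ArrayList<>()).add(key);
    }

    // ~ onTimer: fire all timers due at or before `now`; stale timers are ignored.
    void advanceTime(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                if (due.getKey().equals(deadlines.get(key))) {
                    pendingA.remove(key);
                    pendingB.remove(key);
                    deadlines.remove(key);
                }
            }
        }
    }

    int pendingCount() {
        return pendingA.size() + pendingB.size();
    }

    // Hypothetical merge: nests both JSON documents in one object.
    private static String merge(String aJson, String bJson) {
        return "{\"a\":" + aJson + ",\"b\":" + bJson + "}";
    }
}
```

A timer only clears state if it is still the latest deadline recorded for that key, so a timer left over from an already-matched event cannot delete a newer pending event.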

Best,
Aljoscha
On 3. May 2017, at 11:47, G.S.Vijay Raajaa <[hidden email]> wrote:


Re: Window Function on AllWindowed Stream - Combining Kafka Topics

G.S.Vijay Raajaa
Thanks for your input, will try to incorporate them in my implementation.

Regards,
Vijay Raajaa G S

On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <[hidden email]> wrote:

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

G.S.Vijay Raajaa

I tried reordering and the window function works fine, but after processing a few records from Topic A and Topic B, the window operator throws the error below. The keyBy is on the eventTime field.

java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
    at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)


Regards,

Vijay Raajaa GS 


On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <[hidden email]> wrote:

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

Aljoscha Krettek
What’s the KeySelector you’re using? To me, this indicates that the timestamp field is somehow changing after the original keying or in transit.
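One way to see why a changing key can produce this error is a simplified plain-Java model of key-group assignment (not Flink's actual code; `KeyGroupModel`, `Event` and the max parallelism of 128 are assumptions for illustration). Flink routes each record to a key group derived from the key's `hashCode()`, and the receiving window operator extracts the key again when it accesses state; if the two computations disagree, the key group can fall outside the range owned by that operator instance, which is what the `Unexpected key group index` check in the stack trace appears to guard against. Flink's real assignment additionally mixes the hash before taking the modulo, which this sketch omits.

```java
// Simplified plain-Java model (not Flink's actual code) of key-group assignment.
class KeyGroupModel {
    // Assumed max parallelism, for illustration only.
    static final int MAX_PARALLELISM = 128;

    // Hypothetical stand-in for TopicPojo with a mutable eventTime field.
    static class Event {
        long eventTime;

        Event(long eventTime) {
            this.eventTime = eventTime;
        }
    }

    // Key group from the key's hashCode(); Flink additionally mixes the hash first (omitted here).
    static int keyGroup(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    public static void main(String[] args) {
        Event e = new Event(1000L);
        int routedTo = keyGroup(e.eventTime); // computed when the record is partitioned by keyBy()
        e.eventTime = 1001L;                  // the key field changes afterwards
        int seenBy = keyGroup(e.eventTime);   // recomputed by the window operator on state access
        System.out.println(routedTo + " vs " + seenBy); // 104 vs 105
    }
}
```

The usual remedy is to key on a value that is never modified after keyBy() and whose hashCode() is deterministic (for example a String or Long copied out of the POJO), rather than on an object that relies on Object.hashCode().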

Best,
Aljoscha
On 4. May 2017, at 22:01, G.S.Vijay Raajaa <[hidden email]> wrote:

I tried to reorder and the window function works fine. but then after processing few stream of data from Topic A and Topic B, the window function seem to throw the below error. The keyby is on eventTime field.

java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
	at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
	at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
	at java.lang.Thread.run(Thread.java:745)


Regards,

Vijay Raajaa GS 


On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <[hidden email]> wrote:
Thanks for your input; I will try to incorporate it into my implementation.

Regards,
Vijay Raajaa G S

On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <[hidden email]> wrote:
The approach could work, but if it can happen that an event from stream A is not matched by an event in stream B you will have lingering state that never goes away. For such cases it might be better to write a custom CoProcessFunction as sketched here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html.

The idea is to keep events from each side in state and emit a result when you get the event from the other side. You also set a cleanup timer in case no other event arrives to make sure that state eventually goes away.
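A minimal plain-Java model of this idea, so the state handling can be followed without a Flink runtime. It is a sketch, not Flink code: the two maps stand in for keyed state scoped to the current eventTime key, `onTimer` stands in for an event-time timer registered via `ctx.timerService().registerEventTimeTimer(...)`, and the class name `PairJoiner`, the `cleanupDelay` parameter, and the naive string-concatenation merge are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a buffered two-input join with cleanup (not Flink API).
class PairJoiner {
    private final long cleanupDelay;                          // hypothetical retention, same unit as timestamps
    private final Map<String, String> pendingA = new HashMap<>(); // unmatched Topic-A records per key
    private final Map<String, String> pendingB = new HashMap<>(); // unmatched Topic-B records per key
    private final Map<String, Long> deadline = new HashMap<>();   // models one cleanup timer per key
    final List<String> emitted = new ArrayList<>();

    PairJoiner(long cleanupDelay) { this.cleanupDelay = cleanupDelay; }

    // Record from Topic-A: emit if the Topic-B partner is buffered, else buffer it.
    void onA(String key, String json, long ts) {
        String b = pendingB.remove(key);
        if (b != null) { emit(key, json, b); }
        else { pendingA.put(key, json); deadline.put(key, ts + cleanupDelay); }
    }

    // Record from Topic-B: symmetric to onA.
    void onB(String key, String json, long ts) {
        String a = pendingA.remove(key);
        if (a != null) { emit(key, a, json); }
        else { pendingB.put(key, json); deadline.put(key, ts + cleanupDelay); }
    }

    // Models timers firing at time `now`: drop unmatched records whose deadline passed.
    void onTimer(long now) {
        deadline.entrySet().removeIf(e -> {
            if (e.getValue() <= now) {
                pendingA.remove(e.getKey());
                pendingB.remove(e.getKey());
                return true;
            }
            return false;
        });
    }

    int pendingCount() { return pendingA.size() + pendingB.size(); }

    private void emit(String key, String a, String b) {
        deadline.remove(key);
        // Naive merge; assumes both inputs are valid JSON objects.
        emitted.add("{\"eventTime\":\"" + key + "\",\"a\":" + a + ",\"b\":" + b + "}");
    }
}
```

In an actual CoProcessFunction on two connected streams keyed by eventTime, processElement1 and processElement2 would play the roles of onA and onB, and onTimer would clear whatever state is still present for that key.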

Best,
Aljoscha

On 3. May 2017, at 11:47, G.S.Vijay Raajaa <[hidden email]> wrote:

Sure, thanks for the pointer; I'll reorder the operators. Any comments on the approach I followed for merging the topics into a single JSON?

Regards,
Vijay Raajaa G S

On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
An AllWindow operator requires an AllWindowFunction instead of a WindowFunction. In your case, the keyBy() seems to be in the wrong place. To get a keyed window, you have to write something akin to:

inputStream
  .keyBy(…)
  .window(…)
  .apply(…) // or reduce()

In your case, you key the stream and then the keying is “lost” again because you apply a flatMap(). That’s why you have an all-window and not a keyed window.
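To see why the position of keyBy() matters for pairing, here is a plain-Java model (not Flink code) of a count window of size 2 in both forms: one global buffer, as with countWindowAll(2), versus one buffer per key, as with keyBy(...).countWindow(2). Both buffers are purged after firing, like Flink's count windows. The class name and the `"<topic>@<eventTime>"` record format are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of size-2 count windows, non-keyed vs keyed.
class CountWindowModel {
    // countWindowAll(2): one buffer for the whole stream; fires on every 2nd record, whatever its key.
    static <T> List<List<T>> allWindow(List<T> input) {
        List<List<T>> fired = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T t : input) {
            buffer.add(t);
            if (buffer.size() == 2) { fired.add(buffer); buffer = new ArrayList<>(); }
        }
        return fired;
    }

    // keyBy(key).countWindow(2): one buffer per key; fires when that key has 2 records.
    static <T, K> List<List<T>> keyedWindow(List<T> input, Function<T, K> keyOf) {
        List<List<T>> fired = new ArrayList<>();
        Map<K, List<T>> buffers = new LinkedHashMap<>();
        for (T t : input) {
            K key = keyOf.apply(t);
            List<T> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
            buffer.add(t);
            if (buffer.size() == 2) { fired.add(buffer); buffers.remove(key); }
        }
        return fired;
    }
}
```

With interleaved input such as A@t1, A@t2, B@t1, B@t2, the all-window pairs the two Topic-A records, while the keyed window pairs the records that share an eventTime. In the Flink job, the keyed form would be `pojo.keyBy(...).countWindow(2).apply(...)` with a WindowFunction<TopicPojo, String, KEY, GlobalWindow>, where KEY is the type the key selector returns. If the window stays non-keyed, the function has to implement AllWindowFunction<IN, OUT, GlobalWindow>, whose apply() takes no key argument; non-keyed windows also run with parallelism 1.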

Best,
Aljoscha

On 2. May 2017, at 09:20, G.S.Vijay Raajaa <[hidden email]> wrote:


Re: Window Function on AllWindowed Stream - Combining Kafka Topics

G.S.Vijay Raajaa
I tried the timestamp field both as a String and as a Date object and get the same error in both cases.

Here is the POJO:

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.apache.commons.lang.builder.ToStringBuilder;

@JsonPropertyOrder({
    "data",
    "label",
    "eventTime"
})
public class TopicPojo {

    @JsonProperty("data")
    private List<List<Double>> data = null;

    @JsonProperty("label")
    private List<String> label = null;

    @JsonProperty("eventTime")
    private static Date eventTime;

    /**
     * No args constructor for use in serialization
     */
    public TopicPojo() {
    }

    /**
     * @param data
     * @param label
     * @param eventTime
     */
    public TopicPojo(List<List<Double>> data, List<String> label, Date eventTime) {
        super();
        this.data = data;
        this.label = label;
        this.eventTime = eventTime;
    }

    @JsonProperty("data")
    public List<List<Double>> getData() {
        return data;
    }

    @JsonProperty("data")
    public void setData(List<List<Double>> data) {
        this.data = data;
    }

    @JsonProperty("label")
    public List<String> getLabel() {
        return label;
    }

    @JsonProperty("label")
    public void setLabel(List<String> label) {
        this.label = label;
    }

    @JsonProperty("eventTime")
    public static Date getEventTime() {
        return eventTime;
    }

    @JsonProperty("eventTime")
    public void setEventTime(Date eventTime) {
        this.eventTime = eventTime;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}


The code above uses a Date object for eventTime; I tried a String as well.

Regards,

Vijay Raajaa G S


On Fri, May 5, 2017 at 1:59 PM, Aljoscha Krettek <[hidden email]> wrote:

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

Aljoscha Krettek
It seems that eventTime is a static field in TopicPojo, and the key selector also just reads the static field via TopicPojo.getEventTime(). Why is that? With this, the event time basically has nothing to do with the data.
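Because the field is static, every TopicPojo shares a single eventTime, so the key selector returns whatever value was deserialized most recently rather than the value of the record being keyed. The key can therefore change between the moment a record is routed to a parallel instance and the moment the window operator computes its key again, which fits the "Unexpected key group index" error. The sketch below shows the difference in plain Java using two stand-in classes (hypothetical names). With an instance field and instance getter, the key selector can read the record itself, for example a KeySelector returning `value.getEventTime()`.

```java
import java.util.Date;

// Mirrors the POJO in the thread: `static` means one eventTime for the whole class.
class StaticTimePojo {
    private static Date eventTime;
    static Date getEventTime() { return eventTime; }
    void setEventTime(Date t) { eventTime = t; }   // overwrites the value seen by every instance
}

// Suggested fix: an ordinary instance field, read through an instance getter.
class InstanceTimePojo {
    private Date eventTime;
    Date getEventTime() { return eventTime; }
    void setEventTime(Date t) { this.eventTime = t; }
}
```

Deserializing a second record silently changes the "key" of the first one in the static version, while the instance version keeps one value per record.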

On 5. May 2017, at 10:32, G.S.Vijay Raajaa <[hidden email]> wrote:


Re: Window Function on AllWindowed Stream - Combining Kafka Topics

Meera
This post has NOT been accepted by the mailing list yet.
In reply to this post by G.S.Vijay Raajaa
Hello
I seem to be hitting this issue. I checked my KeySelector and the timestamp method of my watermark assigner, and they look correct. The job runs fine with parallelism 1 but throws the same error with parallelism > 1.

My processing order: I create a keyed stream using the KeySelector and then create a window operator on top of this keyed transformation.

Thanks
meera

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

Meera
In reply to this post by G.S.Vijay Raajaa
Did this problem get resolved?

I am running into it when I parallelize the tasks:
Unexpected key group index. This indicates a bug.

It runs fine with parallelism 1, which suggests some key-grouping issue. I checked my watermark assigner and KeySelector, and they look okay.

Here are the KeySelector and the watermark assigner attached to the KeyedStream:
public class DimensionKeySelector<T extends SignalSet<?>> implements KeySelector<T, String> {

        private static final long serialVersionUID = 7666263008141606451L;
        private final String[] dimKeys;

        public DimensionKeySelector(Map<String, String> conf) {
                if (conf.containsKey("dimKeys") == false) {
                        throw new RuntimeException("Required 'dimKeys' missing.");
                }
                this.dimKeys = conf.get("dimKeys").split(",");
        }

        @Override
        public String getKey(T signalSet) throws Exception {
                StringBuffer group = new StringBuffer(signalSet.namespace());
                if (signalSet.size() != 0) {
                        for (String dim : dimKeys) {
                                if (signalSet.dimensions().containsKey(dim)) {
                                        group.append(signalSet.dimensions().get(dim));
                                }
                        }
                }
                return group.toString();
        }
}

and the watermark assigner:
public class WaterMarks extends BoundedOutOfOrdernessTimestampExtractor<MetricSignalSet> {

    public WaterMarks(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    private static final long serialVersionUID = 1L;

    @Override
    public long extractTimestamp(MetricSignalSet element) {
        return element.get(0).timestamp().getTime();
    }
}

Any thoughts?

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

Aljoscha Krettek
Hi,

As a simple test, you can put your key extraction logic into a MapFunction, i.e. MapFunction<T extends SignalSet<?>, Tuple2<String, T>> and then simply use that field as the key:

input
  .map(new MyKeyExtractorMapper())
  .keyBy(0)

If that solves your problem it means that the key extraction is not deterministic. This is a problem because getKey() is called at different points in time and when the result is not always the same you will get that error.
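A rough model of the routing step explains both the error and why it only appears with parallelism > 1. This is a simplified plain-Java sketch, not Flink code. Flink assigns a key to a key group by murmur-hashing key.hashCode() modulo the max parallelism, and maps key groups to subtasks as keyGroup * parallelism / maxParallelism. Below, a plain floorMod stands in for the murmur hash, 128 is used as max parallelism (a common default for small jobs), and `MutableRecord` is a hypothetical record whose key is derived from a mutable field.

```java
// Simplified model of key-group routing (not Flink code).
class KeyGroupModel {
    // Stand-in for murmurHash(key.hashCode()) % maxParallelism.
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> subtask index: contiguous ranges of key groups per subtask.
    static int targetSubtask(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    // The sender routes on getKey(); the window operator later calls getKey() again
    // to scope its state. Returns whether the receiving subtask still owns the key.
    static boolean keyStillOwned(MutableRecord r, int newDim, int maxParallelism, int parallelism) {
        int target = targetSubtask(r.key(), maxParallelism, parallelism);    // key at shuffle time
        r.dim = newDim;                                                        // record changes in transit
        return targetSubtask(r.key(), maxParallelism, parallelism) == target; // key at state access
    }

    // The map-to-Tuple2 test: the key is extracted once and stored, so it cannot drift.
    static boolean materializedKeyStillOwned(MutableRecord r, int newDim, int maxParallelism, int parallelism) {
        Integer frozenKey = r.key();
        int target = targetSubtask(frozenKey, maxParallelism, parallelism);
        r.dim = newDim;                                                        // mutation no longer affects the key
        return targetSubtask(frozenKey, maxParallelism, parallelism) == target;
    }
}

// Hypothetical record whose key is derived from a mutable field.
class MutableRecord {
    int dim;
    MutableRecord(int dim) { this.dim = dim; }
    Integer key() { return dim; }
}
```

With parallelism 1, the single subtask owns every key group, so a drifting key goes unnoticed, which matches the observation that the job fails only with parallelism > 1.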

Best,
Aljoscha

> On 12. Jun 2017, at 22:04, Meera <[hidden email]> wrote:

Re: Window Function on AllWindowed Stream - Combining Kafka Topics

Meera
We couldn't insert the map phase because we work directly with the stream transformation classes, and it created a dangling mapper. However, setting up the partitioner and transformation for the window operator ourselves worked:


WindowOperator operator = ...

KeyGroupStreamPartitioner<MetricSignalSet, String> partitioner = new KeyGroupStreamPartitioner<MetricSignalSet, String>(new DimensionKeySelector<MetricSignalSet>(config), parallel);
PartitionTransformation<MetricSignalSet> partitioned = new PartitionTransformation<MetricSignalSet>(inputs, partitioner);
OneInputTransformation<MetricSignalSet, MetricSignalSet> trans = new OneInputTransformation<MetricSignalSet, MetricSignalSet>(
        partitioned, name, operator, ess, parallel);
trans.setStateKeySelector(new DimensionKeySelector<MetricSignalSet>(config));
trans.setStateKeyType(new GenericTypeInfo<String>(String.class));