Queries regarding FlinkCEP

classic Classic list List threaded Threaded
14 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Queries regarding FlinkCEP

Biplob Biswas
This post was updated on .
Hi ,

Thanks a lot for the help last time, I have a few more questions and I chose to create a new topic as the problem in the previous topic was solved, thanks to useful inputs from Flink Community. The questions are as follows

1. What time does the "within" operator works on "Event Time" or "Processing Time"?, I am asking this as I wanted to know whether something like the following would be captured or not.

MaxOutofOrderness is set to 10 mins, and "within" operator is specified for 5 mins. So if a first events event time is at 1:00  and the corresponding next event is has an event time of 1:04 but it arrives in the system at 1:06. Would this still be processed and alert would be generated or not?

2. What would happen if I don't have a key to specify, the way 2 events are correlated is by using the ctx of the first event and matching some different id. So, we can't group by some unique field. I tried a test run without specifying a key and it apparently works. But how is the shuffling done then in this case?

3. This is one of the major issue, So I could use Event Time with ascending event time extractor for one of my kafka topic because its behavior is consistent.  But when i added another topic to read from where the events are not in ascending order, using ascending timestampextractor gave me timestamp monotonicity violation. Then when I am using BoundedOutOfOrdernessTimestampExtractor for the same, I am not getting any warnings anymore but I am no more getting my alerts.

If I go back to using processing time, then I am again getting alerts properly. What could be the problem here?

This is the code I am using:

public class CEPForBAM {


  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    System.out.println(env.getStreamTimeCharacteristic());
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(10000);

// configure Kafka consumer
    Properties props = new Properties();
    props = getDefaultProperties(props);

    FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
            Arrays.asList("topic1", "topic_x", "topic_test"),
            new StringSerializerToEvent(),
            props);

    kafkaSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {

      private static final long serialVersionUID = -7228487240278428374L;

      @Override
      public long extractTimestamp(BAMEvent event) {
        return event.getTimestamp();
      }
    });

    DataStream<BAMEvent> events = env.addSource(kafkaSource);

    // Input stream of monitoring events


/*    DataStream<BAMEvent> partitionedInput = events
            .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId);*/

     evetns.print();
    //partitionedInput.print();

    Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
            .where(new SimpleCondition<BAMEvent>() {
              private static final long serialVersionUID = 1390448281048961616L;

              @Override
              public boolean filter(BAMEvent event) throws Exception {
                return event.getEventName().equals(ReadEventType.class.getSimpleName());
              }
            })
            .followedBy("second")
            .where(new IterativeCondition<BAMEvent>() {
              private static final long serialVersionUID = -9216505110246259082L;

              @Override
              public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {

                if (secondEvent.getEventName().equals(StatusChangedEventType.class.getSimpleName())) {
                  for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
                    if (secondEvent.getCorrelationID().contains(firstEvent.getEventId()))
                      return true;
                  }
                }
                return false;
              }
            })
            .within(Time.minutes(10));

    PatternStream<BAMEvent> patternStream = CEP.pattern(events, pattern);


    DataStream<Either<String, String>> alerts = patternStream.select(new PatternTimeoutFunction<BAMEvent, String>() {
      private static final long serialVersionUID = -8717561187522704500L;

      @Override
      public String timeout(Map<String, List<BAMEvent>> map, long l) throws Exception {
        return "TimedOut: " + map.toString() + " @ " + l;
      }

    }, new PatternSelectFunction<BAMEvent, String>() {
      private static final long serialVersionUID = 3144439966791408980L;

      @Override
      public String select(Map<String, List<BAMEvent>> pattern) throws Exception {
        BAMEvent bamEvent = pattern.get("first").get(0);
        return "Matched Events: " + bamEvent.getEventId() + "_" + bamEvent.getEventName();
      }
    });

    alerts.print();

    env.execute("CEP monitoring job");
  }
}



Even when I am using Event Time, I am getting events from kafka as can be shown from event.print()
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Till Rohrmann
Hi Biplob,

1. The CEPPatternOperator can use either processing time or event time for its internal processing logic. It only depends on what TimeCharacteristic you have set for your program. Consequently, with event time, your example should be detected as an alert.

2. If you don't provide a keyed input stream, then Flink will execute the CEP operator only with a parallelism of 1. Thus, all events pass through the same instance of the CEP operator.

3. It's hard to tell but I would assume that something with the watermark generation does not properly work. For example, it could be that you've set the out of orderness to a very large value such that it will take a long time until you can be sure that you've seen all events for a given watermark on the input without monotonically increasing timestamps. The easiest way to debug the problem would be a self-contained example program which reproduces the problem.

Cheers,
Till

On Fri, Jun 2, 2017 at 1:10 PM, Biplob Biswas <[hidden email]> wrote:
Hi ,

Thanks a lot for the help last time, I have a few more questions and I chose
to create a new topic as the problem in the previous topic was solved,
thanks to useful inputs from Flink Community. The questions are as follows

*1.* What time does the "within" operator works on "Event Time" or
"Processing Time", I am asking this as I wanted to know whether something
like the following would be captured or not.

MaxOutofOrderness is set to 10 mins, and "within" operator is specified for
5 mins. So if a first events event time is at 1:00  and the corresponding
next event is has an event time of 1:04 but it arrives in the system at
1:06. Would this still be processed and alert would be generated or not?

*2.* What would happen if I don't have a key to specify, the way 2 events
are correlated is by using the ctx of the first event and matching some
different id. So, we can't group by some unique field. I tried a test run
without specifying a key and it apparently works. But how is the shuffling
done then in this case?

*3.* This is one of the major issue, So I could use Event Time with
ascending event time extractor for one of my kafka topic because its
behavior is consistent.  But when i added another topic to read from where
the events are not in ascending order, using ascending timestampextractor
gave me timestamp monotonicity violation. Then when I am using
BoundedOutOfOrdernessTimestampExtractor for the same, I am not getting any
warnings anymore but I am no more getting my alerts.

If I go back to using processing time, then I am again getting alerts
properly. What could be the problem here?

*This is the code I am using:*

/public class CEPForBAM {


  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
    System.out.println(env.getStreamTimeCharacteristic());
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(10000);

// configure Kafka consumer
    Properties props = new Properties();
    props = getDefaultProperties(props);

    FlinkKafkaConsumer010<BAMEvent> kafkaSource = new
FlinkKafkaConsumer010<>(
            Arrays.asList("topic1", "topic_x", "topic_test"),
            new StringSerializerToEvent(),
            props);

    kafkaSource.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {

      private static final long serialVersionUID = -7228487240278428374L;

      @Override
      public long extractTimestamp(BAMEvent event) {
        return event.getTimestamp();
      }
    });

    DataStream<BAMEvent> events = env.addSource(kafkaSource);

    // Input stream of monitoring events


/*    DataStream<BAMEvent> partitionedInput = events
            .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId);*/

     evetns.print();
    //partitionedInput.print();

    Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
            .where(new SimpleCondition<BAMEvent>() {
              private static final long serialVersionUID =
1390448281048961616L;

              @Override
              public boolean filter(BAMEvent event) throws Exception {
                return
event.getEventName().equals(ReadEventType.class.getSimpleName());
              }
            })
            .followedBy("second")
            .where(new IterativeCondition<BAMEvent>() {
              private static final long serialVersionUID =
-9216505110246259082L;

              @Override
              public boolean filter(BAMEvent secondEvent, Context<BAMEvent>
ctx) throws Exception {

                if
(secondEvent.getEventName().equals(StatusChangedEventType.class.getSimpleName()))
{
                  for (BAMEvent firstEvent :
ctx.getEventsForPattern("first")) {
                    if
(secondEvent.getCorrelationID().contains(firstEvent.getEventId()))
                      return true;
                  }
                }
                return false;
              }
            })
            .within(Time.minutes(10));

    PatternStream<BAMEvent> patternStream = CEP.pattern(events, pattern);


    DataStream<Either&lt;String, String>> alerts = patternStream.select(new
PatternTimeoutFunction<BAMEvent, String>() {
      private static final long serialVersionUID = -8717561187522704500L;

      @Override
      public String timeout(Map<String, List&lt;BAMEvent>> map, long l)
throws Exception {
        return "TimedOut: " + map.toString() + " @ " + l;
      }

    }, new PatternSelectFunction<BAMEvent, String>() {
      private static final long serialVersionUID = 3144439966791408980L;

      @Override
      public String select(Map<String, List&lt;BAMEvent>> pattern) throws
Exception {
        BAMEvent bamEvent = pattern.get("first").get(0);
        return "Matched Events: " + bamEvent.getEventId() + "_" +
bamEvent.getEventName();
      }
    });

    alerts.print();

    env.execute("CEP monitoring job");
  }
}/


Even when I am using Event Time, I am getting events from kafka as can be
shown from event.print()



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-FlinkCEP-tp13454.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Dawid Wysakowicz
I think Till answered all your question but just to rephrase a bit. 

1. The within and TimeCharacteristic are working on different levels. The TimeCharacteristics tells how events are assigned a timestamp. The within operator specifies the maximal time between first and last event of a matched sequence (the time here corresponds to the chosen TimeCharacteristic). So if we have within(Time.minutes(10)) in EventTime, upon Watermark arrival the events are sorted with the assigned Timestamp and then the within is applied.

3. Looking at your code there is nothing wrong with it. As I don't know how the timestamps of your events looks like, I can just guess, but I would say either 
  • there is no matching sequences of events in your stream that fit into 10 minutes window 
  • or, your events are more mixed than across 60 seconds. Consider example: we have events with timestamps {t1=600s, t2=620, t3=550s}. Event with t3=550s cannot match with t1 because it lags 70s > 60s behind t2. FlinkCEP right now drops all late events.
For deeper understanding of Event/Processing Time I would suggest having a look at : https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#event-time


Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-02 18:22 GMT+02:00 Till Rohrmann <[hidden email]>:
Hi Biplob,

1. The CEPPatternOperator can use either processing time or event time for its internal processing logic. It only depends on what TimeCharacteristic you have set for your program. Consequently, with event time, your example should be detected as an alert.

2. If you don't provide a keyed input stream, then Flink will execute the CEP operator only with a parallelism of 1. Thus, all events pass through the same instance of the CEP operator.

3. It's hard to tell but I would assume that something with the watermark generation does not properly work. For example, it could be that you've set the out of orderness to a very large value such that it will take a long time until you can be sure that you've seen all events for a given watermark on the input without monotonically increasing timestamps. The easiest way to debug the problem would be a self-contained example program which reproduces the problem.

Cheers,
Till

On Fri, Jun 2, 2017 at 1:10 PM, Biplob Biswas <[hidden email]> wrote:
Hi ,

Thanks a lot for the help last time, I have a few more questions and I chose
to create a new topic as the problem in the previous topic was solved,
thanks to useful inputs from Flink Community. The questions are as follows

*1.* What time does the "within" operator works on "Event Time" or
"Processing Time", I am asking this as I wanted to know whether something
like the following would be captured or not.

MaxOutofOrderness is set to 10 mins, and "within" operator is specified for
5 mins. So if a first events event time is at 1:00  and the corresponding
next event is has an event time of 1:04 but it arrives in the system at
1:06. Would this still be processed and alert would be generated or not?

*2.* What would happen if I don't have a key to specify, the way 2 events
are correlated is by using the ctx of the first event and matching some
different id. So, we can't group by some unique field. I tried a test run
without specifying a key and it apparently works. But how is the shuffling
done then in this case?

*3.* This is one of the major issue, So I could use Event Time with
ascending event time extractor for one of my kafka topic because its
behavior is consistent.  But when i added another topic to read from where
the events are not in ascending order, using ascending timestampextractor
gave me timestamp monotonicity violation. Then when I am using
BoundedOutOfOrdernessTimestampExtractor for the same, I am not getting any
warnings anymore but I am no more getting my alerts.

If I go back to using processing time, then I am again getting alerts
properly. What could be the problem here?

*This is the code I am using:*

/public class CEPForBAM {


  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
    System.out.println(env.getStreamTimeCharacteristic());
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(10000);

// configure Kafka consumer
    Properties props = new Properties();
    props = getDefaultProperties(props);

    FlinkKafkaConsumer010<BAMEvent> kafkaSource = new
FlinkKafkaConsumer010<>(
            Arrays.asList("topic1", "topic_x", "topic_test"),
            new StringSerializerToEvent(),
            props);

    kafkaSource.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {

      private static final long serialVersionUID = -7228487240278428374L;

      @Override
      public long extractTimestamp(BAMEvent event) {
        return event.getTimestamp();
      }
    });

    DataStream<BAMEvent> events = env.addSource(kafkaSource);

    // Input stream of monitoring events


/*    DataStream<BAMEvent> partitionedInput = events
            .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId);*/

     evetns.print();
    //partitionedInput.print();

    Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
            .where(new SimpleCondition<BAMEvent>() {
              private static final long serialVersionUID =
1390448281048961616L;

              @Override
              public boolean filter(BAMEvent event) throws Exception {
                return
event.getEventName().equals(ReadEventType.class.getSimpleName());
              }
            })
            .followedBy("second")
            .where(new IterativeCondition<BAMEvent>() {
              private static final long serialVersionUID =
-9216505110246259082L;

              @Override
              public boolean filter(BAMEvent secondEvent, Context<BAMEvent>
ctx) throws Exception {

                if
(secondEvent.getEventName().equals(StatusChangedEventType.class.getSimpleName()))
{
                  for (BAMEvent firstEvent :
ctx.getEventsForPattern("first")) {
                    if
(secondEvent.getCorrelationID().contains(firstEvent.getEventId()))
                      return true;
                  }
                }
                return false;
              }
            })
            .within(Time.minutes(10));

    PatternStream<BAMEvent> patternStream = CEP.pattern(events, pattern);


    DataStream<Either&lt;String, String>> alerts = patternStream.select(new
PatternTimeoutFunction<BAMEvent, String>() {
      private static final long serialVersionUID = -8717561187522704500L;

      @Override
      public String timeout(Map<String, List&lt;BAMEvent>> map, long l)
throws Exception {
        return "TimedOut: " + map.toString() + " @ " + l;
      }

    }, new PatternSelectFunction<BAMEvent, String>() {
      private static final long serialVersionUID = 3144439966791408980L;

      @Override
      public String select(Map<String, List&lt;BAMEvent>> pattern) throws
Exception {
        BAMEvent bamEvent = pattern.get("first").get(0);
        return "Matched Events: " + bamEvent.getEventId() + "_" +
bamEvent.getEventName();
      }
    });

    alerts.print();

    env.execute("CEP monitoring job");
  }
}/


Even when I am using Event Time, I am getting events from kafka as can be
shown from event.print()



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-FlinkCEP-tp13454.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Biplob Biswas
Thanks a lot, Till and Dawid for such detailed reply.

I tried to check and wait what both of you suggested and I still have no events. Thus as pointed out by till, I created a self-contained example to reproduce the issue and the behaviour is the same as was in my original case.

Please find the self-contained example below:

https://gist.github.com/revolutionisme/c3878a6420b322176ac686cbf1a8ac43

Also the input to the kafka topic "test" for which it doesn't work is as follows:

https://gist.github.com/revolutionisme/3651acf1baa8cf3696019bc92959e1a2

I used only one topic for reproducing the problem although the behaviour is the same when reading from more than one topic.

Maybe its a bug, maybe I did something really stupid here. But any help would be really appreciated.

Thanks a lot,
Biplob
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Biplob Biswas
Also, my test environment was Flink 1.4-Snapshot with Kafka 0.10.0 on HDP 2.5.

And I sent my test messages via the console producer.

Thanks,
Biplob
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Biplob Biswas
Sorry to bombard with so many messages , but one last thing is the example would produce alert if the line specifying Event Time is commented out.

More specifically, this one:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


Only with event time, there is no alert.

Thanks, Biplob
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Dawid Wysakowicz
Hi Biplop,

CEP library internally must ensure order of the incoming events. It sorts events upon Watermark arrival. At that time it sorts events with timestamp < Watermark.

With BoundedOutOfOrdernessTimestampExtractor a Watermark with time t is generated if there arrives event with timestamp t + maxOutOfOrderness.

Try adding event like: 12,b,70000,6 to your test set and some alerts should be generated.

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-06 15:17 GMT+02:00 Biplob Biswas <[hidden email]>:
Sorry to bombard with so many messages , but one last thing is the example
would produce alert if the line specifying Event Time is commented out.

More specifically, this one:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


Only with event time, there is no alert.

Thanks, Biplob



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-FlinkCEP-tp13454p13513.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Biplob Biswas
Hi Dawid,

What you wrote is exactly correct, it wouldn't generate a new waatermark (and subsequently throw events) unless maxOutOfOrderness time is elapsed. Thus, I was expecting for alerts to be raised as the stream was out of order but not out of maxOutOfOrderness.

Nevertheless I tried your example and the result is the same, I get nothing as output except the events themselves which I see with events.print()

PS: BTW, shouldn't your example generate nothing because its beyond the maxOutOfOrderness and Flink would simply discard these messages.

Regards,
Biplob
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Dawid Wysakowicz
Sorry I have not responded earlier. Did you try input like this?:
1,a,1,0
5,a,3,2
6,a,2,1
8,b,5,1
12,b,4,6
12,b,70000,6
In response to your PS, in this example no events are late(in regards to maxOutOfOrderness). If after the last event there was an event like (12,b, 9999, 6) it would be discarded, because 9999 < 70000 - 60000(=60s).


Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-06 16:17 GMT+02:00 Biplob Biswas <[hidden email]>:
Hi Dawid,

What you wrote is exactly correct, it wouldn't generate a new waatermark
(and subsequently throw events) unless maxOutOfOrderness time is elapsed.
Thus, I was expecting for alerts to be raised as the stream was out of order
but not out of maxOutOfOrderness.

Nevertheless I tried your example and the result is the same, I get nothing
as output except the events themselves which I see with events.print()

PS: BTW, shouldn't your example generate nothing because its beyond the
maxOutOfOrderness and Flink would simply discard these messages.

Regards,
Biplob




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-FlinkCEP-tp13454p13523.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Biplob Biswas
Hi Dawid,


Yes, now I understood what you meant. Although I added exactly the input you asked me to and I still get no alerts.

I also observed that I am not getting alerts even with normal ordering of timestamp and with ascedingTimestampExtractor.

I am adding an image where I entered the data from the console producer and the console should printout the alerts along with the events, but only the events are printed.


With unordered events Ordered Events


Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Biplob Biswas
Hi,

Can anyone check, whether they can reproduce this issue on their end? There's no log yet as t what is happening. Any idea to debug this issue is well appreciated.

Regards,
Biplob
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Dawid Wysakowicz
Hi Biplop,

Your recent post on reading from different topics made me realise it may be a problem with "stalled" partitions. Did your topic have more than one partition? If it did, it may be the problem that Watermark is generated independently per partition and then the smallest one is taken as a "global" Watermark.

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-08 15:35 GMT+02:00 Biplob Biswas <[hidden email]>:
Hi,

Can anyone check, whether they can reproduce this issue on their end?
There's no log yet as t what is happening. Any idea to debug this issue is
well appreciated.

Regards,
Biplob



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-FlinkCEP-tp13454p13591.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Biplob Biswas
Hi dawid,

Yes I am reading from multiple topics and yes a few topics have multiple partitions, not all of them.

But I didn't understand the concept of stalled partition.
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Queries regarding FlinkCEP

Dawid Wysakowicz
For example if there is no events in one of the partitions the partition will always generate Watermark(Long.MIN_VALUE) which will result in Watermark not being advanced. There is open JIRA to improve such situations: FLINK-5479.

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-20 14:00 GMT+02:00 Biplob Biswas <[hidden email]>:
Hi dawid,

Yes I am reading from multiple topics and yes a few topics have multiple
partitions, not all of them.

But I didn't understand the concept of stalled partition.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Queries-regarding-FlinkCEP-tp13454p13853.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Loading...