Flink CEP not emitting timed out events properly


Biplob Biswas
Hi,

I am having some issues with FlinkCEP again. This time I am using processing time for my CEP job, which reads from multiple Kafka topics and uses the Pattern API to define a rule. I output both the matched events and the timed-out events.

My problem is this: I send an event over one of the topics and make sure that the subsequent events are not generated within the specified time, so I expect a timed-out event.

However, no timed-out event is generated even after 2 minutes (the specified interval). The earlier timed-out events are only generated once I send an extra message over the Kafka topic.

I am not sure why this is happening. For example:

2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da2 []
2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da1 []
2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da5 []
2> Anomaly Events: {first=[RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da2 []]} @ 1497612386342
2> Anomaly Events: {first=[RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da1 []]} @ 1497612386342

In the example above, the anomaly events are generated only after I send the event with event id 044023a4-edec-439c-b221-806740972da5,

and even then no anomaly event is generated for that particular event.

I suspected that the watermark is not updated automatically after the last event and only advances when a new event enters the system. So I added setAutoWatermarkInterval(1000) to the code, but to no avail.


Thanks & Regards,
Biplob Biswas

Re: Flink CEP not emitting timed out events properly

Kostas Kloudas
Hi Biplob,

With processing time there are no watermarks in the stream.
The problem you are seeing arises because, in processing time, the CEP
library waits for the “next” element to arrive before it checks whether
any of the patterns have timed out.

Kostas
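
The behaviour described above can be illustrated with a small plain-Java model. This is a simplified sketch and not Flink's CEP implementation: the class LazyTimeoutModel and its methods are hypothetical names chosen for illustration. It only shows why a timed-out partial match surfaces when the next element arrives, and why the newest element itself stays pending.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (assumption, not Flink code): partial matches are checked for
// timeouts only when another element arrives, never on their own.
class LazyTimeoutModel {
    private final long timeoutMillis;
    // Id of the element that started a partial match -> its arrival time.
    private final Map<String, Long> pending = new LinkedHashMap<>();

    LazyTimeoutModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called once per incoming element. Returns the ids of older partial
    // matches that are found to have timed out at this moment.
    List<String> onElement(String id, long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        // The new element starts its own partial match and waits for the next arrival.
        pending.put(id, nowMillis);
        return timedOut;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In this model, with a timeout of 120000 ms, two elements arriving at t=0 and t=1000 produce no output. A third element arriving at t=200000 reports the first two as timed out and is itself left pending, which mirrors the log in the first message.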

> On Jun 16, 2017, at 1:29 PM, Biplob Biswas <[hidden email]> wrote:
> [...]


Re: Flink CEP not emitting timed out events properly

Biplob Biswas
Hi Kostas,

Thanks for the reply, that makes things a bit clearer.

I also went through this thread, which describes something similar to what I am observing:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Listening-to-timed-out-patterns-in-Flink-CEP-td9371.html

I am checking for timed-out events, and when I use event time the behaviour is non-deterministic. For one pattern it generates a few 'matched events', and for a different pattern none. It also generates almost no timed-out events for any of the patterns unless I run the series of mock events a second time, at which point I get a series of anomaly events.

I had opened a thread about this issue but did not get a satisfactory solution there, so I was testing with processing time to see whether it works at all.

https://gist.github.com/revolutionisme/cf675ceee1492b93be020d4526bc9d38
https://gist.github.com/revolutionisme/38578e631f7a15f02cb2488f9fe56c76

I would really like to know how to advance the watermark without any events coming in, so that at least the timed-out events are emitted by the system.

Re: Flink CEP not emitting timed out events properly

Kostas Kloudas
Hi Biplob,

If you know what you want, you can always write a custom
AssignerWithPeriodicWatermarks that does the job. If you just want
to advance the watermark, you could simply check whether you have
received any elements and, if not, emit a watermark with the timestamp
of the previous watermark + X.

Kostas
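
The "previous watermark + X" idea can be sketched in plain Java as follows. This is only an illustration under stated assumptions: IdleAdvancingWatermark, onElement and nextWatermark are hypothetical names, the class does not implement Flink's AssignerWithPeriodicWatermarks interface, and nextWatermark is assumed to be called once per watermark interval.

```java
// Plain-Java sketch (assumption, not a Flink API): advance the watermark
// artificially whenever no element arrived since the previous periodic call.
class IdleAdvancingWatermark {
    private final long idleStepMillis; // the "X" added while the input is idle
    private long lastWatermark = Long.MIN_VALUE;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private boolean sawElementSinceLastCall = false;

    IdleAdvancingWatermark(long idleStepMillis) {
        this.idleStepMillis = idleStepMillis;
    }

    // Call for every element, passing its event timestamp.
    void onElement(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
        sawElementSinceLastCall = true;
    }

    // Call periodically. Returns the watermark to emit; it never decreases.
    long nextWatermark() {
        if (sawElementSinceLastCall) {
            // Normal case: follow the highest timestamp seen so far.
            lastWatermark = Math.max(lastWatermark, maxTimestampSeen);
        } else if (lastWatermark != Long.MIN_VALUE) {
            // Idle case: no elements since the last call, so step forward by X.
            lastWatermark += idleStepMillis;
        }
        sawElementSinceLastCall = false;
        return lastWatermark;
    }
}
```

The trade-off of this design is that an element arriving after an idle period can carry a timestamp below the artificially advanced watermark, in which case it counts as late. The step size X therefore has to be chosen with the expected delays in mind.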

> On Jun 16, 2017, at 3:28 PM, Biplob Biswas <[hidden email]> wrote:
> [...]


Re: Flink CEP not emitting timed out events properly

Biplob Biswas
Hi Kostas,

Thanks for the suggestion, I will try that next. I have out-of-order events on one of my Kafka topics, which is why I am using BoundedOutOfOrdernessTimestampExtractor. Since that doesn't work as expected, I will try working with AssignerWithPeriodicWatermarks directly, as you suggested.

This behaviour is not at all consistent and not what I was expecting, though. I will update the thread with the results of my experiments.

Thanks again,
Biplob


Re: Flink CEP not emitting timed out events properly

Biplob Biswas
In reply to this post by Kostas Kloudas
Hi Kostas,

Implementing my custom timestamp assigner made me realise a problem in our architecture, so to speak.

Any input would be really appreciated.

For now, we are reading from 4 different Kafka topics, and our flow looks something like this:

Event 1 (on topic t1) -generates-> Event 2 (on topic t2)
Event 1 (on topic t1) -generates-> Event 3 (on topic t3) -generates-> Event 4 (on topic t4)

As I am reading from all these topics, events with different timestamps are interleaved. When I print the list of events, I also see events with higher timestamps, which I assumed was affecting the watermarks.

But when I added logging to the extractTimestamp method, I could see that multiple different watermarks are being maintained.

Could you or anyone else help me understand how watermarks work when reading from different topics simultaneously?

I am pretty sure that my lost events are caused by the way watermarking works across events from several topics at once.

Thanks & Regards
Biplob

Re: Flink CEP not emitting timed out events properly

Dawid Wysakowicz
Hi Biplob,

If an operator has two inputs, the resulting watermark is the smaller of the two upstream watermarks. You can read more about that here.
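
The "smallest upstream watermark wins" rule can be sketched with a small plain-Java model. This is an illustration only, not Flink's internal code: MinWatermarkCombiner and onWatermark are hypothetical names, and the inputs are identified by a simple index.

```java
import java.util.Arrays;

// Toy model (assumption, not Flink code): an operator with several inputs
// tracks the latest watermark per input and forwards the minimum of them.
class MinWatermarkCombiner {
    private final long[] inputWatermarks;
    private long combined = Long.MIN_VALUE;

    MinWatermarkCombiner(int numInputs) {
        inputWatermarks = new long[numInputs];
        // Until an input has sent a watermark, it holds the operator back.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark received on one input and returns the operator's
    // current watermark, which is the minimum over all inputs.
    long onWatermark(int inputIndex, long watermark) {
        inputWatermarks[inputIndex] = Math.max(inputWatermarks[inputIndex], watermark);
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        // The forwarded watermark only ever moves forward.
        combined = Math.max(combined, min);
        return combined;
    }
}
```

One consequence visible in this model is that a single slow or silent input keeps the combined watermark low, no matter how far the other inputs have advanced. That may be relevant when several topics with different event rates feed the same operator.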



Re: Flink CEP not emitting timed out events properly

Biplob Biswas
Hi Dawid,

First of all, congratulations on becoming a Flink committer; I saw your tweet this morning.

Regarding that link: it talks about multiple partitions of a single topic, whereas I am talking about multiple topics, each with a different number of partitions.

I tried adding a timestamp extractor at the Kafka source, but
I still observe different watermarks when I log the current watermark timestamp.

If I expect the same behaviour, shouldn't there be only one watermark value?

Regards,
Biplob

Re: Flink CEP not emitting timed out events properly

Dawid Wysakowicz
It is possible that two different watermarks are passed through the stream, but at an operator that has more than one input stream (in your case, from different topics and partitions), the smallest watermark is considered valid and passed downstream.

Z pozdrowieniami! / Cheers!


Dawid Wysakowicz

Data/Software Engineer

Skype: dawid_wys | Twitter: @OneMoreCoder


2017-06-20 13:50 GMT+02:00 Biplob Biswas <[hidden email]>:
[...]


Re: Flink CEP not emitting timed out events properly

Biplob Biswas
But if that's the case, I don't understand why some of my events are simply lost. If the watermark that is used is the smallest one, then I expect either a match or a timed-out event.

The only way I can imagine my events getting lost is that the watermark is higher than the incoming event's timestamp, so the event is discarded as too late.

Re: Flink CEP not emitting timed out events properly

Kostas Kloudas
Hi Biplob,

You are correct that only a higher watermark leads to discarded events.
Are you sure that your custom watermark emitter does not emit a watermark that is too high?
This can happen, for example, if your partition has elements that are far out of order.

In addition, are you sure that your elements are not simply buffered and waiting
for the right watermark?

Kostas
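
Both cases mentioned above, elements discarded as late and elements buffered until the watermark catches up, can be illustrated with a small plain-Java model. It is a simplified sketch and not the CEP operator's code: EventTimeBufferModel and its methods are hypothetical names, and the exact boundary conditions (inclusive versus strict comparison with the watermark) are assumptions of the model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model (assumption, not Flink code): in event time, elements are buffered
// by timestamp and only handed to pattern matching once the watermark reaches them.
class EventTimeBufferModel {
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;
    private int droppedAsLate = 0;

    // Returns true if the element was buffered, false if it was dropped as late.
    boolean onElement(long timestamp) {
        if (timestamp <= currentWatermark) {
            droppedAsLate++;
            return false;
        }
        buffered.add(timestamp);
        return true;
    }

    // Advances the watermark and returns, in timestamp order, the elements
    // that are now released for pattern matching.
    List<Long> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> released = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek() <= currentWatermark) {
            released.add(buffered.poll());
        }
        return released;
    }

    int droppedAsLate() {
        return droppedAsLate;
    }

    int bufferedCount() {
        return buffered.size();
    }
}
```

In this model, elements that were ingested but sit above the current watermark produce neither a match nor a timeout; they simply wait. An element that arrives below the watermark never reaches the buffer at all.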

> On Jun 20, 2017, at 2:03 PM, Biplob Biswas <[hidden email]> wrote:
> [...]


Re: Flink CEP not emitting timed out events properly

Biplob Biswas
Hi Kostas,

From what I have observed, I have out-of-orderness of around 5 seconds, and even that only for events coming from a different topic. The initial topic doesn't have out-of-order events, but I have still added a generous time bound of 20 seconds. I will try a higher number as well, just to check a bit more.

The second problem you suggested sounds more interesting, because when I print the events that have been ingested, I see all of them. It's just that those events generate neither a match nor an anomaly, which felt a bit weird to me.

In what cases could the elements be buffered and waiting forever? I expected that, even if they are buffered, I would get all the events that were waiting for a match once the timeout happens, provided the watermark advances. I make sure it does by setting the autoWatermarkInterval and increasing the watermark whenever there have been no new events for 10 seconds of system time.

Regards,
Biplob

Re: Flink CEP not emitting timed out events properly

Kostas Kloudas
You are correct that elements wait until a watermark with a higher timestamp
than theirs (or the pattern's timeout) arrives.

Now for the watermark emitter: 1) how do you measure the 10 sec in processing time,
and 2) by how much do you advance the watermark? If you advance it by a lot, then
elements that arrive later may be considered late.

For the first question, I suppose that you set the watermark interval to 10 sec and,
if you see that there were no elements in between, you consider it inactivity, right?
How do you determine that there were no elements? Do you have a flag in the emitter?

> On Jun 20, 2017, at 3:57 PM, Biplob Biswas <[hidden email]> wrote:
> [...]


Re: Flink CEP not emitting timed out events properly

Biplob Biswas
Hi Kostas,

Yes, I have a flag in my timestamp extractor.

As you can see from the code below, I check whether
currentTime - systemTimeSinceLastModification > 10 sec. As long as new events keep coming, the watermark is not incremented artificially. But as soon as the difference exceeds 10 seconds, I increment the watermark by 1 sec. I feel this is very small, and I will try incrementing the watermark by a higher value, but this is what I am doing at the moment.


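import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// BAMEvent is the job's own event class (not shown here).
public class TimestampAndWatermarkGenerator implements AssignerWithPeriodicWatermarks<BAMEvent> {

  private final long maxOutOfOrderness = 10000; // 10 seconds

  // Highest event timestamp seen so far, plus any artificial increments added while idle.
  private long currentMaxTimestamp;
  // System time of the last event or of the last artificial increment.
  private long systemTimeSinceLastModification;
  // Becomes true once the first event has been seen.
  private boolean firstEventFlag = false;
  // Static so that the logger is not serialized together with the assigner.
  private static final Logger log = LoggerFactory.getLogger(TimestampAndWatermarkGenerator.class);

  // Called periodically, at the configured auto-watermark interval.
  @Nullable
  @Override
  public Watermark getCurrentWatermark() {
    long currentTime = System.currentTimeMillis();
    // After more than 10 seconds without events, advance the watermark by 1 second.
    if (firstEventFlag && (currentTime - systemTimeSinceLastModification > 10000)) {
      systemTimeSinceLastModification = currentTime;
      currentMaxTimestamp = currentMaxTimestamp + 1000;
      //log.info("Current Max Time - {}, Last Modification Time - {}", currentMaxTimestamp, systemTimeSinceLastModification );
    }
    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  }

  // Called for every event: tracks the maximum timestamp and the time of the last arrival.
  @Override
  public long extractTimestamp(BAMEvent bamEvent, long l) {
    long timestamp = bamEvent.getTimestamp();
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
    systemTimeSinceLastModification = System.currentTimeMillis();
    firstEventFlag = true;
    //log.info("Current Max Time - {}, Current Event Time - {}", currentMaxTimestamp, systemTimeSinceLastModification);

    return timestamp;
  }
}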

Re: Flink CEP not emitting timed out events properly

Kostas Kloudas
Are you sure that after incrementing the watermark by 1 sec, no
element will arrive with a timestamp smaller than it? Or
that after 10 sec of inactivity, no element will arrive with such a
timestamp?

Kostas

On Jun 20, 2017, at 4:18 PM, Biplob Biswas <[hidden email]> wrote:

 currentMaxTimestamp = currentMaxTimestamp + 1000;


Re: Flink CEP not emitting timed out events properly

Biplob Biswas
I know that there wouldn't be a scenario where the first event type (coming from topic t1) arrives with a timestamp higher than the current watermark. I am still investigating, though, whether the events from the other topics (specifically t3 and t4) arrive after the watermark has been updated.
But in that case I would at least expect timeouts rather than matches. As it is, the number of events I get is not equal to the number of events ingested from t1 (that is the first event in my CEP pattern, so it starts the pattern).

I will try a much longer delay before incrementing the watermark, and in smaller intervals at that. Hopefully I will get the desired results.

I will post an update with whatever I find by tomorrow.

Thanks a lot for the assistance, Kostas. :)

Regards,
Biplob

Re: Flink CEP not emitting timed out events properly

Biplob Biswas
Hi Kostas,

I ended up setting my
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
to
currentMaxTimestamp = Math.min(timestamp, currentMaxTimestamp);

and changing this:
if(firstEventFlag && (currentTime - systemTimeSinceLastModification > 10000)){
      systemTimeSinceLastModification = currentTime;
      currentMaxTimestamp = currentMaxTimestamp + 1000;
      //log.info("Current Max Time - {}, Last Modification Time - {}", currentMaxTimestamp, systemTimeSinceLastModification );
    }

to

if(firstEventFlag && (currentTime - systemTimeSinceLastModification > 20000)){
      systemTimeSinceLastModification = currentTime;
      currentMaxTimestamp = currentMaxTimestamp + 10000;
    }

It is working fine now, in that I get all the events, both with timeouts and with matches. I am afraid, though, that this might not be the best way to do things (I am still investigating what I can change), because switching from max to min can produce watermarks that not only ascend but also descend (that is what I think can happen when an event with a lower timestamp than the current watermark arrives). From everything I have read so far, watermarks should always move forward. I haven't seen any such behaviour so far, but if it happens, what should I expect? My job blowing up, or some undefined behaviour?

Any inputs would be helpful.

BR,
Biplob
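
One way to rule out the descending-watermark concern raised above is to clamp the value before it is emitted. The following is a minimal plain-Java sketch under stated assumptions: MonotonicWatermarkGuard and next are hypothetical names, and the class is meant to wrap whatever candidate value the generator computes. How a regressing watermark is handled downstream is not settled in this thread; the sketch simply avoids producing one.

```java
// Sketch (assumption, not a Flink API): never hand out a watermark that is
// lower than one that was already handed out.
class MonotonicWatermarkGuard {
    private long lastEmitted = Long.MIN_VALUE;

    // Returns the candidate if it moves the watermark forward,
    // otherwise repeats the previously emitted value.
    long next(long candidate) {
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

With such a guard, a generator that tracks the minimum timestamp can still compute a lower candidate internally, but the value it reports stays where it was until the candidate overtakes it again.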