Filter events based on future events

Filter events based on future events

Theo Diefenthal
Hi there, 

I have the following use case:

I get transaction logs from multiple servers. Each server puts its logs into its own Kafka partition, so that within each partition the elements are monotonically ordered by time.

Within the stream of transactions, we have some special events. Let's call them A (roughly 1-10% of all events are of this type).

An A event can have an Anti-A event later on in time. That is an event which has all the same attributes (like username, faculty, ...) but differs in one boolean attribute indicating that it is an anti event, a kind of retraction.

Now I want to emit almost all events downstream (including those that are neither A nor Anti-A; let's simply call them B), preserving the monotonic order of events. There is just one special case in which I want to filter out an element: if the stream has an A event followed by an Anti-A event within one minute, only the Anti-A event shall go downstream, not A itself. But if there is no Anti-A event, A shall be emitted and shall still be in timestamp order.
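To pin down the rule above, here is a small offline reference in Java. It is only a sketch for checking expectations on a finite, already ordered list, not a streaming solution. The `TxEvent` class, its field names, and the `kind` codes are hypothetical, and it assumes that one Anti-A retracts at most one A (the oldest match within the minute), which the description above does not specify.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical event model; the field names are illustrative assumptions. */
final class TxEvent {
    final long timestampMillis;
    final String attributes; // stand-in for username, faculty, ... (all fields except the flag)
    final char kind;         // 'A', 'R' (Anti-A, the retraction) or 'B' (everything else)

    TxEvent(long timestampMillis, String attributes, char kind) {
        this.timestampMillis = timestampMillis;
        this.attributes = attributes;
        this.kind = kind;
    }
}

final class RetractionSpec {
    static final long WINDOW_MILLIS = 60_000L;

    /** Returns the input without every A that is retracted within one minute; order is kept. */
    static List<TxEvent> filter(List<TxEvent> ordered) {
        boolean[] dropped = new boolean[ordered.size()];
        for (int j = 0; j < ordered.size(); j++) {
            TxEvent anti = ordered.get(j);
            if (anti.kind != 'R') {
                continue;
            }
            // Assumption: one Anti-A retracts at most one A, the oldest match in the window.
            for (int i = 0; i < j; i++) {
                TxEvent a = ordered.get(i);
                if (!dropped[i] && a.kind == 'A'
                        && a.attributes.equals(anti.attributes)
                        && anti.timestampMillis - a.timestampMillis <= WINDOW_MILLIS) {
                    dropped[i] = true;
                    break;
                }
            }
        }
        List<TxEvent> out = new ArrayList<>();
        for (int i = 0; i < ordered.size(); i++) {
            if (!dropped[i]) {
                out.add(ordered.get(i));
            }
        }
        return out;
    }
}
```

For example, an A at 0 s followed by its Anti-A at 30 s is dropped, while an A at 40 s whose Anti-A only arrives at 120 s is kept, because the retraction comes more than one minute later.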

I've been racking my brain over this and haven't come up with a proper (performant) solution. It seems obvious that in the end I need to buffer all records for one minute so that order can be preserved. But I have no idea how to implement this efficiently in Flink.

My thoughts thus far:

1. I could give CEP a try. But with CEP I would need to write something like: match all B events in any case, and match A as well, but only if there is no Anti-A. Doesn't that produce a lot of state? And are all B events considered in the breadth-first rule matching, i.e. tons of unnecessary comparisons against A? Any pseudocode on how I could do this with CEP?

2. If I key the data by partition and all other attributes except the retract boolean, so that A and Anti-A always fall into the same keyed stream with no other events in it, I probably get much better comparison capabilities. But how much overhead does that produce? Will Flink reshuffle the data even if the first key stays the same? And can I partition back to my "global" per-partition order? Note that some events have exactly the same event-time timestamp, but I still want them in their original order later on.

3. Could I work with session windows somehow? I would put A and Anti-A in the same session, and when the window emits I would simply not collect the A event if there is an Anti-A. Would that be more or less overhead than CEP?

4. Do you have any other idea on how to approach this? Sadly, I have no way to manipulate the input stream, so that part of the pipeline is fixed.

Best regards
Theo
Re: Filter events based on future events

Fabian Hueske-2
Hi Theo,

I would implement this with a KeyedProcessFunction.
These are the important points to consider:

1) Partition the output of the Kafka source by Kafka partition (or by the attribute that determines the partition). This ensures that the data stays in order (per partition).
2) The KeyedProcessFunction needs state to buffer one minute of data. Which state is most efficient depends on how much data you expect to buffer. If one minute of data easily fits in memory, I'd use the FS state backend, which keeps all state on the JVM heap. You could use a ValueState with an appropriate data structure (Queue, PrioQueue, ...). The data structure would be held as a regular Java object on the heap and hence provide efficient access. If you expect one minute to be too much data to hold in memory, you need to go for the RocksDB state backend. Since this causes de/serialization on every read and write access, it's more difficult to identify an efficient state primitive / access pattern. I won't go into the details here, assuming that the buffered data fits into memory and you can go for the FS state backend. If that's not the case, let me know and I can share some tips on the RocksDB state backend approach. The KeyedProcessFunction would add records to the buffer state when processElement() is called and emit all buffered records whose timestamp is less than the timestamp of the currently added record minus 1 minute.
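The buffering step in 2) could look roughly like the following plain-Java sketch. It deliberately uses no Flink classes so that it runs on its own: `RetractionBuffer.add()` stands in for what processElement() would do for one Kafka partition, and the `ArrayDeque` stands in for the data structure held in a ValueState. `LogEvent`, its fields, and `flush()` are hypothetical names, and the choice to let an Anti-A remove only the oldest matching A is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical event model; the field names are illustrative assumptions. */
final class LogEvent {
    final long timestampMillis;
    final String attributes; // stand-in for username, faculty, ... (all fields except the flag)
    final boolean isA;       // true for A and Anti-A events, false for B events
    final boolean isAnti;    // the boolean retraction flag

    LogEvent(long timestampMillis, String attributes, boolean isA, boolean isAnti) {
        this.timestampMillis = timestampMillis;
        this.attributes = attributes;
        this.isA = isA;
        this.isAnti = isAnti;
    }
}

/** Buffer for one partition; in Flink it would live in keyed state. */
final class RetractionBuffer {
    static final long WINDOW_MILLIS = 60_000L;
    private final Deque<LogEvent> buffer = new ArrayDeque<>();

    /** Stand-in for processElement(): returns the records that can be emitted now, in order. */
    List<LogEvent> add(LogEvent incoming) {
        // 1. Emit everything older than (incoming timestamp - 1 minute).
        List<LogEvent> out = new ArrayList<>();
        while (!buffer.isEmpty()
                && buffer.peekFirst().timestampMillis < incoming.timestampMillis - WINDOW_MILLIS) {
            out.add(buffer.pollFirst());
        }
        // 2. An Anti-A removes the oldest matching A that is still buffered (assumption).
        if (incoming.isA && incoming.isAnti) {
            Iterator<LogEvent> it = buffer.iterator();
            while (it.hasNext()) {
                LogEvent e = it.next();
                if (e.isA && !e.isAnti && e.attributes.equals(incoming.attributes)) {
                    it.remove();
                    break;
                }
            }
        }
        // 3. Buffer the new record; arrival order is kept even for equal timestamps.
        buffer.addLast(incoming);
        return out;
    }

    /** Hypothetical helper: hands out whatever is still buffered, e.g. at end of input. */
    List<LogEvent> flush() {
        List<LogEvent> rest = new ArrayList<>(buffer);
        buffer.clear();
        return rest;
    }
}
```

Because records are only released when a later record arrives, the last minute of a partition stays buffered until the next record shows up; releasing it earlier (for example with a timer) is outside this sketch.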

Note that since the timestamps are monotonically increasing, we do not need watermarks and event time but can rely on the timestamps of the records. Hence, the application won't block if one partition stalls, which provides the same benefits that per-key watermarks would offer (if they were supported by Flink).

Best, Fabian

On Tue, 10 Sept. 2019 at 23:06, [hidden email] <[hidden email]> wrote:
RE: Filter events based on future events

Theo Diefenthal
In reply to this post by Theo Diefenthal

Hi Fabian,

Thanks for sharing your thoughts. I'll give it a try.

Best regards
Theo


From: Fabian Hueske <[hidden email]>
Sent: Wednesday, 11 September 2019 09:55
To: [hidden email]
Cc: user <[hidden email]>
Subject: Re: Filter events based on future events

 
