large sliding window perf question

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

large sliding window perf question

Chen Qin
Hi there,

I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 

pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.

what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.

I also tried to limit window time to mins and all issues are gone.

Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)

Thanks,
Chen


Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Stefan Richter
Hi,

Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink.

Best,
Stefan

Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:

Hi there,

I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 

pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.

what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.

I also tried to limit window time to mins and all issues are gone.

Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)

Thanks,
Chen



Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Carst Tankink

Hi,

 

We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:

 

Technical specs:

-          Flink 1.2.1 on YARN

-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help

 

Pipeline:

-          Read from Kafka, extract ids

-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute

-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.

-          Post-process data, sink.

 

What I observe is:

-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.

-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)

o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.

o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.

 

 

At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.

 

Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)

 

 

Thanks,

Carst

 

From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question

 

Hi,

 

Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink.

 

Best,

Stefan

 

Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:

 

Hi there,

 

I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 

 

pipeline looks simple, 

tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.

 

what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.

 

I also tried to limit window time to mins and all issues are gone.

 

Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)

 

Thanks,

Chen

 

 



Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Stefan Richter
Hi,

both issues sound like the known problem with RocksDB merging state. Please take a look here


and here


Best,
Stefan

 
Am 24.05.2017 um 14:33 schrieb Carst Tankink <[hidden email]>:

Hi,
 
We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:
 
Technical specs:
-          Flink 1.2.1 on YARN
-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help
 
Pipeline:
-          Read from Kafka, extract ids
-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute
-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
-          Post-process data, sink.
 
What I observe is:
-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.
-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)
o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.
o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.
 
 
At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.
 
Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)
 
 
Thanks,
Carst
 
From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi,
 
Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink. 
 
Best,
Stefan
 
Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:
 
Hi there,
 
I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 
 
pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.
 
what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.
 
I also tried to limit window time to mins and all issues are gone.
 
Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)
 
Thanks,
Chen
 
 



Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Aljoscha Krettek
Hi,

I’m afraid you’re running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with 1 minute slide this means each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window side and slide interval.

I’m also afraid there is no solution for this right now so the workaround Chen mentioned is the way to go right now.

Best,
Aljoscha
On 24. May 2017, at 14:07, Stefan Richter <[hidden email]> wrote:

Hi,

both issues sound like the known problem with RocksDB merging state. Please take a look here


and here


Best,
Stefan

 
Am 24.05.2017 um 14:33 schrieb Carst Tankink <[hidden email]>:

Hi,
 
We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:
 
Technical specs:
-          Flink 1.2.1 on YARN
-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help
 
Pipeline:
-          Read from Kafka, extract ids
-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute
-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
-          Post-process data, sink.
 
What I observe is:
-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.
-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)
o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.
o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.
 
 
At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.
 
Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)
 
 
Thanks,
Carst
 
From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi,
 
Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink. 
 
Best,
Stefan
 
Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:
 
Hi there,
 
I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 
 
pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.
 
what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.
 
I also tried to limit window time to mins and all issues are gone.
 
Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)
 
Thanks,
Chen
 
 




Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Carst Tankink

Hi,

 

Thanks Aljoshcha!

To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-) 

I have a good idea on how to proceed next, so I will be trying out writing the custom ProcessFunction next (week).

 

Stefan, in our case we are already on Flink 1.2.1 which should have the patched version of RocksDB, right? Because that patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0) but did not solve this case, which fits the “way too much RocksDB access” explanation better.

 

 

Thanks again,

Carst

 

From: Aljoscha Krettek <[hidden email]>
Date: Wednesday, May 24, 2017 at 16:13
To: Stefan Richter <[hidden email]>
Cc: Carst Tankink <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question

 

Hi,

 

I’m afraid you’re running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with 1 minute slide this means each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window side and slide interval.

 

I’m also afraid there is no solution for this right now so the workaround Chen mentioned is the way to go right now.

 

Best,

Aljoscha

On 24. May 2017, at 14:07, Stefan Richter <[hidden email]> wrote:

 

Hi,

 

both issues sound like the known problem with RocksDB merging state. Please take a look here

 

 

and here

 

 

Best,

Stefan

 

 

Am 24.05.2017 um 14:33 schrieb Carst Tankink <[hidden email]>:

 

Hi,

 

We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:

 

Technical specs:

-          Flink 1.2.1 on YARN

-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help

 

Pipeline:

-          Read from Kafka, extract ids

-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute

-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.

-          Post-process data, sink.

 

What I observe is:

-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.

-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)

o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.

o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.

 

 

At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.

 

Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)

 

 

Thanks,

Carst

 

From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question

 

Hi,

 

Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink. 

 

Best,

Stefan

 

Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:

 

Hi there,

 

I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 

 

pipeline looks simple, 

tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.

 

what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.

 

I also tried to limit window time to mins and all issues are gone.

 

Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)

 

Thanks,

Chen

 

 




 

 

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Aljoscha Krettek
Hi,

Yes Carst, that’s exactly what happens: 240 get+put calls.

Best,
Aljoscha

On 24. May 2017, at 15:49, Carst Tankink <[hidden email]> wrote:

Hi,
 
Thanks Aljoshcha!
To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-) 
I have a good idea on how to proceed next, so I will be trying out writing the custom ProcessFunction next (week).
 
Stefan, in our case we are already on Flink 1.2.1 which should have the patched version of RocksDB, right? Because that patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0) but did not solve this case, which fits the “way too much RocksDB access” explanation better.
 
 
Thanks again,
Carst
 
From: Aljoscha Krettek <[hidden email]>
Date: Wednesday, May 24, 2017 at 16:13
To: Stefan Richter <[hidden email]>
Cc: Carst Tankink <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi, 
 
I’m afraid you’re running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with 1 minute slide this means each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window side and slide interval.
 
I’m also afraid there is no solution for this right now so the workaround Chen mentioned is the way to go right now.
 
Best,
Aljoscha
On 24. May 2017, at 14:07, Stefan Richter <[hidden email]> wrote:
 
Hi, 
 
both issues sound like the known problem with RocksDB merging state. Please take a look here
 
 
and here
 
 
Best,
Stefan
 
 
Am 24.05.2017 um 14:33 schrieb Carst Tankink <[hidden email]>:
 
Hi,
 
We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:
 
Technical specs:
-          Flink 1.2.1 on YARN
-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help
 
Pipeline:
-          Read from Kafka, extract ids
-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute
-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
-          Post-process data, sink.
 
What I observe is:
-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.
-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)
o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.
o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.
 
 
At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.
 
Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)
 
 
Thanks,
Carst
 
From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi,
 
Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink. 
 
Best,
Stefan
 
Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:
 
Hi there,
 
I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 
 
pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.
 
what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.
 
I also tried to limit window time to mins and all issues are gone.
 
Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)
 
Thanks,
Chen
 
 



 
 

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Stefan Richter
In reply to this post by Carst Tankink
Yes Cast, I noticed your version is already 1.2.1, which is why I contacted Aljoscha to take a look here because he knows best about the expected scalability of the sliding window implementation.
 
Am 24.05.2017 um 16:49 schrieb Carst Tankink <[hidden email]>:

Hi,
 
Thanks Aljoshcha!
To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-) 
I have a good idea on how to proceed next, so I will be trying out writing the custom ProcessFunction next (week).
 
Stefan, in our case we are already on Flink 1.2.1 which should have the patched version of RocksDB, right? Because that patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0) but did not solve this case, which fits the “way too much RocksDB access” explanation better.
 
 
Thanks again,
Carst
 
From: Aljoscha Krettek <[hidden email]>
Date: Wednesday, May 24, 2017 at 16:13
To: Stefan Richter <[hidden email]>
Cc: Carst Tankink <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi, 
 
I’m afraid you’re running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with 1 minute slide this means each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window side and slide interval.
 
I’m also afraid there is no solution for this right now so the workaround Chen mentioned is the way to go right now.
 
Best,
Aljoscha
On 24. May 2017, at 14:07, Stefan Richter <[hidden email]> wrote:
 
Hi, 
 
both issues sound like the known problem with RocksDB merging state. Please take a look here
 
 
and here
 
 
Best,
Stefan
 
 
Am 24.05.2017 um 14:33 schrieb Carst Tankink <[hidden email]>:
 
Hi,
 
We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:
 
Technical specs:
-          Flink 1.2.1 on YARN
-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help
 
Pipeline:
-          Read from Kafka, extract ids
-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute
-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
-          Post-process data, sink.
 
What I observe is:
-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.
-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)
o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.
o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.
 
 
At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.
 
Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)
 
 
Thanks,
Carst
 
From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi,
 
Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink. 
 
Best,
Stefan
 
Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:
 
Hi there,
 
I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 
 
pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.
 
what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.
 
I also tried to limit window time to mins and all issues are gone.
 
Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)
 
Thanks,
Chen
 
 



 
 

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Chen Qin
Got it! Looks like 30days window and trigger 10seconds is way too many (quarter million every 10 seconds per key, around 150 keys). 

Just to add some background, I tried three ways to implement this large sliding window pipeline, all share same configuration and use rocksdb statebackend remote to s3
  • out of box sliding window 30days 10s trigger
  • processfunction with list state
  • process function with in memory cache, update valuestate during checkpoint, filter & emits list of events periodically. Value state checkpoint as blob seems complete quickly.
First two options see perf issue, third one so far works fine.

Thanks,
Chen

On Wed, May 24, 2017 at 8:24 AM, Stefan Richter <[hidden email]> wrote:
Yes Cast, I noticed your version is already 1.2.1, which is why I contacted Aljoscha to take a look here because he knows best about the expected scalability of the sliding window implementation.
 
Am 24.05.2017 um 16:49 schrieb Carst Tankink <[hidden email]>:

Hi,
 
Thanks Aljoshcha!
To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-) 
I have a good idea on how to proceed next, so I will be trying out writing the custom ProcessFunction next (week).
 
Stefan, in our case we are already on Flink 1.2.1 which should have the patched version of RocksDB, right? Because that patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0) but did not solve this case, which fits the “way too much RocksDB access” explanation better.
 
 
Thanks again,
Carst
 
From: Aljoscha Krettek <[hidden email]>
Date: Wednesday, May 24, 2017 at 16:13
To: Stefan Richter <[hidden email]>
Cc: Carst Tankink <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi, 
 
I’m afraid you’re running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with 1 minute slide this means each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window side and slide interval.
 
I’m also afraid there is no solution for this right now so the workaround Chen mentioned is the way to go right now.
 
Best,
Aljoscha
On 24. May 2017, at 14:07, Stefan Richter <[hidden email]> wrote:
 
Hi, 
 
both issues sound like the known problem with RocksDB merging state. Please take a look here
 
 
and here
 
 
Best,
Stefan
 
 
Am 24.05.2017 um 14:33 schrieb Carst Tankink <[hidden email]>:
 
Hi,
 
We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:
 
Technical specs:
-          Flink 1.2.1 on YARN
-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help
 
Pipeline:
-          Read from Kafka, extract ids
-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute
-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
-          Post-process data, sink.
 
What I observe is:
-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.
-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)
o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.
o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.
 
 
At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.
 
Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)
 
 
Thanks,
Carst
 
From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi,
 
Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink. 
 
Best,
Stefan
 
Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:
 
Hi there,
 
I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 
 
pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.
 
what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.
 
I also tried to limit window time to mins and all issues are gone.
 
Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)
 
Thanks,
Chen
 
 



 
 


Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Aljoscha Krettek
Hi Chen,

How to you update the ValueState during checkpointing. I’m asking because a keyed state should always be scoped to a key and when checkpointing there is no key scope because we are not processing any incoming element and we’re not firing a timer (the two cases where we have a key scope).

Best,
Aljoscha

On 24. May 2017, at 21:05, Chen Qin <[hidden email]> wrote:

Got it! Looks like 30days window and trigger 10seconds is way too many (quarter million every 10 seconds per key, around 150 keys). 

Just to add some background, I tried three ways to implement this large sliding window pipeline, all share same configuration and use rocksdb statebackend remote to s3
  • out of box sliding window 30days 10s trigger
  • processfunction with list state
  • process function with in memory cache, update valuestate during checkpoint, filter & emits list of events periodically. Value state checkpoint as blob seems complete quickly.
First two options see perf issue, third one so far works fine.

Thanks,
Chen

On Wed, May 24, 2017 at 8:24 AM, Stefan Richter <[hidden email]> wrote:
Yes Cast, I noticed your version is already 1.2.1, which is why I contacted Aljoscha to take a look here because he knows best about the expected scalability of the sliding window implementation.
 
Am 24.05.2017 um 16:49 schrieb Carst Tankink <[hidden email]>:

Hi,
 
Thanks Aljoshcha!
To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-) 
I have a good idea on how to proceed next, so I will be trying out writing the custom ProcessFunction next (week).
 
Stefan, in our case we are already on Flink 1.2.1 which should have the patched version of RocksDB, right? Because that patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0) but did not solve this case, which fits the “way too much RocksDB access” explanation better.
 
 
Thanks again,
Carst
 
From: Aljoscha Krettek <[hidden email]>
Date: Wednesday, May 24, 2017 at 16:13
To: Stefan Richter <[hidden email]>
Cc: Carst Tankink <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi, 
 
I’m afraid you’re running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with 1 minute slide this means each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window side and slide interval.
 
I’m also afraid there is no solution for this right now so the workaround Chen mentioned is the way to go right now.
 
Best,
Aljoscha
On 24. May 2017, at 14:07, Stefan Richter <[hidden email]> wrote:
 
Hi, 
 
both issues sound like the known problem with RocksDB merging state. Please take a look here
 
 
and here
 
 
Best,
Stefan
 
 
Am 24.05.2017 um 14:33 schrieb Carst Tankink <[hidden email]>:
 
Hi,
 
We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:
 
Technical specs:
-          Flink 1.2.1 on YARN
-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help
 
Pipeline:
-          Read from Kafka, extract ids
-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute
-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
-          Post-process data, sink.
 
What I observe is:
-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.
-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)
o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.
o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.
 
 
At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.
 
Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)
 
 
Thanks,
Carst
 
From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi,
 
Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink. 
 
Best,
Stefan
 
Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:
 
Hi there,
 
I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 
 
pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.
 
what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.
 
I also tried to limit window time to mins and all issues are gone.
 
Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)
 
Thanks,
Chen
 
 



 
 



Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Chen Qin
I see, not sure this this hack works. It utilize operator state to hold all <key, states> mapping assigned to that operator instance.

If key by can generate determined mapping between upstream events to fixed operator parallelism, then the operator state could hold mapping between keys  and their states, updates only needed when snapshot triggered.(dump cache to operator state) I don’t use timer in this case, but keep a last emit map (keyed by event key) to track when to flush downstream within processFunction.


Thanks,
Chen


On May 29, 2017, at 2:38 AM, Aljoscha Krettek <[hidden email]> wrote:

Hi Chen,

How to you update the ValueState during checkpointing. I’m asking because a keyed state should always be scoped to a key and when checkpointing there is no key scope because we are not processing any incoming element and we’re not firing a timer (the two cases where we have a key scope).

Best,
Aljoscha

On 24. May 2017, at 21:05, Chen Qin <[hidden email]> wrote:

Got it! Looks like 30days window and trigger 10seconds is way too many (quarter million every 10 seconds per key, around 150 keys). 

Just to add some background, I tried three ways to implement this large sliding window pipeline, all share same configuration and use rocksdb statebackend remote to s3
  • out of box sliding window 30days 10s trigger
  • processfunction with list state
  • process function with in memory cache, update valuestate during checkpoint, filter & emits list of events periodically. Value state checkpoint as blob seems complete quickly.
First two options see perf issue, third one so far works fine.

Thanks,
Chen

On Wed, May 24, 2017 at 8:24 AM, Stefan Richter <[hidden email]> wrote:
Yes Cast, I noticed your version is already 1.2.1, which is why I contacted Aljoscha to take a look here because he knows best about the expected scalability of the sliding window implementation.
 
Am 24.05.2017 um 16:49 schrieb Carst Tankink <[hidden email]>:

Hi,
 
Thanks Aljoshcha!
To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-) 
I have a good idea on how to proceed next, so I will be trying out writing the custom ProcessFunction next (week).
 
Stefan, in our case we are already on Flink 1.2.1 which should have the patched version of RocksDB, right? Because that patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0) but did not solve this case, which fits the “way too much RocksDB access” explanation better.
 
 
Thanks again,
Carst
 
From: Aljoscha Krettek <[hidden email]>
Date: Wednesday, May 24, 2017 at 16:13
To: Stefan Richter <[hidden email]>
Cc: Carst Tankink <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi, 
 
I’m afraid you’re running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with 1 minute slide this means each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window side and slide interval.
 
I’m also afraid there is no solution for this right now so the workaround Chen mentioned is the way to go right now.
 
Best,
Aljoscha
On 24. May 2017, at 14:07, Stefan Richter <[hidden email]> wrote:
 
Hi, 
 
both issues sound like the known problem with RocksDB merging state. Please take a look here
 
 
and here
 
 
Best,
Stefan
 
 
Am 24.05.2017 um 14:33 schrieb Carst Tankink <[hidden email]>:
 
Hi,
 
We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:
 
Technical specs:
-          Flink 1.2.1 on YARN
-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help
 
Pipeline:
-          Read from Kafka, extract ids
-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute
-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
-          Post-process data, sink.
 
What I observe is:
-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.
-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)
o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.
o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.
 
 
At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.
 
Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)
 
 
Thanks,
Carst
 
From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi,
 
Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink. 
 
Best,
Stefan
 
Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:
 
Hi there,
 
I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 
 
pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.
 
what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.
 
I also tried to limit window time to mins and all issues are gone.
 
Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)
 
Thanks,
Chen
 
 



 
 




Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: large sliding window perf question

Chen Qin
B.T.W It might be better off to pre aggregation via slidingWindow with controlled bucket size and batch update as well as retention.

Thanks,
Chen

On May 29, 2017, at 3:05 PM, Chen Qin <[hidden email]> wrote:

I see, not sure this this hack works. It utilize operator state to hold all <key, states> mapping assigned to that operator instance.

If key by can generate determined mapping between upstream events to fixed operator parallelism, then the operator state could hold mapping between keys  and their states, updates only needed when snapshot triggered.(dump cache to operator state) I don’t use timer in this case, but keep a last emit map (keyed by event key) to track when to flush downstream within processFunction.


Thanks,
Chen


On May 29, 2017, at 2:38 AM, Aljoscha Krettek <[hidden email]> wrote:

Hi Chen,

How to you update the ValueState during checkpointing. I’m asking because a keyed state should always be scoped to a key and when checkpointing there is no key scope because we are not processing any incoming element and we’re not firing a timer (the two cases where we have a key scope).

Best,
Aljoscha

On 24. May 2017, at 21:05, Chen Qin <[hidden email]> wrote:

Got it! Looks like 30days window and trigger 10seconds is way too many (quarter million every 10 seconds per key, around 150 keys). 

Just to add some background, I tried three ways to implement this large sliding window pipeline, all share same configuration and use rocksdb statebackend remote to s3
  • out of box sliding window 30days 10s trigger
  • processfunction with list state
  • process function with in memory cache, update valuestate during checkpoint, filter & emits list of events periodically. Value state checkpoint as blob seems complete quickly.
First two options see perf issue, third one so far works fine.

Thanks,
Chen

On Wed, May 24, 2017 at 8:24 AM, Stefan Richter <[hidden email]> wrote:
Yes Cast, I noticed your version is already 1.2.1, which is why I contacted Aljoscha to take a look here because he knows best about the expected scalability of the sliding window implementation.
 
Am 24.05.2017 um 16:49 schrieb Carst Tankink <[hidden email]>:

Hi,
 
Thanks Aljoshcha!
To complete my understanding: the problem here is that each element in the sliding window(s) basically triggers 240 get+put calls instead of just 1, right? I can see how that blows up :-) 
I have a good idea on how to proceed next, so I will be trying out writing the custom ProcessFunction next (week).
 
Stefan, in our case we are already on Flink 1.2.1 which should have the patched version of RocksDB, right? Because that patch did solve an issue we had in a different Flink job (a Kafka Source -> HDFS/Bucketing Sink which was stalling quite often under Flink 1.2.0) but did not solve this case, which fits the “way too much RocksDB access” explanation better.
 
 
Thanks again,
Carst
 
From: Aljoscha Krettek <[hidden email]>
Date: Wednesday, May 24, 2017 at 16:13
To: Stefan Richter <[hidden email]>
Cc: Carst Tankink <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi, 
 
I’m afraid you’re running into a general shortcoming of the current sliding windows implementation: every sliding window is treated as its own window that has window contents and trigger state/timers. For example, if you have a sliding window of size 4 hours with 1 minute slide this means each element is in 240 windows and you basically amplify writing to RocksDB by 240. This gets out of hand very quickly with larger differences between window side and slide interval.
 
I’m also afraid there is no solution for this right now so the workaround Chen mentioned is the way to go right now.
 
Best,
Aljoscha
On 24. May 2017, at 14:07, Stefan Richter <[hidden email]> wrote:
 
Hi, 
 
both issues sound like the known problem with RocksDB merging state. Please take a look here
 
 
and here
 
 
Best,
Stefan
 
 
Am 24.05.2017 um 14:33 schrieb Carst Tankink <[hidden email]>:
 
Hi,
 
We are seeing a similar behaviour for large sliding windows. Let me put some details here and see if they match up enough with Chen’s:
 
Technical specs:
-          Flink 1.2.1 on YARN
-          RocksDB backend, on HDFS. I’ve set the backend to PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM since our Hadoop cluster runs on spinning disks but that doesn’t seem to help
 
Pipeline:
-          Read from Kafka, extract ids
-          KeyBy id,  count occurences of each id using a fold. The window size of this operator is 10 minutes with a slide of 1 minute
-          KeyBy id (again),  compute mean, standard deviation using a fold. The window size of this operator is 4 hours with a slide of 1 minute.
-          Post-process data, sink.
 
What I observe is:
-          With a heap-based backend, the job runs really quick  (couple of minutes to process 7 days of Kafka data) but eventually goes OOM with a GC overhead exceeded error.
-          With the RocksDB backend, checkpoints get stuck most of the time, and the “count occurences” step gets a lot of back pressure from the next operator (on the large window)
o    In those cases the checkpoint does succeed, the state for the large window is around 500-700MB, others states are within the KBs.
o    Also in those cases, all time seems to be spent in the ‘alignment’ phase for a single subtask of the count operator, with the other operators aligning within milliseconds. The checkpoint duration itself is no more than 2seconds even for the larger states.
 
 
At this point, I’m a bit at a loss to figure out what’s going on. My best guess is it has to do with the state access to the RocksDBFoldingState, but why this so slow is beyond me.
 
Hope this info helps in figuring out what is going on, and hopefully it is actually related to Chen’s case :)
 
 
Thanks,
Carst
 
From: Stefan Richter <[hidden email]>
Date: Tuesday, May 23, 2017 at 21:35
To: "[hidden email]" <[hidden email]>
Subject: Re: large sliding window perf question
 
Hi,
 
Which state backend and Flink version are you using? There was a problem with large merging states on RocksDB, caused by some inefficiencies in the merge operator of RocksDB. We provide a custom patch for this with all newer versions of Flink. 
 
Best,
Stefan
 
Am 23.05.2017 um 21:24 schrieb Chen Qin <[hidden email]>:
 
Hi there,
 
I have seen some weird perf issue while running event time based job with large sliding window (24 hours offset every 10s) 
 
pipeline looks simple, 
tail kafka topic and assign timestamp and watermark, forward to large sliding window (30days) and fire every 10 seconds and print out.
 
what I have seen first hand was checkpointing stuck, took longer than timeout despite traffic volume is low ~300 TPS. Looking deeper, it seems back pressure kick in and window operator consumes message really slowly and throttle sources.
 
I also tried to limit window time to mins and all issues are gone.
 
Any suggestion on this. My work around is I implemented processFunction and keep big value state, periodically evaluate and emit downstream (emulate what sliding window does)
 
Thanks,
Chen
 
 



 
 





Loading...