I am a new Flink user, currently using Flink 1.2.1. I am trying to
understand how checkpoints actually work while a window operator is processing.
My pipeline has the following flow, where each operator's parallelism is 1:
source -> flatmap -> tumbling window -> sink
In this pipeline, I had configured the window to be evaluated every 1 hour
(3600 seconds) and the checkpoint interval was 5 mins. The checkpoint
timeout was set to 1 hour as I wanted the checkpoints to complete.
In my window function, the job makes an HTTPS call to another service, so the
window function may take some time to evaluate/process all events.
I see that the time spent on the checkpoint was actually just 5 ms and 8 ms
(checkpoint duration sync) for the window and sink operators. However, the
End to End Duration of the checkpoint was 11m 12s for both the window and sink operators.
Is this expected behavior? If yes, do you have any suggestion to reduce the
end to end checkpoint duration?
Please let me know if any more information is needed.
Checkpoint duration sync is only the time taken for the "synchronous" part of taking a snapshot of your operator. Your 11m probably comes from the fact that, before this snapshot, the checkpoint barrier was stuck somewhere in your pipeline for that amount of time while some record or bunch of records was being processed.
If you write a simple function that only performs `Thread.sleep(new Random().nextInt(3600000))` and nothing else, your checkpoints will take a random amount of time, since snapshots cannot be taken while your function is also executing some code. You can read about some of those concepts in the documentation.
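To make the point above concrete, here is a toy model in plain Java. It is not Flink code and does not use any Flink API; the class `BarrierDelayModel`, its `Element` type, and the millisecond costs are all made up for illustration. It only assumes what is described above: a single task thread handles records and barriers strictly in order, so a barrier cannot be acted on until the user code for everything ahead of it has returned.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model (not Flink code): one task thread handles input strictly in order. */
public class BarrierDelayModel {

    /** An input element: either a record with a processing cost, or a checkpoint barrier. */
    public static final class Element {
        final boolean isBarrier;
        final long processingMillis; // hypothetical cost of the user function for this record

        private Element(boolean isBarrier, long processingMillis) {
            this.isBarrier = isBarrier;
            this.processingMillis = processingMillis;
        }

        public static Element record(long processingMillis) {
            return new Element(false, processingMillis);
        }

        public static Element barrier() {
            return new Element(true, 0);
        }
    }

    /**
     * Returns how long the first barrier waits behind earlier records
     * before the snapshot can start, or -1 if the queue has no barrier.
     */
    public static long barrierDelayMillis(Queue<Element> input) {
        long clock = 0;
        for (Element e : input) {
            if (e.isBarrier) {
                return clock; // everything before the barrier has been processed
            }
            clock += e.processingMillis; // user code runs; the barrier keeps waiting
        }
        return -1;
    }

    public static void main(String[] args) {
        Queue<Element> input = new ArrayDeque<>();
        input.add(Element.record(200)); // e.g. a slow external call
        input.add(Element.record(500));
        input.add(Element.barrier());
        input.add(Element.record(100)); // arrives after the barrier, does not delay it

        long syncSnapshotMillis = 5; // the short "sync" part of the snapshot
        long delay = barrierDelayMillis(input);
        System.out.println("barrier waited " + delay + " ms; end-to-end at least "
                + (delay + syncSnapshotMillis) + " ms");
    }
}
```

Running `main` prints `barrier waited 700 ms; end-to-end at least 705 ms`: the synchronous part stays at a few milliseconds, while the end-to-end duration is dominated by the time the barrier spent waiting.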
Thank you for your response Piotr. I plan to upgrade to Flink 1.5.x early next year.
Two follow-up questions for now.
" When operator snapshots are taken, there are two parts: the synchronous
and the asynchronous parts. "
I understand that when the operator snapshot is being taken, the processing
of that operator is stopped as taking this snapshot is synchronous part. Is
there any other synchronous part in the snapshot / checkpoint process?
Based on the test I mentioned above, my understanding is that for a window
operator, when all events that belong to checkpoint N and the checkpoint
barrier N are received by window operator (but pending for window to be
triggered), then checkpoint barrier N will be immediately emitted to the
sink operator (so snapshot can be completed) while the events are still
pending to be evaluated by window operator.
> On 6 Nov 2018, at 18:22, PranjalChauhan <[hidden email]> wrote:
> Thank you for your response Piotr. I plan to upgrade to Flink 1.5.x early
> next year.
> Two follow-up questions for now.
> " When operator snapshots are taken, there are two parts: the synchronous
> and the asynchronous parts. "
> I understand that when the operator snapshot is being taken, the processing
> of that operator is stopped as taking this snapshot is synchronous part. Is
> there any other synchronous part in the snapshot / checkpoint process?
Not as far as I know.
> Based on the test I mentioned above, my understanding is that for a window
> operator, when all events that belong to checkpoint N and the checkpoint
> barrier N are received by window operator (but pending for window to be
> triggered), then checkpoint barrier N will be immediately emitted to the
> sink operator (so snapshot can be completed) while the events are still
> pending to be evaluated by window operator.
> Can you please confirm my understanding as I was initially confused by the
> following second statement (emits all pending outgoing records) under
> Barriers section in this doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html ?
> "When an intermediate operator has received a barrier for snapshot n from
> all of its input streams, it emits itself a barrier for snapshot n into all
> of its outgoing streams."
> " Once the last stream has received barrier n, the operator emits all
> pending outgoing records, and then emits snapshot n barriers itself. "
I think you might be mixing up two different concepts: watermarks and checkpoint barriers. The documentation that you are quoting describes the checkpointing mechanism, checkpoint barriers, and record alignment. Checkpoint barriers do not cause any results to be emitted from the WindowOperator; that happens when timers are triggered (wall-clock timers in the case of processing time, or watermarks in the case of event time).
Do you also observe such long checkpoint times without performing
external calls? If not, I guess the communication to the external system
Maybe you have to rethink how you perform such calls in order to make
the pipeline more robust against these latencies. Flink also offers an
async operator for exactly such cases; this could be worth a look.
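As a rough illustration of why asynchronous calls help, here is a plain-Java sketch using `CompletableFuture`. It is not Flink's async operator and does not use the Flink API (in Flink the corresponding entry point is `AsyncDataStream`, where results are handed back through a callback). The class `AsyncCallSketch`, the method `callExternalService`, and the 100 ms latency are hypothetical stand-ins for the HTTPS call mentioned in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Plain-Java sketch of overlapping slow external calls (not Flink's async operator). */
public class AsyncCallSketch {

    /** Hypothetical stand-in for the HTTPS call; sleeps to simulate latency. */
    public static String callExternalService(String event) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return event + ":ok";
    }

    /** Starts all calls first, then collects the results in input order. */
    public static List<String> enrichAll(List<String> events, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String event : events) {
            // Each call runs on the pool, so the calls overlap instead of queueing up.
            futures.add(CompletableFuture.supplyAsync(() -> callExternalService(event), pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            long start = System.nanoTime();
            List<String> results = enrichAll(Arrays.asList("a", "b", "c", "d"), pool);
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
            System.out.println(results + " in about " + elapsedMillis + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

With four pool threads, the four simulated calls run concurrently, so the total wait is roughly one call's latency rather than four. In this sketch the caller still blocks in `join()`; the point is only that the time during which the processing thread is occupied, and a barrier would be held up, shrinks when calls are overlapped.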