Flink restoring a job from a checkpoint

Vishwas Siravara
Hi guys,
I have a Flink streaming job which streams from a Kafka source. There is no state in the job, just a simple filter, map, and write to a Kafka sink. Suppose I stop my job and then submit it again to the cluster with the same consumer group: will the job restore automatically from the last successful checkpoint, since that corresponds to the last offset committed to Kafka?

Thanks,
Vishwas 

Re: Flink restoring a job from a checkpoint

Yun Tang
Hi Vishwas

If you did not configure org.apache.flink.streaming.connectors.kafka.config.StartupMode, it defaults to GROUP_OFFSETS, which means "Start from committed offsets in ZK / Kafka brokers of a specific consumer group". You also need to enable checkpointing so that the Kafka offsets are committed when a checkpoint completes.

In other words, even if you do not resume from a checkpoint, as long as checkpointing was enabled in the previous job and the startup mode is GROUP_OFFSETS, the new job resumes from the last committed offset, provided the previous checkpoint completed [1][2]. However, this is not really recommended; it is better to resume from the last checkpoint [3].
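
To make the offset behaviour described above concrete, here is a small self-contained Python model (it is not Flink code). It assumes a single partition, a checkpoint that completes after every `checkpoint_every` records, and that the offset committed to Kafka is the position as of the last completed checkpoint. The function and variable names are made up for illustration.

```python
def run_until_cancel(start_offset, cancel_at, checkpoint_every):
    """Consume offsets [start_offset, cancel_at) and return
    (processed_offsets, committed_offset).

    committed_offset is the next offset to read as of the last completed
    checkpoint; it stays at start_offset if no checkpoint completed.
    """
    processed = []
    committed = start_offset
    for offset in range(start_offset, cancel_at):
        processed.append(offset)
        # Assumption: a checkpoint completes after every `checkpoint_every`
        # records, and only then is the position committed back to Kafka.
        if (offset + 1 - start_offset) % checkpoint_every == 0:
            committed = offset + 1
    return processed, committed


# First run: cancelled after reading offsets 0..24, checkpoint every 10 records.
first_run, committed = run_until_cancel(0, 25, 10)

# Second run: no -s given, so the source starts from the committed group offset.
second_run, _ = run_until_cancel(committed, 40, 10)

replayed = sorted(set(first_run) & set(second_run))
print("committed offset after first run:", committed)
print("offsets processed twice:", replayed)
```

In this model the restarted job re-reads the offsets that were processed after the last completed checkpoint, so a plain restart from group offsets can send some records to the sink twice.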

Best
Yun Tang

Re: Flink restoring a job from a checkpoint

Vishwas Siravara
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSETS. Here is the code snippet:
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
I have also enabled checkpointing and externalized the checkpoints to S3.
Why is it not recommended to just restart the job after I cancel it, as long as the topology does not change? What is the advantage of explicitly restoring from the last checkpoint by passing the -s option to the flink command line if it does the same thing? For instance, if s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/ is my last successful checkpoint, what is the difference between 1 and 2?
1. /usr/mware/flink/bin/flink run -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
2. /usr/mware/flink/bin/flink run -s s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/ -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 4 -cp qa_streaming
Thanks,
Vishwas

Re: Flink restoring a job from a checkpoint

Congxian Qiu
Hi Vishwas

Currently, Flink can only restore from a retained checkpoint or a savepoint when its path is passed with the `-s` parameter [1][2]; otherwise, it starts from scratch.

```
checkpoint --> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint  --> bin/flink run -s :savepointPath [:runArgs]
```

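As a rough sketch of how the `-s` form above might be scripted, the following Python helper picks the highest-numbered `chk-<n>` directory from a listing and builds the corresponding `flink run -s` argument list. It assumes every listed directory is a completed, retained checkpoint. The `chk-1348` and `chk-1349` entries and both function names are hypothetical; only the `chk-1350` path, the jar name and the `-s`/`-d` options come from the thread.

```python
import re


def latest_checkpoint(paths):
    """Return the path with the highest chk-<n> number, or None if none match."""
    best, best_n = None, -1
    for path in paths:
        match = re.search(r"/chk-(\d+)/?$", path)
        if match and int(match.group(1)) > best_n:
            best, best_n = path, int(match.group(1))
    return best


def restore_command(flink_bin, checkpoint_path, jar, program_args):
    """Build the argv for `flink run -s <path> -d <jar> <program args>`."""
    return [flink_bin, "run", "-s", checkpoint_path, "-d", jar, *program_args]


base = "s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd"
# Hypothetical listing of retained checkpoint directories for one job.
listing = [f"{base}/chk-1348/", f"{base}/chk-1350/", f"{base}/chk-1349/"]

cmd = restore_command("bin/flink", latest_checkpoint(listing),
                      "flink-job-assembly.jar", ["flink", "druid"])
print(" ".join(cmd))
```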


Re: Flink restoring a job from a checkpoint

Yun Tang
Hi Vishwas

This is because Flink's checkpoint mechanism gives you more capabilities. You can resume from the offsets stored in a specific checkpoint instead of the last committed offsets, not to mention that you also benefit from restoring the last timer state, operator state, and keyed state.

Best
Yun Tang

Re: Flink restoring a job from a checkpoint

Vishwas Siravara
In reply to this post by Congxian Qiu
Hi Congxian,
Thanks for getting back to me. Why would the stream start from scratch if my consumer group does not change? I start from the group offsets: env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
So when I restart the job, it should consume from the last offset committed to Kafka, shouldn't it? Let me know what you think.

Best,
Vishwas

Re: Flink restoring a job from a checkpoint

Theo Diefenthal
In reply to this post by Vishwas Siravara
Hi Vishwas,

With "from scratch", Congxian means that Flink won't load any state automatically and starts as if there were no state. Of course, if the Kafka consumer group already exists and you have configured Flink to start from the group offsets when there is no state yet, it will start from the group offsets.

I think your approach is totally fine. Skipping savepoints and not retaining checkpoints saves overhead and configuration burden, and it works nicely as long as you don't have any state in your pipeline.

You should, however, make sure that nobody on your team adds something stateful later on and forgets to think about the missing state...
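
The "missing state" caveat can be illustrated with a toy Python model (not Flink code). It assumes a hypothetical stateful operator that counts records per key and a checkpoint that stores both the source offset and those counts. All names are invented for the example.

```python
from collections import Counter


def run(records, start_offset, state=None):
    """Count keys from start_offset onward and return the operator state."""
    counts = Counter(state or {})
    for key in records[start_offset:]:
        counts[key] += 1
    return counts


records = ["a", "b", "a", "a", "b", "a"]

# First run processes offsets 0..3; the checkpoint stores offset 4 and the counts.
checkpoint_offset = 4
checkpoint_state = run(records[:checkpoint_offset], 0)

# Restart from group offsets only: the position is right, but the counts are gone.
without_state = run(records, checkpoint_offset)

# Restart with -s: both the position and the counts come from the checkpoint.
with_state = run(records, checkpoint_offset, checkpoint_state)

print("group offsets only:", dict(without_state))
print("restored from checkpoint:", dict(with_state))
```

For a stateless filter and map pipeline both restarts behave the same in this model; the difference only appears once an operator keeps state.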

Best regards
Theo

Re: Flink restoring a job from a checkpoint

Congxian Qiu
Hi Vishwas

Sorry for the confusion; what Theo said above is what I meant. My earlier statement was from Flink's point of view: if we do not restore from a checkpoint or savepoint, none of the TaskManagers (TMs) has any state, so the job starts from scratch.

Best,
Congxian

Re: Flink restoring a job from a checkpoint

Yun Tang
Hi Vishwas

Imagine this scenario: your last committed offset is A, with a matching savepoint-A. You then stop the job and try out new program logic as an experiment, for example printing the output instead of writing to the previous sink. The experimental job might commit offset-B to Kafka. Once the experiment is verified, you still need to resume from Kafka offset-A to ensure that all data is written to the target sink. This is easier if you just restore the job from savepoint-A.

In other words, Flink already provides a stronger and more flexible mechanism for resuming from Kafka offsets, so why not use it?
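
The scenario can be traced with a short Python model (not Flink code). It assumes one partition and that both jobs share the same consumer group, so the experimental job overwrites the committed group offset. The names `run_job`, `target_sink` and `debug_output` are invented for the sketch.

```python
def run_job(records, start_offset, stop_offset, sink):
    """Write records[start_offset:stop_offset] to sink; return the offset to commit."""
    sink.extend(records[start_offset:stop_offset])
    return stop_offset


records = [f"r{i}" for i in range(10)]
target_sink, debug_output = [], []

# Production job reads offsets 0..3, takes savepoint-A, then is stopped.
group_offset = run_job(records, 0, 4, target_sink)
savepoint_a = {"offset": group_offset}

# Experimental job (same consumer group) prints offsets 4..6 and commits offset-B.
group_offset = run_job(records, group_offset, 7, debug_output)

# Option 1: restart production from group offsets, which now point at offset-B.
sink_from_group = list(target_sink)
run_job(records, group_offset, len(records), sink_from_group)

# Option 2: restore production from savepoint-A, which still points at offset-A.
sink_from_savepoint = list(target_sink)
run_job(records, savepoint_a["offset"], len(records), sink_from_savepoint)

missing = [r for r in records if r not in sink_from_group]
print("missing after group-offset restart:", missing)
print("complete after savepoint restore:", sink_from_savepoint == records)
```

In this model the records consumed by the experiment never reach the target sink after a group-offset restart, while the savepoint restore writes all of them.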

Best
Yun Tang

Re: Flink restoring a job from a checkpoint

Flavio Pompermaier
Sorry for the dumb question, but suppose I do not use retained checkpoints and my job has processed billions of messages from Kafka. Then a problematic message causes my job to fail. Am I able to take a savepoint, fix the job, and restart from the problematic message (i.e. the last "valid" Kafka offset)?

Il Gio 10 Ott 2019, 20:01 Yun Tang <[hidden email]> ha scritto:
Hi Vishwas

Image this scenario, if your last committed offset is A with a savepoint-A and then you just stop this job and try a new program logical such as print your output instead of writing to previous sink to do some experiments. The new experimental job might commit offset-B to kafka. Once verified, and then you still need to resume from kafka offset-A to ensure all data has been written to target sink. This would be easier If you just restore the job from savepoint-A.

In other words, Flink has already provided a more strong and flexible mechanism to resume kafka offsets, why not use this?

Best
Yun Tang

From: Congxian Qiu <[hidden email]>
Sent: Thursday, October 10, 2019 11:52
To: [hidden email] <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Flink restoring a job from a checkpoint
 
Hi Vishwas

Sorry for the confusing, what Theo said previous is the meaning I want to say.  Previously, what I said is from Flink's side, if we do not restore from checkpoint/savepoint, all the TMs will have no state, so the Job starts from scratch.

Best,
Congxian


[hidden email] <[hidden email]> 于2019年10月10日周四 上午1:15写道:
Hi Vishaws, 

With "from scratch", Congxian means that Flink won't load any state automatically and starts as if there was no state. Of course if the kafka consumer group already exists and you have configured Flink to start from group offsets if there is no state yet, it will start from the group offsets. 

I think your approach is totally fine. Ignoring savepoints and don't retaining checkpoints saves overhead and configuration burdens and works nicely as long as you don't have any state in your pipeline. 

You should however be certain that nobody in your team will add something with state later on and forgets to think about the missing state... 

Best regards
Theo




-------- Ursprüngliche Nachricht --------
Betreff: Re: Flink restoring a job from a checkpoint
Von: Vishwas Siravara
An: Congxian Qiu
Cc: Yun Tang ,user

Hi Congxian,
Thanks for getting back. Why would the streaming start from scratch if my consumer group does not change ? I start from the group offsets : env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
So when I restart the job it should consume from the last committed offset to kafka isn't it ? Let me know what you think . 

Best,
Vishwas
On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu <[hidden email]> wrote:
Hi Vishwas

Currently, Flink can only restore retained checkpoint or savepoint with parameter `-s`[1][2], otherwise, it will start from scratch.

```
checkpoint    --->     bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint --> bin/flink run -s :savepointPath [:runArgs]


Vishwas Siravara <[hidden email]> 于2019年10月9日周三 上午5:07写道:
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSET . Here is the code snippet : 
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
I have also enabled and externalized checkpointing to S3 . 
Why is it not recommended to just restart the job once I cancel it, as long as the topology does not change? What is the advantage of explicitly restoring from last checkpoint by passing the -s option to the flink command line if it does the same thing? For instance if s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/ is my last successful checkpoint, what is the difference between 1 and 2. 
1. /usr/mware/flink/bin/flink run -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
2. /usr/mware/flink/bin/flink run -s s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/ -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 4 -cp qa_streaming
Thanks,
Vishwas 


Re: Flink restoring a job from a checkpoint

Yun Tang
A checkpoint can only complete while your job has not failed. Because checkpoint barriers are injected into the stream together with the messages, a problematic message that causes your job to fail also prevents any checkpoint from completing after that message has been processed. In other words, you can always resume your job from a Kafka offset before the problematic message.
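
To make that argument concrete, here is a minimal sketch in plain Java with no Flink dependency. It assumes a hypothetical job that completes a checkpoint after every `checkpointEvery` records (real Flink checkpoints are triggered by time, not by record count) and that fails when it reaches the record at `poisonOffset`. The class and method names are made up for illustration only.

```java
/** Toy model: which offset does the last completed checkpoint hold when a poison record kills the job? */
final class CheckpointBarrierSketch {

    /**
     * Returns the next-offset-to-read stored in the last completed checkpoint,
     * or -1 if the job failed before any checkpoint completed.
     * Assumes poisonOffset >= 0 and checkpointEvery > 0.
     */
    static long lastCheckpointedOffset(long poisonOffset, long checkpointEvery) {
        long lastCheckpoint = -1;
        for (long offset = 0; offset < poisonOffset; offset++) {
            // The record at `offset` is processed successfully here.
            if ((offset + 1) % checkpointEvery == 0) {
                // A barrier follows this record: the checkpoint stores the next offset to read.
                lastCheckpoint = offset + 1;
            }
        }
        // The record at poisonOffset fails the job, so no later checkpoint can complete.
        return lastCheckpoint;
    }

    public static void main(String[] args) {
        // Poison record at offset 1234, checkpoint every 100 records.
        System.out.println(lastCheckpointedOffset(1234, 100));
    }
}
```

In this model the stored offset is never past the poison record, so a restart from the last checkpoint re-reads the bad message rather than skipping it.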

Best
Yun Tang

From: Flavio Pompermaier <[hidden email]>
Sent: Friday, October 11, 2019 5:50
To: Yun Tang <[hidden email]>
Cc: Congxian Qiu <[hidden email]>; [hidden email] <[hidden email]>; user <[hidden email]>
Subject: Re: Flink restoring a job from a checkpoint
 
Sorry for the dumb question, but suppose I do not use retained checkpoints and my job has processed billions of messages from Kafka. Then a problematic message causes my job to fail. Am I able to take a savepoint, fix the job, and restart from the problematic message (i.e. the last "valid" Kafka offset)?

On Thu, Oct 10, 2019, 20:01 Yun Tang <[hidden email]> wrote:
Hi Vishwas

Imagine this scenario: your last committed offset is A, with a savepoint-A, and then you stop this job and try some new program logic, such as printing your output instead of writing to the previous sink, to do some experiments. The new experimental job might commit offset-B to Kafka. Once you have verified the change, you still need to resume from Kafka offset-A to ensure all data is written to the target sink. This is easier if you just restore the job from savepoint-A.

In other words, Flink already provides a stronger and more flexible mechanism for resuming from Kafka offsets, so why not use it?
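
The scenario can be sketched as a toy model in plain Java (no Flink or Kafka client involved). It assumes that the committed group offset is one shared, mutable value per consumer group, while a savepoint is a frozen copy taken at a point in time. The group id `my-group`, the offsets, and all class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: committed group offsets can be overwritten, a savepoint's offset cannot. */
final class OffsetScenarioSketch {

    // Stand-in for the offsets Kafka keeps per consumer group (group id -> committed offset).
    private final Map<String, Long> committed = new HashMap<>();

    void commit(String groupId, long offset) {
        committed.put(groupId, offset);
    }

    /** Offset a new job would start from when it relies on group offsets only. */
    long startFromGroupOffsets(String groupId) {
        return committed.get(groupId);
    }

    public static void main(String[] args) {
        OffsetScenarioSketch kafka = new OffsetScenarioSketch();

        kafka.commit("my-group", 1000L);                              // production job commits offset A
        long savepointA = kafka.startFromGroupOffsets("my-group");    // savepoint-A freezes offset A

        kafka.commit("my-group", 1500L);                              // experimental job commits offset B

        System.out.println("group offsets: " + kafka.startFromGroupOffsets("my-group"));
        System.out.println("savepoint-A:   " + savepointA);
    }
}
```

After the experiment, the group offset has moved to B, so only the savepoint still remembers A.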

Best
Yun Tang

From: Congxian Qiu <[hidden email]>
Sent: Thursday, October 10, 2019 11:52
To: [hidden email] <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Flink restoring a job from a checkpoint
 
Hi Vishwas

Sorry for the confusion; what Theo said earlier is what I meant. What I said previously was from Flink's side: if we do not restore from a checkpoint/savepoint, all the TMs will have no state, so the job starts from scratch.

Best,
Congxian


[hidden email] <[hidden email]> wrote on Thu, Oct 10, 2019 at 1:15 AM:
Hi Vishwas,

With "from scratch", Congxian means that Flink won't load any state automatically and starts as if there was no state. Of course, if the Kafka consumer group already exists and you have configured Flink to start from group offsets when there is no state yet, it will start from the group offsets.

I think your approach is totally fine. Ignoring savepoints and not retaining checkpoints saves overhead and configuration burden, and works nicely as long as you don't have any state in your pipeline.

You should, however, be certain that nobody on your team adds something with state later on and forgets to think about the missing state...
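
The start-position behaviour described above can be sketched as a small decision function in plain Java. This is a simplified model, not the connector's actual code: it assumes that offsets restored from a checkpoint or savepoint (via `-s`) take precedence, that committed group offsets are used only when no state was restored, and that a fallback position (in practice governed by the consumer's `auto.offset.reset` setting) applies when the group has no committed offset. All names are hypothetical.

```java
import java.util.OptionalLong;

/** Toy model of how the start offset of one partition is chosen on job submission. */
final class StartPositionSketch {

    static long resolve(OptionalLong restoredOffset,
                        OptionalLong committedGroupOffset,
                        long resetFallbackOffset) {
        if (restoredOffset.isPresent()) {
            // Job was submitted with -s: the offset stored in the checkpoint/savepoint wins.
            return restoredOffset.getAsLong();
        }
        if (committedGroupOffset.isPresent()) {
            // "From scratch" in Flink terms, but the consumer group still has a committed offset.
            return committedGroupOffset.getAsLong();
        }
        // No state and no committed offset: fall back to the reset position.
        return resetFallbackOffset;
    }

    public static void main(String[] args) {
        // Restart without -s: the committed group offset is used.
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.of(2000L), 0L));
        // Restart with -s: the restored offset is used, whatever the group offset says.
        System.out.println(resolve(OptionalLong.of(1350L), OptionalLong.of(2000L), 0L));
    }
}
```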

Best regards
Theo





Re: Flink restoring a job from a checkpoint

Flavio Pompermaier
If I understood correctly, you're saying that in this case I'd need to reprocess all messages from scratch (unless I retain my checkpoints), right?
Could it be a good strategy to schedule savepoints periodically to avoid such situations? Is there any smarter solution to this?



Re: Flink restoring a job from a checkpoint

Congxian Qiu
I don't think scheduling savepoints periodically is better than periodic checkpoints (which Flink provides out of the box).

1. Savepoints and checkpoints share the same code path, except that a savepoint always takes a full snapshot while a checkpoint can take an incremental snapshot. If a checkpoint cannot complete, a savepoint cannot complete either.

2. Checkpoints are already periodic in Flink.

3. A savepoint is always a full snapshot, which means it may be slow; a checkpoint can be incremental, and an incremental checkpoint is much faster than a savepoint.

Best,
Congxian




Re: Flink restoring a job from a checkpoint

Yun Tang
Hi Flavio

If you had not triggered any savepoint before hitting this problem, first make sure your checkpoints are retained [1]. Once your job fails because of a problematic message, cancel the job and modify it so that it no longer fails when it meets that message again. Then submit the modified job and resume from your last checkpoint [2]. This is the general solution for dealing with an unrecoverable problem.
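
One possible way to "modify the job so that it no longer fails" is to catch the exception around the fragile step and set the bad record aside instead of letting it propagate. The sketch below shows the idea in plain Java, outside of any Flink operator. Parsing an integer stands in for whatever processing fails in the real job, and the class, field, and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model: process records one by one and collect poison records instead of failing. */
final class TolerantParserSketch {

    static final class Result {
        final List<Integer> parsed = new ArrayList<>();
        final List<String> rejected = new ArrayList<>();
    }

    static Result parseAll(List<String> messages) {
        Result result = new Result();
        for (String message : messages) {
            try {
                // Stand-in for the processing step that fails on a bad message.
                result.parsed.add(Integer.parseInt(message.trim()));
            } catch (NumberFormatException e) {
                // Keep the bad record for later inspection rather than failing the whole job.
                result.rejected.add(message);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result result = parseAll(Arrays.asList("1", "oops", "3"));
        System.out.println("parsed=" + result.parsed + " rejected=" + result.rejected);
    }
}
```

With a change of this kind in place, the job resumed from the last retained checkpoint can read past the offending offset.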



Best
Yun Tang
