Are timers in ProcessFunction fault tolerant?

Moiz Jinia
With a checkpointed RocksDB-based state backend, can I expect registered processing-time timers to be fault tolerant (along with the managed keyed state)?

Example:
A task manager instance owns the key k1 (from a keyed stream), which has registered a processing-time timer with a timestamp one day in the future. If this instance is killed and the key is moved to another instance, will onTimer trigger correctly on the other machine at the expected time, with the same keyed state (for k1)?

Thanks,
Moiz

Re: Are timers in ProcessFunction fault tolerant?

Eron Wright
Yes, registered timers are stored in managed keyed state and should be fault-tolerant. 

-Eron

In reply to Moiz S Jinia's message of Thu, May 25, 2017 at 9:28 AM.

Re: Are timers in ProcessFunction fault tolerant?

Tzu-Li (Gordon) Tai
Hi Moiz!

Adding a bit more detail here:
Yes, the timer will be restored on whichever new instance is responsible for that key.
There is one “gotcha” to look out for, though: the firing time of a timer is absolute. If the checkpointed timer’s processing-time firing timestamp is t (basically the registration time plus the configured trigger delay), then it will also fire at processing timestamp t on the new instance. You should therefore be aware of out-of-sync clocks between the two instances.

Another thing to note is that if the job isn’t running at t (when the timer is supposed to fire), then the timer fires immediately on restore.

Cheers,
Gordon
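
The two points in Gordon's reply can be illustrated with a small toy model in plain Java. This is only a sketch under stated assumptions: the class TimerRestoreSketch and its methods register, snapshot, and advanceTo are hypothetical names invented for illustration, not Flink APIs, and the model ignores everything except absolute timestamps. It shows that a restored timer keeps its absolute firing time, and that a timer whose time has already passed fires as soon as the restored instance's clock is consulted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of a processing-time timer service. Hypothetical names; NOT Flink's implementation.
final class TimerRestoreSketch {
    // Absolute firing timestamp (ms) -> keys that have a timer at that timestamp.
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    // Registers a timer for `key` at the absolute processing time `fireAt`.
    void register(String key, long fireAt) {
        timers.computeIfAbsent(fireAt, t -> new ArrayList<>()).add(key);
    }

    // Models a checkpoint/restore cycle: the absolute timestamps are copied unchanged.
    TimerRestoreSketch snapshot() {
        TimerRestoreSketch copy = new TimerRestoreSketch();
        timers.forEach((t, keys) -> copy.timers.put(t, new ArrayList<>(keys)));
        return copy;
    }

    // Removes and returns every key whose timer timestamp is <= the local clock value.
    List<String> advanceTo(long localNow) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= localNow) {
            fired.addAll(timers.pollFirstEntry().getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        long dayMs = 24L * 60 * 60 * 1000;
        TimerRestoreSketch original = new TimerRestoreSketch();
        original.register("k1", dayMs); // registered at t = 0, due at t = 1 day

        // Restored before the deadline: the timer still fires at the same absolute time.
        TimerRestoreSketch restoredEarly = original.snapshot();
        System.out.println(restoredEarly.advanceTo(dayMs - 1)); // prints []
        System.out.println(restoredEarly.advanceTo(dayMs));     // prints [k1]

        // Restored one hour after the deadline: the overdue timer fires right away.
        TimerRestoreSketch restoredLate = original.snapshot();
        System.out.println(restoredLate.advanceTo(dayMs + 3_600_000L)); // prints [k1]
    }
}
```

Because advanceTo compares against whatever the local clock reports, an instance whose clock runs ahead of the original one would fire the timer correspondingly earlier in wall-clock terms, which is the clock-skew caveat mentioned above.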

In reply to Eron Wright's message of 26 May 2017 at 12:44:00 AM.

Re: Are timers in ProcessFunction fault tolerant?

Moiz Jinia
In reply to this post by Eron Wright
Awesome. Thanks.

Re: Are timers in ProcessFunction fault tolerant?

Moiz Jinia
A follow-up question: since the registered timers are part of the managed keyed state, do the timers get cancelled when I call state.clear()?

Moiz

In reply to Moiz S Jinia's message of Thu, May 25, 2017 at 10:20 PM.

Re: Are timers in ProcessFunction fault tolerant?

Kostas Kloudas
Hi Moiz,

state.clear() refers to the state that you have registered in your job using getState() from the RuntimeContext, so calling it does not cancel timers.

Timers are managed by Flink’s timer service, and Flink itself cleans them up when the job terminates.

Kostas
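
The separation Kostas describes can be pictured with a toy model in plain Java. This is a sketch under stated assumptions: ClearVersusTimersSketch and all of its methods are hypothetical names made up for illustration and do not correspond to Flink classes. It models user state and timers as two independent per-key structures, so clearing the state leaves the pending timer in place. The null check in onTimer is one possible way to make a leftover timer harmless; it is an assumption of this sketch, not something prescribed in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Toy model of per-key user state and per-key timers. Hypothetical names; NOT Flink classes.
final class ClearVersusTimersSketch {
    private final Map<String, Long> valueState = new HashMap<>();      // key -> user state
    private final Map<String, TreeSet<Long>> timers = new HashMap<>(); // key -> timer timestamps

    void update(String key, long value) {
        valueState.put(key, value);
    }

    void registerTimer(String key, long fireAt) {
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(fireAt);
    }

    // Models state.clear(): only the user state for the key is dropped.
    void clearState(String key) {
        valueState.remove(key);
    }

    int pendingTimers(String key) {
        return timers.getOrDefault(key, new TreeSet<>()).size();
    }

    // Models an onTimer callback that tolerates state which was cleared earlier.
    String onTimer(String key, long timestamp) {
        timers.getOrDefault(key, new TreeSet<>()).remove(timestamp);
        Long value = valueState.get(key);
        return value == null ? "ignored" : "emit " + value;
    }

    public static void main(String[] args) {
        ClearVersusTimersSketch sketch = new ClearVersusTimersSketch();
        sketch.update("k1", 42L);
        sketch.registerTimer("k1", 1000L);

        sketch.clearState("k1");
        System.out.println(sketch.pendingTimers("k1"));  // prints 1: the timer survives the clear
        System.out.println(sketch.onTimer("k1", 1000L)); // prints ignored
    }
}
```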

In reply to Moiz S Jinia's message of May 26, 2017, at 6:41 AM.

Re: Are timers in ProcessFunction fault tolerant?

Moiz Jinia
Thanks, Kostas. So even though the timer state is managed separately from the keyed state (obtained from the RuntimeContext), can I safely assume that both are fault tolerant and remain associated with the key of the stream?

In reply to Kostas Kloudas's message of Fri, May 26, 2017 at 1:51 PM.

Re: Are timers in ProcessFunction fault tolerant?

Kostas Kloudas
Yes, that is correct.

Kostas

In reply to Moiz S Jinia's message of May 26, 2017, at 11:05 AM.