Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

F.Amara
Hi all,

I'm working with Flink 1.2.0, Kafka 0.10.0.1 and Hadoop 2.7.3.

I have a Flink Highly Available cluster that reads data written to Kafka by a Kafka producer and processes it within the cluster. To introduce failures, I randomly kill a Task Manager. A restart strategy is configured, and the cluster resumes processing after a slight delay, as expected.
But when I check the output after the final processing is done, I see duplicates: when sending 4200 events with a 40 ms delay between them, I observed 56 duplicates. As mentioned in [1], I have configured ExternalizedCheckpoints but still observe duplicates.

Even when I tested with manual savepoints (cancelling and restarting the job), 2 or 3 duplicates appeared!

Can someone explain how I can use the savepoint created through ExternalizedCheckpoints to make the application resume processing exactly from where it left off? I need the application to read the savepoint details and recover from that point automatically, rather than my doing it manually.
Or are regular savepoints capable of providing the same functionality automatically?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html 

Thanks,
Amara

Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

Tzu-Li (Gordon) Tai
Hi Amara,

Could you give a bit more detail about your job? How are you producing the 4200 events into Kafka? Is that a separate process from the consuming job?
Do note that sending data to a Kafka topic currently has only at-least-once delivery guarantees, so if you’re sending the data to the Kafka topic as part of the failing job, there will likely be duplicates in the topic.

Also, how are you verifying exactly-once? As I explained, producing to Kafka has only at-least-once delivery guarantees, so checking for topic-to-topic duplicates is currently not a valid way to verify this. To verify it properly, I would suggest having a stateful operator that counts the number of events it has processed, and registering that count as Flink managed state. That count should always be 4200 after cancelling and restarting the job from savepoints.

How you’re creating the savepoint, and whether or not it’s externalized, is irrelevant here; the exactly-once state guarantees should still hold.
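
A minimal sketch of why a counter kept in checkpointed state stays exact while emitted output does not. This is a plain-Java simulation, not Flink API code: the class ReplaySimulation, the checkpoint interval of 100 events and the failure position 4156 are all assumptions chosen for illustration (the failure position is picked so that 56 events are replayed, echoing the number reported above; it is not derived from the actual job).

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Flink dependency) of checkpoint, failure and replay. */
final class ReplaySimulation {

    /** Outcome of one simulated run. */
    static final class Result {
        final long count;            // counter kept in (simulated) checkpointed state
        final List<Integer> emitted; // every record the operator wrote to the outside world

        Result(long count, List<Integer> emitted) {
            this.count = count;
            this.emitted = emitted;
        }
    }

    /**
     * Processes events 0..totalEvents-1. A snapshot of (offset, count) is taken
     * every checkpointInterval events. The job fails once, just before the event
     * at position failAt is processed, and then restarts from the last snapshot.
     */
    static Result run(int totalEvents, int checkpointInterval, int failAt) {
        List<Integer> emitted = new ArrayList<>();
        long count = 0;
        int offset = 0;
        long checkpointedCount = 0;
        int checkpointedOffset = 0;
        boolean failed = false;

        while (offset < totalEvents) {
            if (!failed && offset == failAt) {
                // Simulated failure: state and source offset roll back together.
                failed = true;
                count = checkpointedCount;
                offset = checkpointedOffset;
                continue;
            }
            emitted.add(offset); // side effect: NOT rolled back on failure
            count++;             // state: rolled back on failure
            offset++;
            if (offset % checkpointInterval == 0) {
                checkpointedCount = count;
                checkpointedOffset = offset;
            }
        }
        return new Result(count, emitted);
    }

    public static void main(String[] args) {
        Result r = run(4200, 100, 4156);
        System.out.println("count in state  = " + r.count);          // 4200
        System.out.println("records emitted = " + r.emitted.size()); // 4256
    }
}
```

The counter ends at exactly the number of events sent, because it is restored together with the source offset. The emitted list is longer, because records written between the last snapshot and the failure are written again during replay.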

Cheers,
Gordon

On 24 May 2017 at 11:05:49 PM, F.Amara wrote:
> [...]

Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

F.Amara
Hi Gordon,

Thanks a lot for the reply.
The events are produced using a KafkaProducer, submitted to a topic, and then consumed by the Flink application using a FlinkKafkaConsumer. I verified that the KafkaProducer was not interrupted during failure recovery of the Flink application, so the data source did not send duplicated values. I observed the output from the FlinkKafkaConsumer and noticed duplicates from that point onwards. Is the FlinkKafkaConsumer capable of introducing duplicates?

How can I implement exactly-once processing for my application? Could you please guide me on what I might have missed?

Thanks,
Amara

Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

rmetzger0
Hi Amara,
How are you validating whether or not you have duplicates in your output?

If you are just writing the output to another Kafka topic or printing it to standard out, you'll see duplicates even if exactly-once works.
Flink does not provide exactly-once delivery. Flink has exactly-once semantics for registered state.

This means you need to cooperate with the system to achieve exactly-once. For example, for files, you need to remove invalid data from previous failed checkpoints. Our bucketing sink does that.
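
To illustrate what "removing invalid data" can look like for a file, here is a rough plain-Java sketch. It is not the bucketing sink's implementation: the class TruncatingFileSink and its method names are hypothetical, and an in-memory list stands in for the file. The idea shown is only that the sink remembers the file length at each checkpoint and cuts the file back to that length on recovery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a file sink that rolls back to the last checkpointed length. */
final class TruncatingFileSink {
    private final List<String> file = new ArrayList<>(); // stands in for the output file
    private int validLength = 0;                          // length covered by the last checkpoint

    void write(String line) {
        file.add(line);
    }

    /** On a completed checkpoint, everything written so far becomes valid. */
    void onCheckpoint() {
        validLength = file.size();
    }

    /** On recovery, drop whatever was written after the last checkpoint. */
    void onRestore() {
        file.subList(validLength, file.size()).clear();
    }

    List<String> contents() {
        return new ArrayList<>(file);
    }

    /** Writes a, b; checkpoints; writes c, d; fails; replays c, d, e; checkpoints. */
    static List<String> demo() {
        TruncatingFileSink sink = new TruncatingFileSink();
        for (String s : Arrays.asList("a", "b")) sink.write(s);
        sink.onCheckpoint();
        for (String s : Arrays.asList("c", "d")) sink.write(s); // not covered by a checkpoint
        sink.onRestore();                                        // simulated failure and recovery
        for (String s : Arrays.asList("c", "d", "e")) sink.write(s); // replay from the checkpoint
        sink.onCheckpoint();
        return sink.contents();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, c, d, e]
    }
}
```

Without the onRestore() step, the file would contain c and d twice, which is the kind of duplicate a plain file or standard-out writer shows after a restart.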


On Tue, May 30, 2017 at 9:01 AM, F.Amara wrote:
> [...]

Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

F.Amara
Hi Robert,

Thanks a lot for the reply.

To explain further how I verify the presence of duplicates: I write the stream received by the FlinkKafkaConsumer (after it was sent by the KafkaProducer) to a CSV file.
I then scan the file to check whether the number of events received exactly matches the number sent by the KafkaProducer, and look for values that appear more than once, which indicate duplicates.
In my case, the total number of events received is always higher than the number sent.
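
A check of this kind could be sketched as follows. The class DuplicateCheck is hypothetical, and the sketch assumes that each line of the CSV file holds one event and that the whole line identifies the event uniquely; the actual file layout is not shown in this thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that reports values occurring more than once in the output file. */
final class DuplicateCheck {

    /** Maps each repeated line to the number of extra times it occurs (occurrences minus one). */
    static Map<String, Integer> extraOccurrences(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            counts.merge(line, 1, Integer::sum);
        }
        Map<String, Integer> extras = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                extras.put(e.getKey(), e.getValue() - 1);
            }
        }
        return extras;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java DuplicateCheck <output.csv>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        Map<String, Integer> extras = extraOccurrences(lines);
        int duplicates = 0;
        for (int n : extras.values()) {
            duplicates += n;
        }
        System.out.println("events received: " + lines.size());
        System.out.println("duplicates:      " + duplicates);
    }
}
```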

The following diagram explains the procedure.

|---------------------------|      |---------|      |------------------------|
|       KafkaProducer       |----->|  Kafka  |----->|   FlinkKafkaConsumer   |
| (A separate Java process  |      |         |      |   (Starting point of   |
|  which generates data     |      |         |      |   Flink application)   |
|  and writes to Kafka)     |      |         |      |                        |
|---------------------------|      |---------|      |------------------------|


Thanks,
Amara

Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

F.Amara
Hi Robert,

I have a few more questions to clarify.

1) Why do you say that printing the values to standard out would show duplicates even if exactly-once works? Could you briefly explain the reason?

2) I observed duplicates (by writing to a file) starting from the FlinkKafkaConsumer onwards. Why does this component introduce duplicates? Is it because Kafka currently guarantees only at-least-once delivery?

Thanks,
Amara

Re: Duplicated data when using Externalized Checkpoints in a Flink Highly Available cluster

Nico Kruber
Hi Amara,
please refer to [1] for some details about our checkpointing mechanism. In
short, for your situation:

* checkpoints are made at certain checkpoint barriers
* in between those barriers, processing continues and so do outputs
* in case of a failure, the state at the latest checkpoint is restored
* processing then restarts from there and you will see the same outputs
again

You do not seem to write to Kafka but only consume from it and write to a CSV
file. If this operation were transactional, you would commit only at each
checkpoint barrier and never see the "duplicate", i.e. uncommitted, events.
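
The commit-at-barrier idea can be sketched in plain Java as below. This is a simplified illustration, not a Flink sink: the class TransactionalSinkSketch and its method names are hypothetical, checkpoints are taken every 5 records, and one failure is injected before record 7. It also ignores what happens if the failure occurs between taking a checkpoint and committing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sink that exposes records only once a checkpoint has covered them. */
final class TransactionalSinkSketch {
    private final List<Integer> committed = new ArrayList<>(); // visible to downstream readers
    private final List<Integer> pending = new ArrayList<>();   // written since the last checkpoint

    void write(int record) {
        pending.add(record);
    }

    /** At a checkpoint barrier, make everything pending visible. */
    void commitOnCheckpoint() {
        committed.addAll(pending);
        pending.clear();
    }

    /** On failure, uncommitted records are thrown away; they will be replayed. */
    void abortOnFailure() {
        pending.clear();
    }

    List<Integer> committed() {
        return new ArrayList<>(committed);
    }

    /** Processes records 0..9 with a checkpoint every 5 records and one failure before record 7. */
    static List<Integer> demo() {
        TransactionalSinkSketch sink = new TransactionalSinkSketch();
        int offset = 0;
        int checkpointedOffset = 0;
        boolean failed = false;
        while (offset < 10) {
            if (!failed && offset == 7) {
                failed = true;
                sink.abortOnFailure();       // records 5 and 6 were never committed
                offset = checkpointedOffset; // source rewinds to the last checkpoint
                continue;
            }
            sink.write(offset);
            offset++;
            if (offset % 5 == 0) {
                sink.commitOnCheckpoint();
                checkpointedOffset = offset;
            }
        }
        return sink.committed();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Records 5 and 6 are processed twice, but a reader of the committed output sees each record once, at the cost of output only becoming visible at checkpoints.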

Regards,
Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html

On Monday, 5 June 2017 08:55:05 CEST F.Amara wrote:
> [...]