Fink: KafkaProducer Data Loss


Re: Fink: KafkaProducer Data Loss

ninad
I tried to build Flink with Cloudera Hadoop (CDH 5.8.3) and test it, but the cluster was unable to come up. When I ran the yarn-session script, it exited with an error along the lines of "Can't get the url for job manager".

I didn't spend much time figuring out what was wrong, and went straight to Flink 1.3.0. I ran several tests, but saw data loss just once; it's very hard to reproduce with this version. With 1.2.1, it was pretty easy to reproduce.

Attaching the logs again, although in this run I had the Kafka logs on DEBUG, so the files are pretty big.

jmV3.log tmOneV3.log tmTwoV3.log 


Re: Fink: KafkaProducer Data Loss

ninad
In reply to this post by Tzu-Li (Gordon) Tai
I built Flink v1.3.0 with cloudera hadoop and am able to see the data loss.

Here are the details:

tmOneCloudera583.log

Received session window task:
2017-06-08 15:10:46,131 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) -> Sink: sink.http.sep (1/1) (3ef2f947096f3b0986b8ad334c7d5d3c) switched from CREATED to DEPLOYING.

Finished checkpoint 2 (Synchronous part)
2017-06-08 15:15:51,982 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) -> Sink: sink.http.sep (1/1) - finished synchronous part of checkpoint 2.Alignment duration: 0 ms, snapshot duration 215 ms


The task failed before completion of the checkpoint could be verified, i.e., I don't see the log line saying "Notification of complete checkpoint for task TriggerWindow" for checkpoint 2.

jmCloudera583.log

Job Manager received acks for checkpoint 2

2017-06-08 15:15:51,898 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 2 from task 3e980e4d3278ef0d0f5e0fa9591cc53d of job 3f5aef5e15a23bce627c05c94760fb16.
2017-06-08 15:15:51,982 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received acknowledge message for checkpoint 2 from task 3ef2f947096f3b0986b8ad334c7d5d3c of job 3f5aef5e15a23bce627c05c94760fb16.

Job Manager tried to restore from checkpoint 2.

2017-06-08 15:16:02,111 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
2017-06-08 15:16:02,111 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Trying to retrieve checkpoint 2.
2017-06-08 15:16:02,122 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring from latest valid checkpoint: Checkpoint 2 @ 1496934766105 for 3f5aef5e15a23bce627c05c94760fb16.


tmTwoCloudera583.log

Task Manager tried to restore the data and was successful.

2017-06-08 15:16:02,258 DEBUG org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Restoring snapshot from state handles: [KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, offsets=[13460, 13476, 13492, 13508, 13524, 13540, 13556, 13572, 13588, 13604, 13620, 13636, 13652, 13668, 13684, 14566, 14582, 14598, 14614, 14630, 14646, 14662, 14678, 14694, 14710, 14726, 14742, 14758, 14774, 14790, 14806, 14822, 14838, 14854, 14870, 14886, 14902, 14918, 14934, 14950, 14966, 14982, 14998, 15014, 15030, 15046, 15062, 15078, 15094, 15110, 15126, 15142, 15158, 15174, 15190, 15206, 16088, 16104, 16120, 16136, 28818, 28834, 28850, 28866, 28882, 28898, 28914, 28930, 28946, 28962, 28978, 28994, 29010, 29026, 29042, 29058, 29074, 29090, 29106, 29122, 29138, 29154, 29170, 40346, 40362, 40378, 40394, 40410, 40426, 40442, 40458, 40474, 40490, 40506, 40522, 40538, 41420, 41436, 41452, 41468, 41484, 41500, 41516, 41532, 41548, 41564, 41580, 41596, 41612, 41628, 41644, 41660, 41676, 41692, 41708, 41724, 41740, 41756, 41772, 41788, 41804, 41820, 41836, 41852, 41868, 41884, 41900, 41916]}, data=File State: hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/3f5aef5e15a23bce627c05c94760fb16/chk-2/aa076014-120b-4955-ab46-89094358ebeb [41932 bytes]}].

But apparently the restored state didn't have all the messages the window had received: a few messages were not replayed, and the Kafka sink didn't receive all the messages.
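For what it's worth, the recovery contract being tested here can be written down as a toy model (plain Python, not Flink code; all names are illustrative): a checkpoint stores the source offset together with the window state snapshotted at that offset, so after restoring checkpoint 2 every message from that offset onward should be replayed and nothing should be missing.

```python
# Toy model of the expected checkpoint/restore contract. Not Flink code;
# class and method names are illustrative assumptions.

class Job:
    def __init__(self, messages):
        self.messages = messages     # the "Kafka topic"
        self.offset = 0              # next offset to read
        self.window_state = []       # buffered (un-fired) window elements
        self.checkpoint = None       # (offset, window_state) pair

    def process(self, n):
        for _ in range(n):
            self.window_state.append(self.messages[self.offset])
            self.offset += 1

    def snapshot(self):
        # synchronous part: offset and window contents are written together
        self.checkpoint = (self.offset, list(self.window_state))

    def crash_and_restore(self):
        # tasks fail; state comes back from the last completed checkpoint
        self.offset, self.window_state = self.checkpoint[0], list(self.checkpoint[1])

    def drain(self):
        # replay the rest of the topic, then fire the window into the sink
        self.process(len(self.messages) - self.offset)
        return list(self.window_state)

job = Job(list(range(10)))
job.process(6)
job.snapshot()            # "checkpoint 2": offset 6 plus 6 buffered elements
job.process(3)            # make progress past the checkpoint, then...
job.crash_and_restore()
assert job.drain() == list(range(10))   # nothing may be missing after recovery
```

If the restored window state really contained everything snapshotted at the checkpointed offset, the sink would see all messages; the logs above suggest that invariant was violated somewhere.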

Attaching the files here.

jmCloudera583.log tmOneCloudera583.log tmTwoCloudera583.log 

Thanks for your patience guys.

Re: Fink: KafkaProducer Data Loss

Tzu-Li (Gordon) Tai
Hi Ninad,

Thanks for the logs!
Just to let you know, I’ll continue to investigate this early next week.

Cheers,
Gordon


Re: Fink: KafkaProducer Data Loss

ninad
Thanks Gordon.


Re: Fink: KafkaProducer Data Loss

Aljoscha Krettek
Hi Ninad,

I discussed a bit with Gordon and we have some follow-up questions and some theories as to what could be happening.

Regarding your first case (the one where data loss is observed): How do you ensure that you only shut down the brokers once Flink has read all the data that you expect it to read? And, how do you ensure that the offset that Flink checkpoints in step 3) is the offset that corresponds to the end of your test data?

Regarding your second case (the one where data loss is not observed): What is the difference between steps 3) and 5)? I’m asking because in Flink, session windows are merged eagerly when events arrive, not when the Trigger fires. Also, what does “firing” in step 3) mean as opposed to “being evaluated” in step 5)? If a Trigger fires, this should mean that the window is in fact being evaluated.

Best,
Aljoscha


Re: Fink: KafkaProducer Data Loss

ninad
Hi Aljoscha,

I gather you guys aren't able to reproduce this.

Here are the answers to your questions:

How do you ensure that you only shut down the brokers once Flink has read all the data that you expect it to read?

Ninad: I am able to see the number of messages received on the Flink Job UI.

And, how do you ensure that the offset that Flink checkpoints in step 3) is the offset that corresponds to the end of your test data?

Ninad: I haven't explicitly verified which offsets were checkpointed. When I say that a checkpoint was successful, I am going by the Flink logs: Flink says that my last successful checkpoint was, say, #7, and on recovery it restores its state from checkpoint #7.


What is the difference between steps 3) and 5)?

Ninad: I didn't realize that windows are merged eagerly. I have a session window with a 30-second gap. Once I see from the UI that all the messages have been received, I don't see the following log for about 30 seconds:

MergingWindowSet  - Merging...

So that's why I thought that the windows are merged once the window trigger is fired.

Ex:

I verified from the UI that all messages were received.

I then see this checkpoint in the logs:
2017-06-01 20:21:49,012 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Notification of complete checkpoint for task TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1)


I then see the windows being merged after a few seconds:

2017-06-01 20:22:14,300 DEBUG org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet  - Merging [TimeWindow{start=1496348534287, end=1496348564287}, TimeWindow{start=1496348534300, end=1496348564300}] into TimeWindow{start=1496348534287, end=1496348564300}


So, point 3 is referring to these "MergingWindowSet - Merging ..." logs, and point 5 is referring to the data in the windows being evaluated.

Hope this helps. Thanks.
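Session windows in Flink merge eagerly: each incoming event opens a window [t, t + gap) and any overlapping windows are collapsed immediately on arrival, not when the trigger later fires. A small sketch of that merging logic (plain Python, not Flink code; the 30 000 ms gap and the timestamps are taken from the logs above):

```python
# Eager session-window merging, sketched in plain Python. Each event opens
# a per-event window [ts, ts + GAP_MS); overlapping windows collapse into
# one covering span as soon as the event arrives.

GAP_MS = 30_000

def add_event(windows, ts):
    """Merge the new per-event window [ts, ts+gap) into the existing set."""
    start, end = ts, ts + GAP_MS
    merged = []
    for w in windows:
        if w[0] < end and start < w[1]:        # overlapping sessions collapse
            start, end = min(start, w[0]), max(end, w[1])
        else:
            merged.append(w)
    merged.append((start, end))
    return sorted(merged)

windows = []
for ts in (1496348534287, 1496348534300):      # timestamps from the log line above
    windows = add_event(windows, ts)

# Matches the MergingWindowSet log line:
# Merging [{...534287, ...564287}, {...534300, ...564300}] into {...534287, ...564300}
assert windows == [(1496348534287, 1496348564300)]
```

The merge itself happens per event; the "Merging ..." log lines appearing later in ninad's run would then reflect events that arrived later, not the trigger firing.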

Re: Fink: KafkaProducer Data Loss

ninad
Hey guys, any update on this? If needed I can attach our code.

Re: Fink: KafkaProducer Data Loss

Piotr Nowojski
Hi,

I’m not sure how relevant this is, but recently I found and fixed a bug that under certain conditions was causing data loss for all of the FlinkKafkaProducers in Flink:


Namely, on checkpoint the “flush” method was not being called. It should be fixed in the Flink 1.3.2 and 1.4 releases.

Piotrek
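The failure mode Piotrek describes can be sketched as a toy model (plain Python, not the actual FlinkKafkaProducer code; all names are illustrative): records buffered inside the Kafka client must be flushed as part of the checkpoint, otherwise the checkpoint completes while the records exist only in the client buffer, and a crash loses them.

```python
# Toy model of "flush not called on checkpoint". Illustrative names only;
# this is not the FlinkKafkaProducer implementation.

class BufferedProducer:
    def __init__(self):
        self.buffer = []     # records handed to the client, not yet sent
        self.broker = []     # records the broker has durably received

    def send(self, record):
        self.buffer.append(record)

    def flush(self):
        self.broker.extend(self.buffer)
        self.buffer.clear()

def checkpoint(producer, flush_on_checkpoint):
    if flush_on_checkpoint:
        producer.flush()     # the fix: drain the buffer before acknowledging
    # the checkpoint is acknowledged here; the source offset moves past
    # these records, so anything still in producer.buffer is unprotected

def run(flush_on_checkpoint):
    p = BufferedProducer()
    for r in range(5):
        p.send(r)
    checkpoint(p, flush_on_checkpoint)
    p.buffer.clear()         # simulate the crash: in-flight records are lost
    return p.broker

assert run(flush_on_checkpoint=True) == [0, 1, 2, 3, 4]   # nothing lost
assert run(flush_on_checkpoint=False) == []               # buffered records gone
```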


Re: Fink: KafkaProducer Data Loss

Tzu-Li (Gordon) Tai
Hi Ninad & Piotr,

AFAIK, when this issue was reported, Ninad was using the 0.9 connector (FlinkKafkaProducer09).
FLINK-6996 only affects FlinkKafkaProducer010, so I don’t think that’s the cause here.

@Ninad
Code to reproduce this would definitely be helpful here, thanks. If you prefer to provide that privately, that would also be fine.

Cheers,
Gordon


Re: Fink: KafkaProducer Data Loss

Piotr Nowojski
Oops, sorry, I forgot that this issue was relevant to FlinkKafkaProducer010 only.

Piotrek


Re: Fink: KafkaProducer Data Loss

ninad
In reply to this post by Tzu-Li (Gordon) Tai
Hi Gordon, I was able to reproduce the data loss on a standalone Flink cluster also. I have attached a stripped-down version of our code here:

Environment:
Flink standalone 1.3.0
Kafka 0.9

What the code is doing:
-Consume messages from a Kafka topic ('event.filter.topic' property in application.properties)
-Group them by key
-Analyze the events in a window and filter some messages
-Send the remaining messages to a Kafka topic ('sep.http.topic' property in application.properties)

To build:
./gradlew clean assemble

The jar needs the path to an 'application.properties' file to run.

Important properties in application.properties:
window.session.interval.sec
kafka.brokers
event.filter.topic --> source topic
sep.http.topic --> destination topic
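For reference, a minimal application.properties along these lines might look like the following (all values are illustrative assumptions; only the property names come from the list above):

```properties
# Session window gap, in seconds
window.session.interval.sec=30
# Comma-separated Kafka broker list (illustrative host/port)
kafka.brokers=broker1:9092,broker2:9092
# Source and destination topics (illustrative topic names)
event.filter.topic=event.filter.in
sep.http.topic=sep.http.out
```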

To test:
-Use the 'EventGenerator' class to publish messages to the source Kafka topic.
        The data published won't be filtered by the logic: if you publish 10 messages to the source topic,
        those 10 messages should be sent to the destination topic.

-Once we see that Flink has received all the messages, bring down all Kafka brokers.

-Let the Flink job fail 2-3 times.

-Restart the Kafka brokers.

Note: Data loss isn't observed on every run; roughly 1 in 4 times or so.

Thanks for all your help.

eventFilter.zip






Re: Fink: KafkaProducer Data Loss

Tzu-Li (Gordon) Tai
Hi!

Thanks a lot for providing this.
I'll try to find some time this week to look into this using your example code.

Cheers,
Gordon
