Checkpoints very slow with high backpressure


Yassine MARZOUGUI
Hi all,

I have a streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x -> x) and keeping a state per key indicating whether it has been seen)
3 - schedule some future actions on the stream using a ProcessFunction and processing-time timers (elements are kept in a MapState)
4 - write results back to HDFS using a BucketingSink.

I am using the RocksDBStateBackend and Flink 1.3-SNAPSHOT (commit 9fb074c).

Currently the source contains just one file of 1 GB, so that's the maximum state the job might hold. I noticed that the backpressure on operators #1 and #2 is High, and the split reader has read only 60 MB of the 1 GB source file. I suspect this is because the ProcessFunction is slow (on purpose). However, it looks like this has affected the checkpoints, which fail after the timeout (set to 2 hours); see the attached screenshot.



In the job manager logs I keep getting warnings:

2017-04-23 19:32:38,827 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 8 from 210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.

Is the high backpressure the cause of the slow checkpoints? If so, is there a way to disable the backpressure mechanism, since the records will be buffered in the RocksDB state anyway, which is backed by disk?

Thank you.

Best,
Yassine


Re: Checkpoints very slow with high backpressure

Yassine MARZOUGUI
I'm sorry if you received multiple copies of this mail. I kept trying to send it yesterday, but it looks like the mailing list was stuck and didn't dispatch it until now. Sorry for the disturbance.

Re: Checkpoints very slow with high backpressure

Rune Skou Larsen

Sorry, I can't help you, but we're also experiencing slow checkpointing when there is backpressure from the sink.

I tried the HDFS, S3, and RocksDB state backends, but to no avail: checkpointing always times out under backpressure.

Can we somehow reduce Flink's internal buffer sizes, so checkpointing with backpressure becomes faster?

- Rune

---

Our current setup (improvement suggestions welcome!):

Flink 1.2.0, YARN on AWS EMR, 1 master + 3 slaves, m4.xlarge

program_parallelism: 12
taskmanagers: 6
slotsPerTaskManager: 4
taskmanager_heap_mb: 4096
jobmanager_heap_mb: 1024

Basic program structure:

1) read batch from Kinesis

2) Split batch and shuffle using custom partitioner (consistent hashing).

3) enrich using external REST service

4) Write to database (This step is the bottleneck)

--

Venlig hilsen/Best regards Rune Skou Larsen

goto Trifork Public A/S Dyssen 1 · DK-8200 Aarhus N · Denmark Phone +45 3160 2497 Skype: rsltrifork Twitter: RuneSkouLarsen

Re: Checkpoints very slow with high backpressure

Ufuk Celebi
@Yassine: no, there is no way to disable the back pressure mechanism. Do you have more details about the last two operators? What do you mean by "the process function is slow on purpose"?

@Rune: with 1.3 Flink will configure the internal buffers in a way that not too much data is buffered in the internal buffers (https://issues.apache.org/jira/browse/FLINK-4545). You could try the current master and check whether it improves the checkpointing behaviour under back pressure. Out of curiosity, are you using the async I/O API for the communication with the external REST service (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html)?
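
For intuition on why the amount of buffered data matters: in these Flink versions, checkpoint barriers travel in-band with the records, so a barrier reaches a slow operator only after everything buffered ahead of it has been processed. The following is a back-of-the-envelope model in plain Java, not Flink code. The buffer counts, records per buffer, and per-record time are made-up numbers for illustration, not measurements from this thread.

```java
import java.time.Duration;

/** Toy model: how long a checkpoint barrier waits behind buffered records. */
final class BarrierDelayModel {

    /**
     * Barriers are queued in-band, so the barrier reaches the slow operator only
     * after every record buffered ahead of it has been processed.
     */
    static Duration barrierDelay(long bufferedRecords, Duration perRecord) {
        return perRecord.multipliedBy(bufferedRecords);
    }

    public static void main(String[] args) {
        // Hypothetical numbers, chosen only to show the scaling.
        long recordsPerBuffer = 300;
        Duration perRecord = Duration.ofMillis(100); // a deliberately slow operator

        for (long buffers : new long[] {2048, 64, 8}) {
            Duration delay = barrierDelay(buffers * recordsPerBuffer, perRecord);
            System.out.printf("%d buffers -> barrier waits about %d min%n",
                    buffers, delay.toMinutes());
        }
    }
}
```

Under these assumed numbers, the wait shrinks from about 17 hours with 2048 buffers of in-flight data to about 4 minutes with 8, which is the intuition behind limiting how much data sits in the internal buffers.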

– Ufuk


Re: Checkpoints very slow with high backpressure

Yassine MARZOUGUI
Hi Ufuk,

The ProcessFunction receives elements and buffers them in a MapState, and periodically (for example, every x seconds) registers processing-time timers (according to rules it gets from a connected rule stream). When a timer fires, I pop the next element from the state, send a request to an external server, and collect the response.
The requests to the external server should happen periodically rather than continuously, which is why I control them with timers and buffer the elements in RocksDB state.
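
The buffer-then-drain behaviour described above can be modelled roughly as follows. This is a plain-Java sketch, not the Flink `ProcessFunction` API: the class name `ThrottledBuffer`, its methods, and the fixed interval are hypothetical stand-ins, and the rule stream is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/**
 * Plain-Java model of the operator described above (not the Flink API):
 * elements are buffered on arrival, and each timer firing releases exactly one.
 */
final class ThrottledBuffer<T, R> {
    private final Deque<T> buffer = new ArrayDeque<>(); // stands in for the MapState
    private final Function<T, R> externalCall;          // stands in for the server request
    private final long intervalMillis;                  // hypothetical "every x seconds"

    ThrottledBuffer(Function<T, R> externalCall, long intervalMillis) {
        this.externalCall = externalCall;
        this.intervalMillis = intervalMillis;
    }

    /** Called per element: only buffers, no external request is made here. */
    void onElement(T element) {
        buffer.addLast(element);
    }

    /** Called when a timer fires: pops the next element and emits the response. */
    void onTimer(List<R> out) {
        T next = buffer.pollFirst();
        if (next != null) {
            out.add(externalCall.apply(next));
        }
    }

    /** Time needed to drain what is currently buffered at one element per firing. */
    long drainTimeMillis() {
        return buffer.size() * intervalMillis;
    }

    public static void main(String[] args) {
        ThrottledBuffer<String, String> op = new ThrottledBuffer<>(s -> "resp:" + s, 5_000);
        op.onElement("a");
        op.onElement("b");
        List<String> out = new ArrayList<>();
        op.onTimer(out);
        System.out.println(out + ", remaining drain time ms: " + op.drainTimeMillis());
    }
}
```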

Re: Checkpoints very slow with high backpressure

rhashmi
So what is the resolution? I have a use case where Flink consumes messages from Kafka. Flink went down about a day ago, so now it has to process 24 hours' worth of events. But I hit backpressure, and right now the checkpoints are timing out. Is there any recommendation on how to handle this situation?

It seems the triggers are also not firing, so no updates are being made to the downstream database.

Is there a recommended approach to handling backpressure?

Version: Flink 1.2.

Re: Checkpoints very slow with high backpressure

SHI Xiaogang
Hi rhashmi,

We are also experiencing slow checkpoints when there is back pressure. It seems there is no good way to handle back pressure at the moment.

We work around it by setting a larger checkpoint timeout. The default value is 10 minutes, but checkpoints usually take longer to complete when there is back pressure. You can set it via `CheckpointConfig#setCheckpointTimeout()`.
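
A minimal sketch of that setting, assuming the Flink streaming Java API is on the classpath. The 10-minute interval and 1-hour timeout are placeholder values, not recommendations, and the class name is hypothetical.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class CheckpointTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 10 minutes (assumed interval, in milliseconds).
        env.enableCheckpointing(10 * 60 * 1000L);

        // Allow each checkpoint up to 1 hour before it expires
        // (assumed value, in milliseconds; the default is 10 minutes).
        env.getCheckpointConfig().setCheckpointTimeout(60 * 60 * 1000L);

        // ... define sources, transformations, and sinks here, then call env.execute(...)
    }
}
```

A longer timeout only gives the barriers more time to arrive; it does not make them arrive any faster.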

Regards,
Xiaogang



Re: Checkpoints very slow with high backpressure

Chen Qin
What is the root cause of the back pressure?
The reason I ask is that we added metrics to measure per-event processing time and ended up finding that the bottleneck was frequent managed-state updates. Our approach was to keep an in-memory cache and periodically write the updates to state before the checkpointing cycle kicks in.
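
One way to picture this approach is a write-back cache: updates are absorbed in memory and written to state in a single batch. The sketch below is plain Java with hypothetical names (`WriteBackCache`, `stateWriter`), and an ordinary map stands in for the state backend. It is not the code used in the job described here. Where the flush is triggered (a timer, or a hook that runs before the checkpoint) is left open, since the post does not say.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Write-back cache sketch: absorb frequent updates in memory and push them to
 * the (expensive) state store in one batch, e.g. right before a checkpoint.
 */
final class WriteBackCache<K> {
    private final Map<K, Long> pending = new HashMap<>();
    private final BiConsumer<K, Long> stateWriter; // hypothetical hook into managed state

    WriteBackCache(BiConsumer<K, Long> stateWriter) {
        this.stateWriter = stateWriter;
    }

    /** Hot path: touches only the in-memory map. */
    void add(K key, long delta) {
        pending.merge(key, delta, Long::sum);
    }

    /** Issues one state write per dirty key and returns how many writes were made. */
    int flush() {
        int writes = pending.size();
        pending.forEach(stateWriter);
        pending.clear();
        return writes;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>(); // stands in for the state backend
        WriteBackCache<String> cache =
                new WriteBackCache<>((k, delta) -> state.merge(k, delta, Long::sum));

        for (int i = 0; i < 1_000; i++) {
            cache.add("user-" + (i % 3), 1L);
        }
        System.out.println("state writes: " + cache.flush() + ", state: " + state);
    }
}
```

In this example 1,000 updates collapse into 3 state writes. The trade-off is that anything still sitting in the cache is not part of the checkpointed state, so the flush has to happen before the snapshot is taken.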

This thread might be somehow related.

Chen


Re: Checkpoints very slow with high backpressure

rhashmi
I tried extending the timeout to 1 hour, but no luck. It is still timing out, and there is no exception in the log file, so I am guessing something is stuck. I will dig further.

Here are the configuration details:

Standalone cluster, with checkpoints stored in S3.

I have just 217680 messages in 24 partitions.

Any idea?

Re: Checkpoints very slow with high backpressure

rhashmi
I enabled INFO logging. It seems to be stuck.


==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:18,229 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1496321118221

==> /mnt/ephemeral/logs/flink-flink-taskmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:18,237 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@79e68dd3 for Async calls on Source: Custom Source (2/12)
2017-06-01 12:45:18,237 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@78da1e82 for Async calls on Source: Custom Source (5/12)
2017-06-01 12:45:18,238 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@68bff79e for Async calls on Source: Custom Source (8/12)
2017-06-01 12:45:18,238 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@600bdc29 for Async calls on Source: Custom Source (11/12)
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:24,854 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,859 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,860 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,860 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,860 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,862 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,863 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,895 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,896 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,896 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,902 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:24,905 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:24,909 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,909 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,910 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,911 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,911 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,911 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,915 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:24,916 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:24,923 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability, zookeeper
2017-06-01 12:45:24,924 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.quorum, host1:2181,host2:2181
2017-06-01 12:45:24,925 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.storageDir, s3://somelocation/ha-recovery/
2017-06-01 12:45:24,925 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,925 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,925 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:25,187 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (11/12)
2017-06-01 12:45:25,188 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (2/12)
2017-06-01 12:45:25,196 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (5/12)
2017-06-01 12:45:25,197 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:25,203 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (8/12)
2017-06-01 12:45:25,227 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::
2017-06-01 12:45:25,257 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumertest ::
2017-06-01 12:45:25,277 INFO  com.company.deserializer.EventDeserializer                  - ======> KafkaConsumer ::

==> /mnt/ephemeral/logs/flink-flink-client-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:45,350 WARN  org.apache.flink.runtime.client.JobSubmissionClientActor      - Discard message LeaderSessionMessage(null,ConnectionTimeout) because the expected leader session ID 2d2a8eac-b837-4605-93cc-81720247f247 did not equal the received leader session ID null.

==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:55:18,229 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 1 expired before completing.
2017-06-01 12:55:18,233 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1496321718230

==> /mnt/ephemeral/logs/flink-flink-taskmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:55:18,235 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@44074ae6 for Async calls on Source: Custom Source (2/12)
2017-06-01 12:55:18,235 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@463dc5a1 for Async calls on Source: Custom Source (5/12)
2017-06-01 12:55:18,236 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@7871a1bb for Async calls on Source: Custom Source (8/12)
2017-06-01 12:55:18,237 INFO  org.apache.flink.core.fs.FileSystem                           - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@57df8c1d for Async calls on Source: Custom Source (11/12)

==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:58:30,764 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 1 from c601dd04affa7da13a226daa222062e7 of job 303656ace348131ed7a38bb02b4fe374.

Re: Checkpoints very slow with high backpressure

rhashmi
Never mind, I found it. The backpressure was caused by an AWS RDS MySQL instance.