Flink/Kafka POC performance issue

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink/Kafka POC performance issue

TechnoMage
I am doing a short Proof of Concept for using Flink and Kafka in our product.  On my laptop I can process 10M inputs in about 90 min.  On 2 different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see the process hit a wall around 50min into the test and short of 7M events processed.  This is running zookeeper, kafka broker, flink all on the same server in all cases.  My goal is to measure single node vs. multi-node and test horizontal scalability, but I would like to figure out why hit hits a wall first.  I have the task maanger configured with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 instances have 4 threads. On smaller data sets and in the begining of each test the EC2 instances outpace the laptop.  I will try again with an m5.2xlarge which has 8 threads and 32GB ram to see if that works better for this workload.  Any pointers or ways to get metrics that would help diagnose this would be appreciated.

Michael

Reply | Threaded
Open this post in threaded view
|

Re: Flink/Kafka POC performance issue

Niclas Hedhman
Have you checked memory usage? It could be as simple as either having memory leaks, or aggregating more than you think (sometimes not obvious how much is kept around in memory for longer than one first thinks). If possible, connect FlightRecorder or similar tool and keep an eye on memory. Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM to disk like regular Linux, then that might be triggered if your JVM heap is bigger than can be handled within the available RAM.

On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage <[hidden email]> wrote:
I am doing a short Proof of Concept for using Flink and Kafka in our product.  On my laptop I can process 10M inputs in about 90 min.  On 2 different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see the process hit a wall around 50min into the test and short of 7M events processed.  This is running zookeeper, kafka broker, flink all on the same server in all cases.  My goal is to measure single node vs. multi-node and test horizontal scalability, but I would like to figure out why hit hits a wall first.  I have the task maanger configured with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 instances have 4 threads. On smaller data sets and in the begining of each test the EC2 instances outpace the laptop.  I will try again with an m5.2xlarge which has 8 threads and 32GB ram to see if that works better for this workload.  Any pointers or ways to get metrics that would help diagnose this would be appreciated.

Michael




--
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java
Reply | Threaded
Open this post in threaded view
|

Re: Flink/Kafka POC performance issue

TechnoMage
In reply to this post by TechnoMage
Thanks for the suggestion. The task manager is configured for 8GB of heap, and gets to about 8.3 total. Other java processes (job manager and Kafka). Add a few more. I will check it again but the instances have 16GB same as my laptop that completes the test in <90 min. 

Michael

Sent from my iPad

On Apr 16, 2018, at 10:53 PM, Niclas Hedhman <[hidden email]> wrote:


Have you checked memory usage? It could be as simple as either having memory leaks, or aggregating more than you think (sometimes not obvious how much is kept around in memory for longer than one first thinks). If possible, connect FlightRecorder or similar tool and keep an eye on memory. Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM to disk like regular Linux, then that might be triggered if your JVM heap is bigger than can be handled within the available RAM.

On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage <[hidden email]> wrote:
I am doing a short Proof of Concept for using Flink and Kafka in our product.  On my laptop I can process 10M inputs in about 90 min.  On 2 different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see the process hit a wall around 50min into the test and short of 7M events processed.  This is running zookeeper, kafka broker, flink all on the same server in all cases.  My goal is to measure single node vs. multi-node and test horizontal scalability, but I would like to figure out why hit hits a wall first.  I have the task maanger configured with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 instances have 4 threads. On smaller data sets and in the begining of each test the EC2 instances outpace the laptop.  I will try again with an m5.2xlarge which has 8 threads and 32GB ram to see if that works better for this workload.  Any pointers or ways to get metrics that would help diagnose this would be appreciated.

Michael




--
Niclas Hedhman, Software Developer
http://polygene.apache.org - New Energy for Java
Reply | Threaded
Open this post in threaded view
|

Re: Flink/Kafka POC performance issue

TechnoMage
Memory use is steady throughout the job, but the CPU utilization drops off a cliff.  I assume this is because it becomes I/O bound shuffling managed state.

Are there any metrics on managed state that can help in evaluating what to do next?

Michael

On Apr 17, 2018, at 7:11 AM, Michael Latta <[hidden email]> wrote:

Thanks for the suggestion. The task manager is configured for 8GB of heap, and gets to about 8.3 total. Other java processes (job manager and Kafka). Add a few more. I will check it again but the instances have 16GB same as my laptop that completes the test in <90 min. 

Michael

Sent from my iPad

On Apr 16, 2018, at 10:53 PM, Niclas Hedhman <[hidden email]> wrote:


Have you checked memory usage? It could be as simple as either having memory leaks, or aggregating more than you think (sometimes not obvious how much is kept around in memory for longer than one first thinks). If possible, connect FlightRecorder or similar tool and keep an eye on memory. Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM to disk like regular Linux, then that might be triggered if your JVM heap is bigger than can be handled within the available RAM.

On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage <[hidden email]> wrote:
I am doing a short Proof of Concept for using Flink and Kafka in our product.  On my laptop I can process 10M inputs in about 90 min.  On 2 different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see the process hit a wall around 50min into the test and short of 7M events processed.  This is running zookeeper, kafka broker, flink all on the same server in all cases.  My goal is to measure single node vs. multi-node and test horizontal scalability, but I would like to figure out why hit hits a wall first.  I have the task maanger configured with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 instances have 4 threads. On smaller data sets and in the begining of each test the EC2 instances outpace the laptop.  I will try again with an m5.2xlarge which has 8 threads and 32GB ram to see if that works better for this workload.  Any pointers or ways to get metrics that would help diagnose this would be appreciated.

Michael




--
Niclas Hedhman, Software Developer
http://polygene.apache.org - New Energy for Java

Reply | Threaded
Open this post in threaded view
|

Re: Flink/Kafka POC performance issue

Stephan Ewen
A few ideas how to start debugging this:

  - Try deactivating checkpoints. Without that, no work goes into persisting rocksdb data to the checkpoint store.
  - Try to swap RocksDB for the FsStateBackend - that reduces serialization cost for moving data between heap and offheap (rocksdb).
  - Do you have some expensive types (JSON, etc)? Try activating object reuse (which avoids some extra defensive copies)

On Tue, Apr 17, 2018 at 5:50 PM, TechnoMage <[hidden email]> wrote:
Memory use is steady throughout the job, but the CPU utilization drops off a cliff.  I assume this is because it becomes I/O bound shuffling managed state.

Are there any metrics on managed state that can help in evaluating what to do next?

Michael


On Apr 17, 2018, at 7:11 AM, Michael Latta <[hidden email]> wrote:

Thanks for the suggestion. The task manager is configured for 8GB of heap, and gets to about 8.3 total. Other java processes (job manager and Kafka). Add a few more. I will check it again but the instances have 16GB same as my laptop that completes the test in <90 min. 

Michael

Sent from my iPad

On Apr 16, 2018, at 10:53 PM, Niclas Hedhman <[hidden email]> wrote:


Have you checked memory usage? It could be as simple as either having memory leaks, or aggregating more than you think (sometimes not obvious how much is kept around in memory for longer than one first thinks). If possible, connect FlightRecorder or similar tool and keep an eye on memory. Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM to disk like regular Linux, then that might be triggered if your JVM heap is bigger than can be handled within the available RAM.

On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage <[hidden email]> wrote:
I am doing a short Proof of Concept for using Flink and Kafka in our product.  On my laptop I can process 10M inputs in about 90 min.  On 2 different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see the process hit a wall around 50min into the test and short of 7M events processed.  This is running zookeeper, kafka broker, flink all on the same server in all cases.  My goal is to measure single node vs. multi-node and test horizontal scalability, but I would like to figure out why hit hits a wall first.  I have the task maanger configured with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 instances have 4 threads. On smaller data sets and in the begining of each test the EC2 instances outpace the laptop.  I will try again with an m5.2xlarge which has 8 threads and 32GB ram to see if that works better for this workload.  Any pointers or ways to get metrics that would help diagnose this would be appreciated.

Michael




--
Niclas Hedhman, Software Developer
http://polygene.apache.org - New Energy for Java


Reply | Threaded
Open this post in threaded view
|

Re: Flink/Kafka POC performance issue

TechnoMage
No checkpoints are active.
I will try that back end.
Yes, using JSONObject subclass for most of the intermediate state, with JSON strings in and out of Kafka.  I will look at the config page for how to enable that.

Thank you,
Michael

On Apr 17, 2018, at 12:51 PM, Stephan Ewen <[hidden email]> wrote:

A few ideas how to start debugging this:

  - Try deactivating checkpoints. Without that, no work goes into persisting rocksdb data to the checkpoint store.
  - Try to swap RocksDB for the FsStateBackend - that reduces serialization cost for moving data between heap and offheap (rocksdb).
  - Do you have some expensive types (JSON, etc)? Try activating object reuse (which avoids some extra defensive copies)

On Tue, Apr 17, 2018 at 5:50 PM, TechnoMage <[hidden email]> wrote:
Memory use is steady throughout the job, but the CPU utilization drops off a cliff.  I assume this is because it becomes I/O bound shuffling managed state.

Are there any metrics on managed state that can help in evaluating what to do next?

Michael


On Apr 17, 2018, at 7:11 AM, Michael Latta <[hidden email]> wrote:

Thanks for the suggestion. The task manager is configured for 8GB of heap, and gets to about 8.3 total. Other java processes (job manager and Kafka). Add a few more. I will check it again but the instances have 16GB same as my laptop that completes the test in <90 min. 

Michael

Sent from my iPad

On Apr 16, 2018, at 10:53 PM, Niclas Hedhman <[hidden email]> wrote:


Have you checked memory usage? It could be as simple as either having memory leaks, or aggregating more than you think (sometimes not obvious how much is kept around in memory for longer than one first thinks). If possible, connect FlightRecorder or similar tool and keep an eye on memory. Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM to disk like regular Linux, then that might be triggered if your JVM heap is bigger than can be handled within the available RAM.

On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage <[hidden email]> wrote:
I am doing a short Proof of Concept for using Flink and Kafka in our product.  On my laptop I can process 10M inputs in about 90 min.  On 2 different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see the process hit a wall around 50min into the test and short of 7M events processed.  This is running zookeeper, kafka broker, flink all on the same server in all cases.  My goal is to measure single node vs. multi-node and test horizontal scalability, but I would like to figure out why hit hits a wall first.  I have the task maanger configured with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 instances have 4 threads. On smaller data sets and in the begining of each test the EC2 instances outpace the laptop.  I will try again with an m5.2xlarge which has 8 threads and 32GB ram to see if that works better for this workload.  Any pointers or ways to get metrics that would help diagnose this would be appreciated.

Michael




--
Niclas Hedhman, Software Developer
http://polygene.apache.org - New Energy for Java



Reply | Threaded
Open this post in threaded view
|

Re: Flink/Kafka POC performance issue

TechnoMage
Also, I note some messages in the log about my java class not being a valid POJO because it is missing accessors for a field.  Would this impact performance significantly?

Michael

On Apr 17, 2018, at 12:54 PM, TechnoMage <[hidden email]> wrote:

No checkpoints are active.
I will try that back end.
Yes, using JSONObject subclass for most of the intermediate state, with JSON strings in and out of Kafka.  I will look at the config page for how to enable that.

Thank you,
Michael

On Apr 17, 2018, at 12:51 PM, Stephan Ewen <[hidden email]> wrote:

A few ideas how to start debugging this:

  - Try deactivating checkpoints. Without that, no work goes into persisting rocksdb data to the checkpoint store.
  - Try to swap RocksDB for the FsStateBackend - that reduces serialization cost for moving data between heap and offheap (rocksdb).
  - Do you have some expensive types (JSON, etc)? Try activating object reuse (which avoids some extra defensive copies)

On Tue, Apr 17, 2018 at 5:50 PM, TechnoMage <[hidden email]> wrote:
Memory use is steady throughout the job, but the CPU utilization drops off a cliff.  I assume this is because it becomes I/O bound shuffling managed state.

Are there any metrics on managed state that can help in evaluating what to do next?

Michael


On Apr 17, 2018, at 7:11 AM, Michael Latta <[hidden email]> wrote:

Thanks for the suggestion. The task manager is configured for 8GB of heap, and gets to about 8.3 total. Other java processes (job manager and Kafka). Add a few more. I will check it again but the instances have 16GB same as my laptop that completes the test in <90 min. 

Michael

Sent from my iPad

On Apr 16, 2018, at 10:53 PM, Niclas Hedhman <[hidden email]> wrote:


Have you checked memory usage? It could be as simple as either having memory leaks, or aggregating more than you think (sometimes not obvious how much is kept around in memory for longer than one first thinks). If possible, connect FlightRecorder or similar tool and keep an eye on memory. Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM to disk like regular Linux, then that might be triggered if your JVM heap is bigger than can be handled within the available RAM.

On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage <[hidden email]> wrote:
I am doing a short Proof of Concept for using Flink and Kafka in our product.  On my laptop I can process 10M inputs in about 90 min.  On 2 different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see the process hit a wall around 50min into the test and short of 7M events processed.  This is running zookeeper, kafka broker, flink all on the same server in all cases.  My goal is to measure single node vs. multi-node and test horizontal scalability, but I would like to figure out why hit hits a wall first.  I have the task maanger configured with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 instances have 4 threads. On smaller data sets and in the begining of each test the EC2 instances outpace the laptop.  I will try again with an m5.2xlarge which has 8 threads and 32GB ram to see if that works better for this workload.  Any pointers or ways to get metrics that would help diagnose this would be appreciated.

Michael




--
Niclas Hedhman, Software Developer
http://polygene.apache.org - New Energy for Java




Reply | Threaded
Open this post in threaded view
|

Re: Flink/Kafka POC performance issue

TechnoMage
Also, I note that none of the operations show any back pressure issues, and the records out from the kafka connector slow down to a crawl.  Are there any known issues with kafka throughput that could be the issue rather than flink?  I have a java program that monitors the test that reads all the kafka messages in about 34 min while the flink job has yet to read all the kafka messages 1hr40min later.

Michael

On Apr 17, 2018, at 12:58 PM, TechnoMage <[hidden email]> wrote:

Also, I note some messages in the log about my java class not being a valid POJO because it is missing accessors for a field.  Would this impact performance significantly?

Michael

On Apr 17, 2018, at 12:54 PM, TechnoMage <[hidden email]> wrote:

No checkpoints are active.
I will try that back end.
Yes, using JSONObject subclass for most of the intermediate state, with JSON strings in and out of Kafka.  I will look at the config page for how to enable that.

Thank you,
Michael

On Apr 17, 2018, at 12:51 PM, Stephan Ewen <[hidden email]> wrote:

A few ideas how to start debugging this:

  - Try deactivating checkpoints. Without that, no work goes into persisting rocksdb data to the checkpoint store.
  - Try to swap RocksDB for the FsStateBackend - that reduces serialization cost for moving data between heap and offheap (rocksdb).
  - Do you have some expensive types (JSON, etc)? Try activating object reuse (which avoids some extra defensive copies)

On Tue, Apr 17, 2018 at 5:50 PM, TechnoMage <[hidden email]> wrote:
Memory use is steady throughout the job, but the CPU utilization drops off a cliff.  I assume this is because it becomes I/O bound shuffling managed state.

Are there any metrics on managed state that can help in evaluating what to do next?

Michael


On Apr 17, 2018, at 7:11 AM, Michael Latta <[hidden email]> wrote:

Thanks for the suggestion. The task manager is configured for 8GB of heap, and gets to about 8.3 total. Other java processes (job manager and Kafka). Add a few more. I will check it again but the instances have 16GB same as my laptop that completes the test in <90 min. 

Michael

Sent from my iPad

On Apr 16, 2018, at 10:53 PM, Niclas Hedhman <[hidden email]> wrote:


Have you checked memory usage? It could be as simple as either having memory leaks, or aggregating more than you think (sometimes not obvious how much is kept around in memory for longer than one first thinks). If possible, connect FlightRecorder or similar tool and keep an eye on memory. Additionally, I don't have AWS experience to talk of, but IF AWS swaps RAM to disk like regular Linux, then that might be triggered if your JVM heap is bigger than can be handled within the available RAM.

On Tue, Apr 17, 2018 at 9:26 AM, TechnoMage <[hidden email]> wrote:
I am doing a short Proof of Concept for using Flink and Kafka in our product.  On my laptop I can process 10M inputs in about 90 min.  On 2 different EC2 instances (m4.xlarge and m5.xlarge both 4core 16GB ram and ssd storage) I see the process hit a wall around 50min into the test and short of 7M events processed.  This is running zookeeper, kafka broker, flink all on the same server in all cases.  My goal is to measure single node vs. multi-node and test horizontal scalability, but I would like to figure out why hit hits a wall first.  I have the task maanger configured with 6 slots and the job has 5 parallelism.  The laptop has 8 threads, and the EC2 instances have 4 threads. On smaller data sets and in the begining of each test the EC2 instances outpace the laptop.  I will try again with an m5.2xlarge which has 8 threads and 32GB ram to see if that works better for this workload.  Any pointers or ways to get metrics that would help diagnose this would be appreciated.

Michael




--
Niclas Hedhman, Software Developer
http://polygene.apache.org - New Energy for Java