Understanding Kafka connector - rebalance

Avi Levi
Hi
Looking at this example, wouldn't doing the "rebalance" operation (e.g. messageStream.rebalance().map(...)) on a heavy-load stream slow the stream down? Does the rebalancing only happen when there is a partition change?
It says that "the rebalance call is causing a repartitioning of the data so that all machines". Is it actually changing the number of partitions of the topic to match the number of Flink operators?

Avi

Re: Understanding Kafka connector - rebalance

Taher Koitawala
Hi Avi,
No, rebalance does not change the number of Kafka partitions. Let's say you have 6 Kafka partitions and your Flink parallelism is 8. In this case, rebalance will send records to all 8 parallel instances of the downstream operator in a round-robin fashion.
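The 6-partitions / parallelism-8 case can be sketched as a small, self-contained Java simulation. This is a toy model, not Flink code, and the class and method names are hypothetical. It assumes a simplified assignment where partition p is read by source subtask p % parallelism (Flink's Kafka consumer uses its own assignment scheme), 100 records per partition, and a round-robin cursor that starts at channel 0 for every sender. It illustrates that Kafka keeps its 6 partitions, while the routing inside Flink decides whether 2 of the 8 map subtasks sit idle.

```java
import java.util.Arrays;

/**
 * Toy model (not Flink code): 6 Kafka partitions, a source with parallelism 8,
 * and a downstream map operator with parallelism 8.
 */
final class RoundRobinDemo {

    /** Records per map subtask when source -> map is a forward connection. */
    static int[] forward(int partitions, int parallelism, int recordsPerPartition) {
        int[] received = new int[parallelism];
        for (int p = 0; p < partitions; p++) {
            // Simplified (assumed) assignment: partition p is read by source subtask p % parallelism.
            int sourceSubtask = p % parallelism;
            // Forward: source subtask i only feeds map subtask i.
            received[sourceSubtask] += recordsPerPartition;
        }
        return received;
    }

    /** Records per map subtask when source -> map goes through rebalance(). */
    static int[] rebalance(int partitions, int parallelism, int recordsPerPartition) {
        int[] received = new int[parallelism];
        // One round-robin cursor per source subtask, all starting at channel 0 (arbitrary choice).
        int[] nextChannel = new int[parallelism];
        for (int p = 0; p < partitions; p++) {
            int sourceSubtask = p % parallelism;
            for (int r = 0; r < recordsPerPartition; r++) {
                received[nextChannel[sourceSubtask]]++;
                nextChannel[sourceSubtask] = (nextChannel[sourceSubtask] + 1) % parallelism;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Kafka still has 6 partitions; only the routing inside the job changes.
        // prints forward:   [100, 100, 100, 100, 100, 100, 0, 0]
        System.out.println("forward:   " + Arrays.toString(forward(6, 8, 100)));
        // prints rebalance: [78, 78, 78, 78, 72, 72, 72, 72]
        System.out.println("rebalance: " + Arrays.toString(rebalance(6, 8, 100)));
    }
}
```

With the forward connection, subtasks 6 and 7 never receive a record. With round-robin, every subtask gets a share, and the total stays at 600 records.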

Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


On Mon, Nov 26, 2018 at 11:33 AM Avi Levi wrote:

Re: Understanding Kafka connector - rebalance

Avi Levi
OK, thanks for the clarification. But if I use it with keyed state, where the data is partitioned by key, won't rebalancing shuffle this partitioning? E.g.:
      .addSource(source)
      .rebalance
      .keyBy(_.id)
      .mapWithState(...)


On Mon, Nov 26, 2018 at 8:32 AM Taher Koitawala wrote:

Re: Understanding Kafka connector - rebalance

Taher Koitawala
You can use rebalance before keyBy because rebalance returns a DataStream. The API does not allow rebalance on the KeyedStream that keyBy returns, so you are safe.

On Mon, Nov 26, 2018 at 2:25 PM Avi Levi wrote:

Re: Understanding Kafka connector - rebalance

Fabian Hueske-2
Hi,

      DataStream x = ...
      x.rebalance().keyBy(...)

is not a good idea.

It will first distribute the records round-robin (over the network) and subsequently partition them by hash.
The first shuffle is unnecessary. It does not have any effect because it is undone by the second partitioning.
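The point that the first shuffle is undone by the second can be checked with a toy model (not Flink code; all names are hypothetical). It assumes a key-to-subtask function loosely modeled on Flink's key groups: the key's hash modulo an assumed max parallelism of 128, scaled to the operator parallelism. Flink additionally scrambles hashCode() with a murmur hash, which this sketch omits. Because the keyed exchange never looks at which subtask sent a record, routing through rebalance() first yields the same final placement, so it is correct (as Taher says) but costs an extra transfer per record.

```java
import java.util.Arrays;

/** Toy model (not Flink code): keyBy alone versus rebalance() followed by keyBy. */
final class RedundantShuffleDemo {

    static final int MAX_PARALLELISM = 128; // assumed value for this sketch

    /** Final subtask of every record plus the number of record transfers between tasks. */
    static final class Routing {
        final int[] placement;
        final int transfers;

        Routing(int[] placement, int transfers) {
            this.placement = placement;
            this.transfers = transfers;
        }
    }

    /**
     * Hypothetical hash partitioning, loosely modeled on Flink's key groups.
     * The sending subtask is accepted but deliberately ignored: only the key matters.
     */
    static int keyedExchange(String key, int fromSubtask, int parallelism) {
        int keyGroup = Math.floorMod(key.hashCode(), MAX_PARALLELISM);
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    /** source -> keyBy: one exchange per record (single source subtask 0 assumed). */
    static Routing keyByOnly(String[] keys, int parallelism) {
        int[] placement = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            placement[i] = keyedExchange(keys[i], 0, parallelism);
        }
        return new Routing(placement, keys.length);
    }

    /** source -> rebalance -> keyBy: two exchanges per record. */
    static Routing rebalanceThenKeyBy(String[] keys, int parallelism) {
        int[] placement = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            int intermediate = i % parallelism;                                // exchange 1: round-robin
            placement[i] = keyedExchange(keys[i], intermediate, parallelism); // exchange 2: by key
        }
        return new Routing(placement, 2 * keys.length);
    }

    public static void main(String[] args) {
        String[] keys = {"user-1", "user-2", "user-1", "user-3", "user-2", "user-1"};
        Routing direct = keyByOnly(keys, 8);
        Routing viaRebalance = rebalanceThenKeyBy(keys, 8);
        // prints "same placement: true" and "transfers: 6 vs 12"
        System.out.println("same placement: " + Arrays.equals(direct.placement, viaRebalance.placement));
        System.out.println("transfers: " + direct.transfers + " vs " + viaRebalance.transfers);
    }
}
```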

By the way, methods on a DataStream do not have any effect on Kafka topics or partitions.
In the initially quoted example, we assume that the events of the original DataStream are not evenly distributed among the parallel tasks. The rebalance() call produces an even distribution, which is especially important if the map() operation is heavyweight / compute-intensive.
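Here is a minimal sketch of the uneven-distribution case, again a toy model rather than Flink code. The skewed input (one source subtask reading 5000 records, five others reading 100 each), the map parallelism of 6, and the round-robin cursor starting at channel 0 are all hypothetical. If every record costs roughly the same to process in map(), the busiest subtask is what the stage has to keep up with.

```java
import java.util.Arrays;

/** Toy model (not Flink code): skewed source subtasks feeding a map operator. */
final class SkewDemo {

    /** Forward connection: each map subtask gets exactly what its source subtask read. */
    static int[] forwardLoads(int[] recordsPerSource) {
        return recordsPerSource.clone();
    }

    /** rebalance(): every source subtask cycles through all map subtasks. */
    static int[] rebalanceLoads(int[] recordsPerSource, int mapParallelism) {
        int[] loads = new int[mapParallelism];
        for (int records : recordsPerSource) {
            int cursor = 0; // starting channel is an arbitrary choice in this sketch
            for (int r = 0; r < records; r++) {
                loads[cursor]++;
                cursor = (cursor + 1) % mapParallelism;
            }
        }
        return loads;
    }

    /** Load of the busiest subtask. */
    static int max(int[] loads) {
        return Arrays.stream(loads).max().orElse(0);
    }

    public static void main(String[] args) {
        // Hypothetical skew: one source subtask reads a hot partition.
        int[] recordsPerSource = {5000, 100, 100, 100, 100, 100};
        int[] forward = forwardLoads(recordsPerSource);
        int[] rebalanced = rebalanceLoads(recordsPerSource, 6);
        // prints forward:   [5000, 100, 100, 100, 100, 100] max=5000
        System.out.println("forward:   " + Arrays.toString(forward) + " max=" + max(forward));
        // prints rebalance: [919, 919, 918, 918, 913, 913] max=919
        System.out.println("rebalance: " + Arrays.toString(rebalanced) + " max=" + max(rebalanced));
    }
}
```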

Best, Fabian

On Mon, Nov 26, 2018 at 10:59 AM Taher Koitawala wrote:

Re: Understanding Kafka connector - rebalance

Avi Levi
Thanks, that makes sense!

On Mon, Nov 26, 2018 at 1:06 PM Fabian Hueske wrote: