Synchronized Kafka sources

Yunus Olgun
Hi, 

Is it possible to synchronize two Kafka sources, so that they consume from different Kafka topics at roughly the same event time?

My use case: I have two Kafka topics, A (very large) and B (large). Records in A and B map one-to-one or one-to-zero. The topology simply joins A and B in a tumbling time window and runs aggregations on the joined data.

In real time there is no problem. But when I start the job from last week's data, it becomes very slow, because by the time source A has consumed 1 minute of data from Kafka, source B has consumed 1 hour. Since the watermark advances with the minimum across the parent operators, source B creates many windows that stay in memory until they can be triggered. That increases state size. Checkpoints get bigger and bigger, and the job becomes slower.
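The effect described above can be put into numbers with a small plain-Java sketch. It does not use the Flink API; the class and method names are hypothetical, and it assumes each source's watermark simply equals the latest timestamp it has read (no out-of-orderness allowance) and that the join uses 1-minute tumbling windows.

```java
// Hypothetical illustration of watermark lag; this is not Flink code.
final class WatermarkLagSketch {

    /** An operator with several inputs advances its watermark to the minimum of them. */
    static long combinedWatermark(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    /**
     * Counts tumbling windows that may already hold data from the fastest input
     * but cannot fire yet. Window k covers [k * size, (k + 1) * size) and fires
     * once the watermark reaches (k + 1) * size - 1.
     */
    static long pendingWindows(long fastestEventTime, long watermark, long windowSize) {
        long newestWindow = Math.floorDiv(fastestEventTime, windowSize);
        long firstUnfiredWindow = Math.floorDiv(watermark + 1, windowSize);
        return Math.max(0, newestWindow - firstUnfiredWindow + 1);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long sourceA = 1 * minute;   // A has read 1 minute of event time
        long sourceB = 60 * minute;  // B has read 1 hour of event time
        long watermark = combinedWatermark(sourceA, sourceB);
        System.out.println("watermark = " + watermark);
        System.out.println("pending windows = " + pendingWindows(sourceB, watermark, minute));
    }
}
```

Under these assumptions the watermark stays at A's position, so nearly every window B has touched remains in state, and the count grows as B runs further ahead.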

I have tried adding an operator after each source that writes event times to an external store. If one source is far ahead of the other, it sleeps for a short time, consumes a little, then checks again and sleeps if necessary. This map operator made checkpoint times much longer. I guess sleeping in an operator is not a good idea with the checkpoint mechanism.
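The check-and-sleep decision described above could look roughly like the following plain-Java sketch. Everything here is hypothetical (the `EventTimeGate` name, the in-memory array standing in for the external store, the drift threshold); it shows only the "am I too far ahead?" decision, not how or where a Flink job should act on it.

```java
// Hypothetical drift check between sources; not part of Flink or the Kafka connector.
final class EventTimeGate {
    private static final long NOT_REPORTED = Long.MIN_VALUE;

    private final long[] latestEventTime;  // one slot per source
    private final long maxDriftMillis;     // how far ahead of the slowest a source may run

    EventTimeGate(int numSources, long maxDriftMillis) {
        this.latestEventTime = new long[numSources];
        for (int i = 0; i < numSources; i++) {
            latestEventTime[i] = NOT_REPORTED;
        }
        this.maxDriftMillis = maxDriftMillis;
    }

    /** Records the highest event time a source has seen so far. */
    synchronized void report(int sourceId, long eventTime) {
        latestEventTime[sourceId] = Math.max(latestEventTime[sourceId], eventTime);
    }

    /** True if this source is ahead of the slowest reporting source by more than the allowed drift. */
    synchronized boolean shouldPause(int sourceId) {
        long own = latestEventTime[sourceId];
        if (own == NOT_REPORTED) {
            return false; // nothing consumed yet, so nothing to hold back
        }
        long slowest = own;
        for (long t : latestEventTime) {
            if (t != NOT_REPORTED) {
                slowest = Math.min(slowest, t);
            }
        }
        return own - slowest > maxDriftMillis;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        EventTimeGate gate = new EventTimeGate(2, 5 * minute);
        gate.report(0, 1 * minute);   // source A
        gate.report(1, 60 * minute);  // source B
        System.out.println("pause A? " + gate.shouldPause(0));
        System.out.println("pause B? " + gate.shouldPause(1));
    }
}
```

A likely reason the sleeping map operator hurt checkpoints is that checkpoint barriers travel in-band with the records, so an operator that blocks while processing a record also holds up the barrier queued behind it. Where the pause is applied matters more than the check itself.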

Is there a way to make two or more sources consume from Kafka in a synchronized way using Flink?

Re: Synchronized Kafka sources

魏偉哲
Hi Yunus,

I'm not sure whether there is a way to synchronize two Kafka sources in Flink, but I have a different suggestion for this problem.

How about adjusting the number of shards and the parallelism of the consumers on A and B?
For example, give A higher parallelism and B lower parallelism, so that the A source processes faster.

With this approach, you would need to work out the right ratio to balance the two consumers, because rescaling operators is currently manual.
However, automatic rescaling will be added in the future, and you could then use some mechanism to rescale the operators automatically, much as you did to make the operator sleep.
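As a rough way to estimate that ratio, here is a back-of-the-envelope sketch in plain Java. The names are hypothetical, and it rests on two assumptions that may not hold in practice: event-time progress scales linearly with source parallelism, and source subtasks beyond the topic's partition count contribute nothing.

```java
// Hypothetical estimate of the parallelism needed to balance two sources; not Flink code.
final class ParallelismRatioSketch {

    /**
     * Estimates the parallelism a slow source needs to match a faster one.
     *
     * @param currentParallelism parallelism the slow source runs with today
     * @param ownProgress        event time it covered in some wall-clock interval
     * @param targetProgress     event time the other source covered in the same interval
     * @param partitionCount     partitions of the slow source's topic (upper bound)
     */
    static int parallelismToMatch(int currentParallelism, long ownProgress,
                                  long targetProgress, int partitionCount) {
        // Assumes linear scaling: k times the subtasks gives k times the progress.
        double scaled = currentParallelism * ((double) targetProgress / ownProgress);
        int needed = (int) Math.ceil(scaled);
        // Extra subtasks without a partition to read would sit idle.
        return Math.min(needed, partitionCount);
    }

    public static void main(String[] args) {
        // A covered 1 minute while B covered 60 minutes, A runs with 4 subtasks on 128 partitions.
        System.out.println(parallelismToMatch(4, 1, 60, 128));
    }
}
```

With the numbers from the original question (1 minute versus 1 hour), the estimate hits the partition cap, which suggests that raising A's parallelism alone may not close the gap and that lowering B's parallelism would have to make up the rest.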

Hope this helps.

Regards,
Tony Wei


2017-08-04 21:23 GMT+08:00 Yunus Olgun <[hidden email]>:
(quoted text identical to the original message above)