Flink - Iteration and Backpressure

MAHESH KUMAR
Hi Team,

I am trying to build an audit-like system that reads messages from "n" Kafka queues, keys them by a unique key, and reduces them to a single message. If a message has passed through all "n" Kafka queues within a window of "m" hours/days, it has succeeded; otherwise it has expired.
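
The reduce-and-classify step described above can be sketched in plain Scala, without any Flink dependency. This is only an illustration under stated assumptions: `AuditRecord`, `AuditLogic`, and the queue names are hypothetical and do not come from the original job.

```scala
// Hypothetical record type: the message key plus the set of queues it was seen on.
final case class AuditRecord(key: String, seenQueues: Set[String])

object AuditLogic {
  // Assumed names for the "n" queues; the real topic names are not given in the post.
  val allQueues: Set[String] = Set("kafka1", "kafka2", "kafka3", "kafka4")

  // Reduce step: two records with the same key are merged by taking the union of their queues.
  def merge(a: AuditRecord, b: AuditRecord): AuditRecord = {
    require(a.key == b.key, "only records with the same key can be merged")
    AuditRecord(a.key, a.seenQueues | b.seenQueues)
  }

  // A record has succeeded once every queue has been seen.
  def isComplete(r: AuditRecord): Boolean = allQueues.subsetOf(r.seenQueues)

  // Queues the message has not been seen on yet (useful when reporting an expiry).
  def missingQueues(r: AuditRecord): Set[String] = allQueues.diff(r.seenQueues)
}
```

A merged record that is not yet complete is what gets replayed; `missingQueues` would tell the expiry sink which stage a message never reached.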

I can get it working in my test case, but not when there are millions of messages. Very few messages reach the success stage in each iteration, and a huge number are sent back into the iteration. This creates back pressure, and the job stops reading messages from the Kafka queues. Since no new messages are read, the messages inside the window can no longer succeed; they keep going through the iteration forever and eventually expire, although they should have succeeded.

I read that back pressure arises when the buffers fill up, at which point no more messages are read. The system is supposed to be a lightweight audit system, and the audit messages it creates are very small. Is it possible to increase the buffer size to avoid back pressure? Is there an alternative solution to this issue?

The code looks like this:

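// kafka1..kafka4 are the DataStreams created from the four Kafka sources.
val unionInputStream = kafka1.union(kafka2, kafka3, kafka4)

// Step function for iterate(): returns (feedback stream, output stream).
// AuditMessage stands in for the actual message type; windowSize for the window length.
def audit(input: DataStream[AuditMessage]) = {
  // Read from the iteration input, not unionInputStream, so replayed messages are included.
  val reducedStream = input
    .keyBy(keyFunction)
    .window(TumblingProcessingTimeWindows.of(windowSize))
    .reduce(reduceFunction)
  val splitStreams = reducedStream.split(splitFunction)
  splitStreams.select(success).addSink(terminalSink)
  splitStreams.select(expire).addSink(expireSink)
  (splitStreams.select(replay), splitStreams.select(success))
}

unionInputStream.iterate(audit(_))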



Thanks and Regards,
Mahesh 
Re: Flink - Iteration and Backpressure

rmetzger0
Hi Mahesh,

Why do you need to iterate over the elements?

I wonder if you can't just stream the data from kafka1-kafka4 through your topology?



In reply to MAHESH KUMAR's message of Fri, May 26, 2017 at 7:21 PM.

Re: Flink - Iteration and Backpressure

MAHESH KUMAR
Hi Robert,

The Message Auditor System must monitor all four Kafka queues and gather information about which messages made it through all of them, or report specifically which queue a particular message did not make it through.
We want the window time to be equivalent to our SLA time, so that any message that does not make it through all four stages is deemed failed (expired). If we make the window time equal to our SLA time, the buffers may fill up faster, since messages are categorized as successful or failed only at the end of the window. Iteration lets us keep a smaller window: if a message has passed through all the stages within a very short interval (very small compared to the SLA), we can categorize it as successful right away and continue iterating only over the messages that have not yet reached the final stage (failed/expired). This is the reason we use iteration.

We could probably avoid iteration and create a larger time window equal to the SLA time. The system may still face the same issue: back pressure won't allow new messages through, and the messages inside the window may expire even though they actually passed through all four stages. Is there a recommended way to go about this?
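
One possible direction that needs neither iteration nor an SLA-sized window is to keep a small piece of state per key, emit success as soon as the last queue is seen, and expire a key only when its SLA deadline passes. The sketch below is plain Scala with no Flink dependency; `KeyedAuditor`, `Outcome`, and the other names are hypothetical. In Flink, the closest analogue is probably keyed state combined with timers in a `ProcessFunction`, but the code here does not use that API.

```scala
import scala.collection.mutable

sealed trait Outcome
final case class Succeeded(key: String) extends Outcome
final case class Expired(key: String, missing: Set[String]) extends Outcome

// Per-key bookkeeping: which queues were seen so far and when the SLA deadline falls.
final case class Pending(seen: Set[String], deadline: Long)

class KeyedAuditor(allQueues: Set[String], slaMillis: Long) {
  private val pending = mutable.Map.empty[String, Pending]

  // Called for every incoming message; emits Succeeded as soon as the last queue is seen.
  def onMessage(key: String, queue: String, now: Long): Option[Outcome] = {
    // The deadline is fixed when the key is first seen.
    val state = pending.getOrElse(key, Pending(Set.empty, now + slaMillis))
    val seen = state.seen + queue
    if (allQueues.subsetOf(seen)) {
      pending.remove(key)
      Some(Succeeded(key))
    } else {
      pending.update(key, state.copy(seen = seen))
      None
    }
  }

  // Called when time advances (a timer in a streaming engine); expires overdue keys.
  def onTime(now: Long): Seq[Outcome] = {
    // filter builds a new collection, so removing from `pending` below is safe.
    val overdue = pending.filter { case (_, p) => p.deadline <= now }.toSeq
    overdue.map { case (key, p) =>
      pending.remove(key)
      Expired(key, allQueues.diff(p.seen))
    }
  }

  def pendingCount: Int = pending.size
}
```

With this shape nothing is fed back into the input, so the amount of retained state grows with the number of keys still inside their SLA rather than with replay traffic. The sketch leaves open some cases: a message arriving after its key has already succeeded or expired would start a new pending entry, and a real job would have to decide how to treat it.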

Thanks and Regards,
Mahesh



 

In reply to Robert Metzger's message of Thu, Jun 1, 2017 at 9:49 AM.



Re: Flink - Iteration and Backpressure

MAHESH KUMAR
Hi Robert/Team,

Is there a recommended solution, or any other insight into how I should be doing this?

Thanks and Regards,
Mahesh

In reply to MAHESH KUMAR's message of Thu, Jun 1, 2017 at 10:32 AM.