Porting batch percentile computation to streaming window


William Saar-2
I am porting a calculation from Spark batches that uses broadcast variables to compute percentiles from metrics, and I am curious for tips on doing this with Flink streaming.

I have a windowed computation where I compute metrics for IP addresses (a windowed stream of metrics objects grouped by IP address). Now I would like to compute percentiles for each IP from the metrics.

My idea is to send all the metrics to a node that computes a global TDigest and then rejoins the computed global TDigest with the IP-grouped metrics stream to compute the percentiles for each IP. Is there a neat way to implement this in Flink?

I am curious about the best way to join a global value, like our TDigest, with every result of a grouped window stream. I would also like to know how to tell when the TDigest is complete and has seen every element in the window (say, if I implement it in a stateful flatMap that emits the value after seeing all stream values).

Thanks!
William

Re: Porting batch percentile computation to streaming window

Gyula Fóra
Hi William,

I think the feature you are looking for is basically side inputs, which are not implemented yet, but let me try to suggest a workaround that might work.

If I understand correctly you have two windowed computations:
TDigestStream = allMetrics.windowAll(...).reduce()
windowMetricsByIP = allMetrics.keyBy(ip).window(...).reduce()

And now you want to join these two by window to compute the percentiles, something like:

TDigestStream.broadcast().connect(windowMetricsByIP).flatMap(JoiningCoFlatMap)

In your JoiningCoFlatMap you could keep a Map<Window, TDigest> in state, and every per-IP metric aggregate could pick up the TDigest for its window. All this assumes that you attach the window information to both the aggregate metrics and the TDigest (you can do this in the window reduce step).

This logic now assumes that you get the TDigest result before getting any groupBy metric, which will probably not be the case so you could do some custom buffering in state. Depending on the rate of the stream this might or might not be feasible :)
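
The buffering idea can be sketched in plain Java. This is only a model of the joining logic, not Flink's CoFlatMapFunction API: WindowJoinSketch, IpMetric, onDigest and onMetric are hypothetical names, and a sorted array stands in for a real TDigest.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the join; NOT the Flink CoFlatMapFunction API.
// Assumption: a sorted double[] stands in for a real TDigest.
class WindowJoinSketch {
    // Hypothetical per-IP aggregate, tagged with the start of its window.
    static final class IpMetric {
        final long windowStart;
        final String ip;
        final double value;

        IpMetric(long windowStart, String ip, double value) {
            this.windowStart = windowStart;
            this.ip = ip;
            this.value = value;
        }
    }

    private final Map<Long, double[]> digestByWindow = new HashMap<>();
    private final Map<Long, List<IpMetric>> pendingByWindow = new HashMap<>();

    // Broadcast input: the global digest for one window.
    // Flushes any per-IP metrics that arrived before it.
    List<String> onDigest(long windowStart, double[] sortedValues) {
        digestByWindow.put(windowStart, sortedValues);
        List<String> out = new ArrayList<>();
        List<IpMetric> pending = pendingByWindow.remove(windowStart);
        if (pending != null) {
            for (IpMetric m : pending) {
                out.add(format(m, sortedValues));
            }
        }
        return out;
    }

    // Keyed input: one per-IP aggregate. Buffered if its digest is not here yet.
    List<String> onMetric(IpMetric m) {
        double[] digest = digestByWindow.get(m.windowStart);
        if (digest == null) {
            pendingByWindow.computeIfAbsent(m.windowStart, k -> new ArrayList<>()).add(m);
            return Collections.emptyList();
        }
        return Collections.singletonList(format(m, digest));
    }

    // Percentage of digest values that are <= v.
    static double percentileRank(double[] sorted, double v) {
        int count = 0;
        for (double d : sorted) {
            if (d <= v) {
                count++;
            }
        }
        return 100.0 * count / sorted.length;
    }

    private static String format(IpMetric m, double[] digest) {
        return m.ip + "@" + m.windowStart + "=" + percentileRank(digest, m.value);
    }
}
```

In a real job the two maps would presumably live in checkpointed state, and entries for finished windows would have to be evicted; the sketch leaves both out.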

Does this sound reasonable? I hope I have understood the use-case correctly.
Gyula


William Saar <[hidden email]> wrote (Mon, May 29, 2017, 18:34):

Re: Porting batch percentile computation to streaming window

William Saar-2
> This logic now assumes that you get the TDigest result before getting any groupBy metric, which will probably not be the case so you could do some custom buffering in state. Depending on the rate of the stream this might or might not be feasible :)

Unfortunately, I think this assumption is a deal-breaker. The value stream is not grouped, but I need to distribute the values to compute the metrics, and I am populating the TDigest with the metrics.

Your suggestion gave me some ideas. Assume I have
windowMetricsByIp = values.keyBy(ip).window(TumblingTimeWindow).fold(computeMetrics)
tDigestStream = windowMetricsByIp.global().flatMap(tDigestMapper) // How do I know when the flat map has seen all values and should emit its result?
percentilesStream = tDigestStream.broadcast().connect(windowMetricsByIp).flatMap

If I attach information about the current window to the metrics events on line 1, can I perhaps use that information to make the flatMap on line 2 decide when to emit its TDigest? The crudest solution is to emit the TDigest for a window when the first event of the next window arrives (will this cause problems with back-pressure?).
Less crude: maybe I can store watermark information or something similar on the metrics objects in line 1 and emit TDigests more often in line 2?
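
A minimal plain-Java sketch of the crude variant, assuming every metric carries the start timestamp of its window. NextWindowTriggerSketch and onMetric are hypothetical names, and this models the flatMap logic only, not Flink's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of "emit when the next window starts"; not Flink API.
class NextWindowTriggerSketch {
    private boolean hasWindow = false;
    private long currentWindowStart;
    private List<Double> buffered = new ArrayList<>();

    // Feeds one metric tagged with its window start.
    // Returns the previous window's values if this metric opens a later window,
    // otherwise an empty list.
    List<Double> onMetric(long windowStart, double value) {
        if (hasWindow && windowStart < currentWindowStart) {
            // A metric from an already-emitted window: dropped in this sketch.
            return Collections.emptyList();
        }
        List<Double> finished = Collections.emptyList();
        if (hasWindow && windowStart > currentWindowStart) {
            finished = buffered;
            buffered = new ArrayList<>();
        }
        currentWindowStart = windowStart;
        hasWindow = true;
        buffered.add(value);
        return finished;
    }
}
```

The sketch makes two weaknesses visible: the last window is never emitted until a later metric shows up, and a metric that arrives after its window was emitted (for example from a slower parallel upstream task) is lost.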

Finally, how do I access the watermark/window information in my fold operation in line 1?

Thanks!

----- Original Message -----
From: "Gyula Fóra" <[hidden email]>
To: "William Saar" <[hidden email]>, <[hidden email]>
Sent: Tue, 30 May 2017 08:56:28 +0000
Subject: Re: Porting batch percentile computation to streaming window


Re: Porting batch percentile computation to streaming window

Gyula Fóra
I think you could actually use a window operation to get the tDigestStream from windowMetricsByIp:

windowMetricsByIp.windowAll(SameWindowAsTumblingTimeWindow).fold(...)

This way the watermark mechanism should ensure that you get all partial results before the global window is flushed.
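
Why the watermark helps can be illustrated with a small plain-Java model. It is a sketch of event-time window firing under simplifying assumptions, not Flink's implementation: WatermarkFlushSketch and its methods are hypothetical names, partial results are plain doubles that get summed, and windows are tumbling and aligned to the epoch. Each partial result is assumed to carry the last timestamp of the keyed window that produced it, so it falls into the global window with the same bounds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of event-time window firing; not Flink's implementation.
class WatermarkFlushSketch {
    private final long windowSizeMs;
    // Sum of partial results per window start; TreeMap keeps windows in order.
    private final TreeMap<Long, Double> openWindows = new TreeMap<>();

    WatermarkFlushSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Assigns a partial result to the tumbling window containing its timestamp.
    void onPartialResult(long timestampMs, double value) {
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        openWindows.merge(start, value, Double::sum);
    }

    // A window [start, start + size) fires once the watermark reaches
    // start + size - 1, i.e. no earlier element can still arrive.
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + windowSizeMs - 1 <= watermark) {
            Map.Entry<Long, Double> e = openWindows.pollFirstEntry();
            fired.add(e.getKey() + ":" + e.getValue());
        }
        return fired;
    }
}
```

Nothing is emitted for a window until the watermark passes its end, which is what removes the need to guess when all per-IP partial results have been seen.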

Gyula

William Saar <[hidden email]> wrote (Tue, May 30, 2017, 15:03):

Re: Porting batch percentile computation to streaming window

William Saar-2
Nice! The solution is actually starting to look quite clean with this in place.

Finally, does Flink offer functionality to retrieve information about the current window that a rich function is running on? I don't see anything in the RuntimeContext classes about the current window...

As you pointed out earlier, I need to attach a window ID (for instance, the starting timestamp of the window) to each metric and propagate it to the TDigest objects, so that I can associate the metrics with the right TDigest in the last stateful CoFlatMapFunction. You mentioned that I should compute the window information in the initial fold function that computes the metrics. While I can compute a common window-start timestamp from the events in the metrics computation, it would seem less ugly and less error-prone if I could get information about the current window the fold function is running on from Flink.
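
For reference, a minimal sketch of computing that common window-start timestamp from an event timestamp. It assumes tumbling windows aligned to the epoch with no offset; WindowStartSketch and windowStart are hypothetical names, not Flink API.

```java
// Derives a window ID from an event timestamp.
// Assumption: tumbling windows aligned to the epoch, no offset.
class WindowStartSketch {
    // Start of the tumbling window that contains timestampMs.
    // floorMod keeps the result correct for negative timestamps too.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    public static void main(String[] args) {
        // With one-minute windows, 125000 ms falls into the window starting at 120000 ms.
        System.out.println(windowStart(125_000L, 60_000L)); // prints 120000
    }
}
```

Every event in the same window yields the same value, so it can serve as the join key between the metrics and the TDigest, but it silently breaks if the window assigner is later changed to use an offset or a different size.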

----- Original Message -----
From: "Gyula Fóra" <[hidden email]>
To: "William Saar" <[hidden email]>, <[hidden email]>
Sent: Tue, 30 May 2017 13:56:08 +0000
Subject: Re: Porting batch percentile computation to streaming window



Re: Porting batch percentile computation to streaming window

Gyula Fóra

William Saar <[hidden email]> wrote (Wed, May 31, 2017, 1:36):