Stateful streaming question


Flavio Pompermaier
Hi to all,
we're still playing with the Flink streaming part in order to see whether it can improve our current batch pipeline.
At the moment, we have a job that translates incoming data (as Row) into Tuple4, groups the tuples by the first field and persists the result to disk (using a Thrift object). When we need to add tuples to those grouped objects, we have to read the persisted data again, flatten it back to Tuple4, union it with the new tuples, re-group by key and finally persist.

This is very expensive to do with batch computation, while it should be pretty straightforward to do with streaming (from what I understood): I just need to use ListState. Right?
Then, let's say I need to scan all the data of the stateful computation (keys and values) in order to do some other computation. I'd like to know:
  • how to do that, i.e. how to create a DataSet/DataSource<Key,Value> from the stateful data in the stream
  • whether there is any problem in accessing the stateful data without stopping the incoming data (and thus possible updates to the state)
Thanks in advance for the support,
Flavio
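
The incremental update described above can be pictured with a small plain-Java sketch. It is not Flink code: `KeyedListStore` is a hypothetical stand-in for what a keyed ListState would hold, and tuples are simplified to `String[]` with the grouping key in field 0. The only point is that a new tuple is appended to the list of its key, with no need to read, flatten and re-group the data persisted earlier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for per-key list state; all names here are hypothetical. */
final class KeyedListStore {
    // One list of tuples per key, analogous to the list a keyed state would keep.
    private final Map<String, List<String[]>> state = new HashMap<>();

    /** Appends one tuple to the list of its key; field 0 is the grouping key. */
    void add(String[] tuple) {
        state.computeIfAbsent(tuple[0], k -> new ArrayList<>()).add(tuple);
    }

    /** Returns the tuples accumulated so far for a key (empty if the key is unknown). */
    List<String[]> get(String key) {
        List<String[]> tuples = state.get(key);
        return tuples == null ? Collections.<String[]>emptyList() : tuples;
    }
}
```

Adding a tuple touches only the list of one key, which is the property that makes the streaming version cheaper than the read/union/re-group cycle.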


Re: Stateful streaming question

Kostas Kloudas
Hi Flavio,

From what I understand, for the first part you are correct. You can use Flink’s internal state to keep your enriched data.
In fact, if you are also querying an external system to enrich your data, it is worth looking at the AsyncIO feature:


Now for the second part: currently in Flink you cannot iterate over all registered keys for which you have state. A pointer
that may be useful to look at is the queryable state:


This is still an experimental feature, but let us know your opinion if you use it.

Finally, an alternative would be to keep state in Flink, and periodically flush it to an external storage system, which you can
query at will.

Thanks,
Kostas




Re: Stateful streaming question

Flavio Pompermaier
Hi Kostas,
thanks for your quick response.
I also thought about using Async IO; I just need to figure out how to correctly handle parallelism and the number of async requests.
However, that's probably the way to go. Is it also possible to set a number of retry attempts/backoff when the async request fails (maybe because the server is too busy)?

For the second part, I think it's OK to persist the state into RocksDB or HDFS. My question is indeed about that: is it safe to start reading (with another Flink job) from RocksDB or HDFS while there are still state updates "pending" on it? Should I ensure that state updates are not possible until the other Flink job has finished reading the persisted data?

And another question... I've tried to draft such a process, and basically I have the following code:

DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
    .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {

      // Per-key state: one MyGroupedObj for each value of field 0.
      private transient ValueState<MyGroupedObj> state;

      @Override
      public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
        // state.value() is null the first time a key is seen.
        MyGroupedObj current = state.value();
        if (current == null) {
          current = new MyGroupedObj();
        }
        ....
        current.addTuple(t);
        ...
        state.update(current);
        // Emits the whole grouped object on every incoming tuple.
        out.collect(current);
      }

      @Override
      public void open(Configuration config) {
        ValueStateDescriptor<MyGroupedObj> descriptor =
            new ValueStateDescriptor<>("test", TypeInformation.of(MyGroupedObj.class));
        state = getRuntimeContext().getState(descriptor);
      }
    });
groupedObj.print();

but obviously this way I emit the updated object on every update, while I actually just want to persist the ValueState somehow (and make it available to another job that runs once a month, for example). Is that possible?


--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908

Re: Stateful streaming question

Jain, Ankit

Hi Flavio,

While you wait on an update from Kostas, I wanted to understand the use case better and share my thoughts:

1) Why is the current batch mode expensive? Where are you persisting the data after updates? The way I see it, by moving to Flink you get to use RocksDB (a key-value store), which makes your lookups faster. Right now you are probably using a non-indexed store like S3, maybe?

So the gain is coming from moving to a persistence store better suited to your use case rather than from batch->streaming. Maybe consider just going with a different data store.

IMHO, streaming should only be used if you really want to act on the new events in real time. It is generally harder to get a streaming job correct than a batch one.

2) If the current setup is expensive due to serialization/deserialization, then that should be fixed by moving to a faster format (maybe Avro? I don't have a lot of expertise in that). I don't see how that problem will go away with Flink, so you still need to handle serialization.

3) Even if you do decide to move to Flink, I think you can do this with one job; two jobs are not needed. At every incoming event, check the previous state and update/output to Kafka or whatever data store you are using.

Thanks

Ankit

 


Re: Stateful streaming question

Fabian Hueske-2
Hi Ankit, just a brief comment on the "batch job is easier than streaming job" argument. I'm not sure about that.
I can see that the batch job alone might seem easier to implement, but this is only one part of the whole story. The operational side of using batch is more complex IMO.
You need a tool to ingest your stream, you need storage for the ingested data, you need a periodic scheduler to kick off your batch job, and you need to take care of failures if something goes wrong.
In the streaming case, this is either not needed or the framework does it for you.

Just my 2 cents, Fabian


Re: Stateful streaming question

Kostas Kloudas
Hi Flavio,

For setting the retries, unfortunately there is no such setting yet and, if I am not wrong, in case of a failure of a request, 
an exception will be thrown and the job will restart. I am also including Till in the thread as he may know better.
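
Given that no retry setting is available, one option would be to handle retries in user code around the client call. The helper below is a minimal plain-Java sketch under that assumption: `Retry.withBackoff` and its parameters are hypothetical names, not part of Flink. It blocks while waiting between attempts, so it would presumably have to run on the client's own thread pool rather than directly inside the async callback.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries a call with exponential backoff. Not a Flink API. */
final class Retry {
    /**
     * Calls request up to maxAttempts times. After each failed attempt except the
     * last, sleeps for the current delay and then doubles it. Rethrows the last
     * failure if every attempt fails.
     */
    static <T> T withBackoff(Callable<T> request, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return request.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // back off before the next attempt
                    delay *= 2;
                }
            }
        }
        throw last;
    }
}
```

Only when all attempts are exhausted does the exception escape, at which point the job would fail and restart as described above.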

For consistency guarantees and concurrency control, this depends on your underlying backend. But if you want to
have end-to-end control, then you could do as Ankit suggested in his point 3), i.e. have a single job for the whole pipeline
(if this fits your needs of course). This will allow you to set your own “precedence” rules for your operations.

Now finally, there is currently no way to directly expose the state of a job to another job. The way to do so is either Queryable
State or writing to a Sink. If the problem with having one job is that you emit one element at a time, you can always group
elements together and emit downstream less often, in batches.
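
A minimal plain-Java sketch of that batching idea, assuming a simple count-based trigger: `BatchingEmitter`, `flushEvery` and `downstream` are hypothetical names, and the `Consumer` stands in for whatever sink or collector would receive the batch. Only the latest value per key is kept between flushes, so a key updated several times is emitted once per batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper: buffers per-key updates and emits them in batches. */
final class BatchingEmitter<K, V> {
    private final int flushEvery;
    private final Consumer<List<V>> downstream;
    // Latest value per key, in order of first appearance since the last flush.
    private final Map<K, V> pending = new LinkedHashMap<>();
    private int updatesSinceFlush = 0;

    BatchingEmitter(int flushEvery, Consumer<List<V>> downstream) {
        if (flushEvery < 1) {
            throw new IllegalArgumentException("flushEvery must be >= 1");
        }
        this.flushEvery = flushEvery;
        this.downstream = downstream;
    }

    /** Records an update and flushes once flushEvery updates have accumulated. */
    void update(K key, V value) {
        pending.put(key, value);
        updatesSinceFlush++;
        if (updatesSinceFlush >= flushEvery) {
            flush();
        }
    }

    /** Emits the buffered values as one batch (if any) and resets the buffer. */
    void flush() {
        if (!pending.isEmpty()) {
            downstream.accept(new ArrayList<>(pending.values()));
        }
        pending.clear();
        updatesSinceFlush = 0;
    }
}
```

A time-based trigger would work the same way, with `flush()` called from a timer instead of from `update`.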
 
Finally, if you need 2 jobs, you can always use a hybrid solution where you keep your current state in Flink and dump it,
once per week for example, to a Sink that is queryable. The Sink can then be queried at any time, and the data will be at most one
week old.

Thanks,
Kostas


Re: Stateful streaming question

Flavio Pompermaier
Hi to all,
there are a lot of useful discussion points :)

I'll try to answer everybody.

@Ankit: 
  • right now we're using Parquet on HDFS to store Thrift objects. Those objects are essentially structured like
    • key
    • alternative_key
    • list of tuples (representing the state of my object)
    • This model could potentially be modeled as a Monoid, and it's very well suited to a stateful streaming computation where updates to a single key's state are not as expensive as a call to any DB to get the current list of tuples and write that list back for every update (IMHO). Maybe here I'm overestimating Flink's streaming capabilities...
  • serialization should be OK using Thrift, but Flink advises using tuples for better performance, so right after reading the data from disk (as a ThriftObject) we convert it to its equivalent representation as Tuple3<String, String, List<Tuple4>>
  • Since I currently use Flink to ingest data that (in the end) means adding tuples to my objects, it would be perfect to have an "online" state of the grouped tuples in order to:
    • add/remove tuples to/from my object very quickly
    • from time to time, scan the whole online data (or a part of it) and "translate" it into one or more JSON indices (and put them into Elasticsearch)
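
To make the Monoid remark concrete, here is a small plain-Java sketch, assuming the per-key state is just a list of tuples combined by concatenation. `TupleListMonoid` is a hypothetical name; the empty list is the identity element and `combine` is associative, which is what allows partial lists for the same key to be merged in any grouping.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: lists of tuples form a monoid under concatenation. */
final class TupleListMonoid {
    /** Identity element: combining with it leaves the other list unchanged. */
    static <T> List<T> empty() {
        return Collections.emptyList();
    }

    /** Associative combine: the tuples of left followed by the tuples of right. */
    static <T> List<T> combine(List<T> left, List<T> right) {
        List<T> merged = new ArrayList<>(left.size() + right.size());
        merged.addAll(left);
        merged.addAll(right);
        return Collections.unmodifiableList(merged);
    }
}
```

Removal of tuples, which is also mentioned above, does not fit this append-only picture and would need separate handling.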
@Fabian:
You're right that batch processes are not very well suited to working with services that can fail... if the remote call fails in a map function, the whole batch job fails. This should be less problematic with streaming because there's checkpointing, and with async IO it should be possible to add some retry/backoff policies in order not to overload remote services like a DB or Solr/ES indices (maybe it's not there yet, but it should be possible to add). Am I wrong?

@Kostas:

From what I understood, Queryable State is useful for gets... what if I need to scan the entire DB? For us it could be better to periodically dump the state to RocksDB or HDFS but, as I already said, I'm not sure whether it is safe to start a batch job that reads the dumped data while, in the meantime, an update of this dump could happen. Is there any potential problem for data consistency (indeed, tuples within grouped objects have references to other objects' keys)?

Best,
Flavio
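
One general way to avoid reading a dump while it is being updated is to never modify a dump in place: each dump is written to a temporary file and then renamed to its final, versioned name, so a reader only ever sees complete files. The sketch below shows this on the local filesystem with `java.nio`. `SnapshotPublisher` and the file names are hypothetical, and whether the target store (HDFS, for example) offers an equivalent atomic rename is an assumption that would need checking.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

/** Hypothetical sketch: publish each dump under a new name via write-then-rename. */
final class SnapshotPublisher {
    /**
     * Writes the lines to a temporary file in dir, then atomically renames it to
     * dir/name. The name is expected to be new for every dump (e.g. versioned).
     */
    static Path publish(Path dir, String name, List<String> lines) throws IOException {
        Path tmp = Files.createTempFile(dir, name, ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        return Files.move(tmp, dir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
    }
}
```

A reader would then ignore `.tmp` files and pick the most recent complete dump, which keeps it isolated from a dump that is still in progress. References between objects stay consistent only within one dump.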

From: Flavio Pompermaier <[hidden email]>
Date: Tuesday, May 16, 2017 at 9:31 AM
To: Kostas Kloudas <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Stateful streaming question

 

Hi Kostas,

thanks for your quick response.

I also thought about using Async IO; I just need to figure out how to correctly handle parallelism and the number of async requests.

However, that's probably the way to go. Is it also possible to set a number of retry attempts/backoff for when an async request fails (maybe because the server is too busy)?

For the second part, I think it's ok to persist the state into RocksDB or HDFS. My question is indeed about that: is it safe to start reading (with another Flink job) from RocksDB or HDFS while an updatable state is still "pending" on it? Should I ensure that no state updates are possible until the other Flink job has finished reading the persisted data?

And another question: I've tried to draft such a process, and basically I have the following code:

DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
        .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {

          // Per-key state holding the grouped object built so far
          private transient ValueState<MyGroupedObj> state;

          @Override
          public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
            MyGroupedObj current = state.value();
            if (current == null) {
              current = new MyGroupedObj();
            }
            ....
            current.addTuple(t);
            ...
            state.update(current);
            // Emits the updated object for every incoming tuple
            out.collect(current);
          }

          @Override
          public void open(Configuration config) {
            ValueStateDescriptor<MyGroupedObj> descriptor =
                new ValueStateDescriptor<>("test", TypeInformation.of(MyGroupedObj.class));
            state = getRuntimeContext().getState(descriptor);
          }
        });

groupedObj.print();

 

but obviously this way I emit the updated object on every update, while I actually just want to persist the ValueState somehow (and make it available to another job that runs, for example, once a month). Is that possible?

 

 

On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas <[hidden email]> wrote:

Hi Flavio,

 

From what I understand, for the first part you are correct. You can use Flink’s internal state to keep your enriched data.

In fact, if you are also querying an external system to enrich your data, it is worth looking at the AsyncIO feature:

 

Now for the second part, currently in Flink you cannot iterate over all registered keys for which you have state. A pointer that may be useful to look at is the queryable state:

 

This is still an experimental feature, but let us know your opinion if you use it.

Finally, an alternative would be to keep state in Flink, and periodically flush it to an external storage system, which you can query at will.

 

Thanks,

Kostas

 

 

On May 16, 2017, at 4:38 PM, Flavio Pompermaier <[hidden email]> wrote:

 

Hi to all,

we're still playing with the streaming part of Flink in order to see whether it can improve our current batch pipeline.

At the moment, we have a job that translates incoming data (as Row) into Tuple4, groups them by the first field and persists the result to disk (using a thrift object). When we need to add tuples to those grouped objects, we need to read the persisted data again, flatten it back to Tuple4, union it with the new tuples, re-group by key and finally persist.

This is very expensive to do with batch computation, while it should be pretty straightforward to do with streaming (from what I understood): I just need to use ListState. Right?

Then, let's say I need to scan all the data of the stateful computation (keys and values) in order to do some other computation. I'd like to know:

  • how to do that? I.e. create a DataSet/DataSource<Key,Value> from the stateful data in the stream
  • is there any problem with accessing the stateful data without stopping incoming data (and thus possible updates to the states)?

Thanks in advance for the support,

Flavio

 

 



 

--

Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908





Re: Stateful streaming question

Aljoscha Krettek
Hi,

Trying to revive this somewhat older thread: have you made any progress? I think going with a ProcessFunction that keeps all your state internally and periodically outputs to, say, Elasticsearch using a sink seems like the way to go? You can do the periodic emission using timers in the ProcessFunction. 
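
As a rough illustration of the timer idea, here is a plain-Java sketch (no Flink dependency, so it can be run on its own). `PeriodicStateEmitter` is a hypothetical class, timestamps are passed in explicitly instead of coming from Flink's timer service, and the `emitted` list stands in for the sink. In Flink itself, timers on a keyed stream are scoped per key, which this sketch simplifies to a single global timer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Keeps a list of values per key and emits a snapshot of all state at a fixed interval. */
final class PeriodicStateEmitter {
    private final long intervalMillis;
    private final Map<String, List<String>> state = new LinkedHashMap<>();
    private final List<Map<String, List<String>>> emitted = new ArrayList<>(); // stand-in for a sink
    private long nextFireTime;

    PeriodicStateEmitter(long intervalMillis, long startTime) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        this.intervalMillis = intervalMillis;
        this.nextFireTime = startTime + intervalMillis;
    }

    /** Fires any timer that is due at the given timestamp, then updates the state for the key. */
    void onElement(String key, String value, long timestamp) {
        advanceTo(timestamp);
        state.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Fires every timer with fire time <= timestamp, emitting one snapshot per firing. */
    void advanceTo(long timestamp) {
        while (timestamp >= nextFireTime) {
            Map<String, List<String>> snapshot = new LinkedHashMap<>();
            state.forEach((k, v) -> snapshot.put(k, new ArrayList<>(v))); // deep copy of the lists
            emitted.add(snapshot);
            nextFireTime += intervalMillis;
        }
    }

    List<Map<String, List<String>>> emitted() {
        return emitted;
    }
}
```

Updates between two firings only touch the in-memory state; the sink sees one snapshot per interval, so downstream data is at most one interval old.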

In your use case, does the data you would store in the Flink managed state have links between data of different keys? This sounds like it could be a problem when it comes to consistency when outputting to an external system.

Best,
Aljoscha
On 17. May 2017, at 14:12, Flavio Pompermaier <[hidden email]> wrote:

Hi to all,
there are a lot of useful discussion points :)

I'll try to answer everybody.

@Ankit: 
  • right now we're using Parquet on HDFS to store thrift objects. Those objects are essentially structured like
    • key
    • alternative_key
    • list of tuples (representing the state of my Object)
    • This model could potentially be modeled as a Monoid, and it's very well suited for a stateful streaming computation where updates to a single key's state are not as expensive as a call to a db to get the current list of tuples and write that list back for each update (IMHO). Maybe here I'm overestimating Flink's streaming capabilities...
  • serialization should be ok using thrift, but Flink advises using tuples for better performance, so just after reading the data from disk (as a ThriftObject) we convert it to its equivalent representation as Tuple3<String, String, List<Tuple4>>
  • Since I currently use Flink to ingest data that (in the end) means adding tuples to my objects, it would be perfect to have an "online" state of the grouped tuples in order to:
    • add/remove tuples to/from my objects very quickly
    • from time to time, scan the whole online data (or a part of it) and "translate" it into one or more JSON indices (and put them into Elasticsearch)
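
To make the Monoid remark concrete, here is a small plain-Java sketch. `GroupedObj` is a hypothetical stand-in for the thrift object described above (tuples are plain strings here, which is an assumption). The identity is an object with an empty tuple list and the combine operation concatenates the lists, so partial results for one key can be merged in any grouping.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical grouped object: key, alternative key, and the list of tuples for that key. */
final class GroupedObj {
    final String key;
    final String alternativeKey;
    final List<String> tuples;

    GroupedObj(String key, String alternativeKey, List<String> tuples) {
        this.key = key;
        this.alternativeKey = alternativeKey;
        this.tuples = Collections.unmodifiableList(new ArrayList<>(tuples));
    }

    /** Identity element for a given key: no tuples yet. */
    static GroupedObj empty(String key, String alternativeKey) {
        return new GroupedObj(key, alternativeKey, Collections.emptyList());
    }

    /** Associative combine: concatenates the tuple lists of two objects with the same key. */
    GroupedObj combine(GroupedObj other) {
        if (!key.equals(other.key)) {
            throw new IllegalArgumentException("cannot combine objects with different keys");
        }
        List<String> merged = new ArrayList<>(tuples);
        merged.addAll(other.tuples);
        return new GroupedObj(key, alternativeKey, merged);
    }
}
```

Because `combine` is associative and `empty` is neutral, a new tuple can be folded into the stored state for its key without re-reading and re-grouping everything else.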
@Fabian:
You're right that batch processes are not very well suited to working with services that can fail: if the remote call fails in a map function, the whole batch job fails. This should be less problematic with streaming because there's checkpointing, and with async IO it should be possible to add some retry/backoff policies in order not to overload remote services like a db or solr/es indices (maybe it's not there already, but it should be possible to add). Am I wrong?

@Kostas:

From what I understood, Queryable state is useful for gets. What if I need to scan the entire db? For us it could be better to periodically dump the state to RocksDB or HDFS but, as I already said, I'm not sure whether it is safe to start a batch job that reads the dumped data while, in the meantime, an update of this dump could happen. Is there any potential problem for data consistency (indeed, tuples within grouped objects have references to other objects' keys)?

Best,
Flavio

On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Flavio,

For setting the retries, unfortunately there is no such setting yet and, if I am not wrong, in case of a failure of a request, 
an exception will be thrown and the job will restart. I am also including Till in the thread as he may know better.
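
Since there is no built-in setting, the retry logic would have to live in user code. A minimal sketch of retry with capped exponential backoff in plain Java follows; `Retry`, `backoffMillis` and `callWithRetry` are hypothetical helpers, not Flink API. It blocks while waiting, which is only for clarity: inside an async operator the wait would have to be scheduled rather than slept.

```java
import java.util.function.Supplier;

/** Hypothetical retry helper with capped exponential backoff. */
final class Retry {
    private Retry() {}

    /** Delay before retry number `attempt` (0-based): base doubled `attempt` times, capped at max. */
    static long backoffMillis(int attempt, long baseMillis, long maxDelayMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxDelayMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMillis);
    }

    /** Calls the request up to maxAttempts times, sleeping between failed attempts. */
    static <T> T callWithRetry(Supplier<T> request, int maxAttempts,
                               long baseMillis, long maxDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return request.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and retry if attempts remain
            }
            if (attempt + 1 < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis(attempt, baseMillis, maxDelayMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
        throw last; // all attempts failed: surface the last error
    }
}
```

Once the attempts are exhausted the last exception is rethrown, which in a job would presumably lead to the restart behaviour described above.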

For consistency guarantees and concurrency control, this depends on your underlying backend. But if you want to 
have end-to-end control, then you could do as Ankit suggested in his point 3), i.e. have a single job for the whole pipeline
 (if this fits your needs of course). This will allow you to set your own “precedence” rules for your operations.

Finally, there is currently no way to expose the state of one job directly to another job. The available options are Queryable
State or writing to a Sink. If the problem with having one job is that you emit one element at a time, you can always group
elements together and emit them downstream less often, in batches.
 
If you do need 2 jobs, you can always use a hybrid solution where you keep your current state in Flink and dump it, 
for example once per week, to a Sink that is queryable. The Sink can then be queried at any time, and the data will be at most one 
week old.

Thanks,
Kostas



Re: Stateful streaming question

Flavio Pompermaier
Hi Aljoscha,
we're still investigating possible solutions here. Yes, as you correctly said, there are links between data of different keys, so we can proceed with the next job only once we are 100% sure that all input data has been consumed and that no other data will be read until this last job ends.
There should be some sort of synchronization between these 2 jobs. Is that possible right now in Flink?

Thanks a lot for the support,
Flavio

On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Trying to revive this somewhat older thread: have you made any progress? I think going with a ProcessFunction that keeps all your state internally and periodically outputs to, say, Elasticsearch using a sink seems like the way to go? You can do the periodic emission using timers in the ProcessFunction. 

In your use case, does the data you would store in the Flink managed state have links between data of different keys? This sounds like it could be a problem when it comes to consistency when outputting to an external system.

Best,
Aljoscha



Re: Stateful streaming question

Aljoscha Krettek
Hi,

I’m afraid not. You would have to wait for one job to finish before starting the next one.

Best,
Aljoscha
On 15. Jun 2017, at 20:11, Flavio Pompermaier <[hidden email]> wrote:

Hi Aljoscha,
we're still investigating possible solutions here. Yes, as you correctly said, there are links between data of different keys, so we can proceed with the next job only once we are 100% sure that all input data has been consumed and that no other data will be read until this last job ends.
There should be some sort of synchronization between these 2 jobs. Is that possible right now in Flink?

Thanks a lot for the support,
Flavio

On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Trying to revive this somewhat older thread: have you made any progress? I think going with a ProcessFunction that keeps all your state internally and periodically outputs to, say, Elasticsearch using a sink seems like the way to go? You can do the periodic emission using timers in the ProcessFunction. 

In your use case, does the data you would store in the Flink managed state have links between data of different keys? This sounds like it could be a problem when it comes to consistency when outputting to an external system.

Best,
Aljoscha

On 17. May 2017, at 14:12, Flavio Pompermaier <[hidden email]> wrote:

Hi to all,
there are a lot of useful discussion points :)

I'll try to answer to everybody.

@Ankit: 
  • right now we're using Parquet on HDFS to store thrift objects. Those objects are essentially structured like
    • key
    • alternative_key
    • list of tuples (representing the state of my Object)
    • This model could be potentially modeled as a Monoid and it's very well suited for a stateful streaming computation where updates to a single key state are not as expansive as a call to any db to get the current list of tuples and update back that list with for an update (IMHO). Maybe here I'm overestimating Flink streaming capabilities...
  • serialization should be ok using thrift, but Flink advice to use tuples to have better performance so just after reading the data from disk (as a ThriftObject) we convert them to its equivalent representation as Tuple3<String, String, List<Tuple4>> version
  • Since I currently use Flink to ingest data that (in the end) means adding tuples to my objects, it would be perfect to have an "online" state of the grouped tuples in order to:
    • add/remove tuples to my object very quickly
    • from time to time, scan the whole online data (or a part of it) and "translate" it into one ore more JSON indices (and put them into Elasticsearch)
@Fabian:
You're right that batch processes are bot very well suited to work with services that can fail...if in a map function the remote call fails all the batch job fails...this should be less problematic with streaming because there's checkpointing and with async IO  is should be the possibile to add some retry/backoff policies in order to not overload remote services like db or solr/es indices (maybe it's not already there but it should be possible to add). Am I wrong?

@Kostas:

From what I understood Queryable state is usefult for gets...what if I need to scan the entire db? For us it could be better do periodically dump the state to RocksDb or HDFS but, as I already said, I'm not sure if it is safe to start a batch job that reads the dumped data while, in the meantime, a possible update of this dump could happen...is there any potential problem to data consistency (indeed tuples within grouped objects have references to other objects keys)?

Best,
Flavio

On Wed, May 17, 2017 at 10:18 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Flavio,

For setting the retries, unfortunately there is no such setting yet and, if I am not wrong, in case of a failure of a request, 
an exception will be thrown and the job will restart. I am also including Till in the thread as he may know better.

For consistency guarantees and concurrency control, this depends on your underlying backend. But if you want to 
have end-to-end control, then you could do as Ankit suggested at his point 3), i.e have a single job for the whole pipeline
 (if this fits your needs of course). This will allow you to set your own “precedence” rules for your operations.

Now finally, there is no way currently to expose the state of a job to another job. The way to do so is either Queryable
State, or writing to a Sink. If the problem for having one job is that you emit one element at a time, you can always group
elements together and emit downstream less often, in batches.
 
Finally, if  you need 2 jobs, you can always use a hybrid solution where you keep your current state in Flink, and you dump it 
to a Sink that is queryable once per week for example. The Sink then can be queried at any time, and data will be at most one 
week old.

Thanks,
Kostas

On May 17, 2017, at 9:35 AM, Fabian Hueske <[hidden email]> wrote:

Hi Ankit, just a brief comment on the "batch job is easier than streaming job" argument. I'm not sure about that.
I can see that the batch job alone might seem easier to implement, but this is only one part of the whole story. The operational side of using batch is more complex IMO.
You need a tool to ingest your stream, you need storage for the ingested data, you need a periodic scheduler to kick off your batch job, and you need to take care of failures if something goes wrong.
In the streaming case, this is either not needed or the framework does it for you.

Just my 2 cents, Fabian

2017-05-16 20:58 GMT+02:00 Jain, Ankit <[hidden email]>:

Hi Flavio,

While you wait on an update from Kostas, I wanted to understand the use-case better and share my thoughts:

1) Why is the current batch mode expensive? Where are you persisting the data after updates? The way I see it, by moving to Flink you get to use RocksDB (a key-value store), which makes your lookups faster - probably right now you are using a non-indexed store like S3, maybe?

So, the gain is coming from moving to a persistence store better suited to your use-case rather than from batch->streaming. Maybe consider just going with a different data store.

IMHO, streaming should only be used if you really want to act on the new events in real-time. It is generally harder to get a streaming job correct than a batch one.

2) If the current setup is expensive due to serialization-deserialization, then that should be fixed by moving to a faster format (maybe AVRO? - I don’t have a lot of expertise in that). I don’t see how that problem will go away with Flink, so you still need to handle serialization.

3) Even if you do decide to move to Flink, I think you can do this with one job; two jobs are not needed. At every incoming event, check the previous state and update/output to Kafka or whatever data store you are using.

Thanks

Ankit

 

From: Flavio Pompermaier <[hidden email]>
Date: Tuesday, May 16, 2017 at 9:31 AM
To: Kostas Kloudas <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Stateful streaming question

 

Hi Kostas,

thanks for your quick response.

I also thought about using Async IO; I just need to figure out how to correctly handle parallelism and the number of async requests.

However, that's probably the way to go... Is it also possible to set a number of retry attempts/backoff when the async request fails (maybe due to a too busy server)?

For the second part I think it's ok to persist the state into RocksDB or HDFS. My question is indeed about that: is it safe to start reading (with another Flink job) from RocksDB or HDFS while there is updatable state "pending" on it? Should I ensure that state updates are not possible until the other Flink job has finished reading the persisted data?

And another question... I've tried to draft such a process and basically I have the following code:

 

DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
    .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {

      // per-key state holding the grouped object
      private transient ValueState<MyGroupedObj> state;

      @Override
      public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
        MyGroupedObj current = state.value();
        if (current == null) {
          current = new MyGroupedObj();
        }
        // ...
        current.addTuple(t);
        // ...
        state.update(current);
        // emits the whole grouped object on every incoming tuple
        out.collect(current);
      }

      @Override
      public void open(Configuration config) {
        ValueStateDescriptor<MyGroupedObj> descriptor =
            new ValueStateDescriptor<>("test", TypeInformation.of(MyGroupedObj.class));
        state = getRuntimeContext().getState(descriptor);
      }
    });

groupedObj.print();

 

but obviously this way I emit the updated object on every update while, actually, I just want to persist the ValueState somehow (and make it available to another job that runs once a month, for example). Is that possible?

 

 

--

Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908








Re: Stateful streaming question

Flavio Pompermaier
Ok, thanks for the clarification. Do you think it could be possible (sooner or later) to have in Flink some sort of synchronization between jobs (as in this case, where the input datastream should be "paused" until the second job finishes)? I know I could use something like Oozie or Falcon to orchestrate jobs but I'd prefer to avoid adding them to our architecture.

Best,
Flavio

On Fri, Jun 16, 2017 at 11:23 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

I’m afraid not. You would have to wait for one job to finish before starting the next one.

Best,
Aljoscha

On 15. Jun 2017, at 20:11, Flavio Pompermaier <[hidden email]> wrote:

Hi Aljoscha,
we're still investigating possible solutions here. Yes, as you correctly said, there are links between data of different keys, so we can proceed with the next job only once we are 100% sure that all input data has been consumed and that no other data will be read until this last job ends.
There should be some sort of synchronization between these 2 jobs... is that possible right now in Flink?

Thanks a lot for the support,
Flavio

On Thu, Jun 15, 2017 at 12:16 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,

Trying to revive this somewhat older thread: have you made any progress? I think going with a ProcessFunction that keeps all your state internally and periodically outputs to, say, Elasticsearch using a sink seems like the way to go? You can do the periodic emission using timers in the ProcessFunction. 
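
The pattern of keeping all state internally and emitting it only periodically can be modeled roughly as below. This is a plain-Java sketch and not Flink's ProcessFunction API: PeriodicStateFlusher is a hypothetical name, the per-key state is an ordinary map, and the timer is simulated by passing the current time in explicitly. In Flink the state would live in managed keyed state and the flush would be triggered by a registered timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model: updates per-key state on every element, emits it only once per interval.
class PeriodicStateFlusher {
    private final long intervalMillis;
    private final BiConsumer<String, List<String>> sink;
    private final Map<String, List<String>> state = new HashMap<>();
    private long nextFlushAt;

    PeriodicStateFlusher(long intervalMillis, long startMillis,
                         BiConsumer<String, List<String>> sink) {
        this.intervalMillis = intervalMillis;
        this.sink = sink;
        this.nextFlushAt = startMillis + intervalMillis;
    }

    // Always updates the state for the key; emits nothing unless the interval has elapsed.
    void onElement(String key, String tuple, long nowMillis) {
        state.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
        maybeFlush(nowMillis);
    }

    // Stands in for the timer callback: writes a copy of every key's state to the sink.
    void maybeFlush(long nowMillis) {
        if (nowMillis < nextFlushAt) {
            return;
        }
        for (Map.Entry<String, List<String>> entry : state.entrySet()) {
            sink.accept(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        nextFlushAt = nowMillis + intervalMillis;
    }
}
```

Note that the sketch does nothing about links between keys: a reader of the sink can still observe one key's new state next to another key's older state unless the sink writes each flush as one unit.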

In your use case, does the data you would store in the Flink managed state have links between data of different keys? This sounds like it could be a problem when it comes to consistency when outputting to an external system.

Best,
Aljoscha

On 17. May 2017, at 14:12, Flavio Pompermaier <[hidden email]> wrote:

Hi to all,
there are a lot of useful discussion points :)

I'll try to answer everybody.

@Ankit: 
  • right now we're using Parquet on HDFS to store thrift objects. Those objects are essentially structured like
    • key
    • alternative_key
    • list of tuples (representing the state of my Object)
    This model could potentially be modeled as a Monoid and it's very well suited for a stateful streaming computation, where updates to a single key's state are not as expensive as a call to any db to get the current list of tuples and write that list back for every update (IMHO). Maybe here I'm overestimating Flink streaming capabilities...
  • serialization should be ok using thrift, but Flink advises using tuples for better performance, so just after reading the data from disk (as a ThriftObject) we convert it to its equivalent Tuple3<String, String, List<Tuple4>> representation
  • Since I currently use Flink to ingest data that (in the end) means adding tuples to my objects, it would be perfect to have an "online" state of the grouped tuples in order to:
    • add/remove tuples to my object very quickly
    • from time to time, scan the whole online data (or a part of it) and "translate" it into one or more JSON indices (and put them into Elasticsearch)

@Fabian:
You're right that batch processes are not very well suited to work with services that can fail... if the remote call fails in a map function, the whole batch job fails... this should be less problematic with streaming because there's checkpointing, and with async IO it should be possible to add some retry/backoff policies in order not to overload remote services like db or solr/es indices (maybe it's not already there but it should be possible to add). Am I wrong?

@Kostas:

From what I understood, Queryable State is useful for gets... what if I need to scan the entire db? For us it could be better to periodically dump the state to RocksDB or HDFS but, as I already said, I'm not sure if it is safe to start a batch job that reads the dumped data while, in the meantime, an update of this dump could happen... is there any potential problem with data consistency (indeed, tuples within grouped objects have references to other objects' keys)?

Best,
Flavio



Re: Stateful streaming question

Aljoscha Krettek
I think it might be possible to do but I’m not aware of anyone working on that and I haven’t seen anyone on the mailing lists express interest in working on that.
