Kafka and Flink integration


Kafka and Flink integration

nragon
I have to produce custom objects into Kafka and read them with Flink. Any tuning advice for using Kryo, such as class registration or something like that? Any examples?

Thanks

Re: Kafka and Flink integration

Tzu-Li (Gordon) Tai
Hi!

It’s generally recommended to register your classes with Kryo, to avoid the somewhat inefficient writing of class names.
Also, depending on the case, to decrease serialization overhead, nothing really beats specific custom serialization. So, you can also register specific serializers for Kryo to use for the type.
If you need to store these custom objects as managed state for your operators, you can also have your own custom Flink TypeSerializer for that.

Best,
Gordon
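
A rough illustration of why registration helps, written as a plain-Java sketch and not as Kryo's actual wire format: an unregistered type is tagged with its full class name on every record, while a registered type is tagged with a small integer ID. `ClassTagDemo`, `SensorEvent` and the 4-byte ID are assumptions made up for this example. In a Flink job the corresponding step would presumably be a call such as `env.getConfig().registerKryoType(MyEvent.class)`, with `MyEvent` standing in for your own class.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical event type used only for this illustration. */
final class SensorEvent {}

/** Illustrative only: this is NOT Kryo's wire format. */
final class ClassTagDemo {
    // Hypothetical registry mapping classes to small integer IDs.
    private static final Map<Class<?>, Integer> REGISTRY = new HashMap<>();

    static void register(Class<?> type) {
        REGISTRY.putIfAbsent(type, REGISTRY.size());
    }

    /** Writes a type tag followed by one long payload and returns the bytes. */
    static byte[] write(Object value, long payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Integer id = REGISTRY.get(value.getClass());
            if (id != null) {
                out.writeBoolean(true);   // registered: the tag is a 4-byte ID
                out.writeInt(id);
            } else {
                out.writeBoolean(false);  // unregistered: the tag is the full class name
                out.writeUTF(value.getClass().getName());
            }
            out.writeLong(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

In this sketch a registered record is always 13 bytes (1 flag + 4 ID + 8 payload), while an unregistered record grows with the length of the class name, and that cost is paid again for every record.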

On 16 June 2017 at 12:27:06 PM, nragon ([hidden email]) wrote:

I have to produce custom objects into kafka and read them with flink. Any
tuning advices to use kryo? Such as class registration or something like
that? Any examples?

Thanks



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Kafka and Flink integration

nragon
My custom object is used across the whole job, so it will be part of checkpoints. Can you point me to some references with examples?

Re: Kafka and Flink integration

nragon
Do I need to use registerTypeWithKryoSerializer() in my execution environment and register() on the Kryo instance?
My serialization into Kafka is done with the following snippet:

try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
     Output output = new Output(byteArrayOutStream)) {
  Kryo kryo = new Kryo();
  kryo.writeClassAndObject(output, event);
  output.flush();
  return byteArrayOutStream.toByteArray();
} catch (IOException e) {
  return null;
}

"event" is my custom object.

Then I deserialize it in Flink's Kafka consumer:

try (ByteArrayInputStream byteArrayInStream = new ByteArrayInputStream(bytes);
     Input input = new Input(byteArrayInStream, bytes.length)) {
  Kryo kryo = new Kryo();
  return kryo.readClassAndObject(input);
} catch (IOException e) {
  return null;
}

Thanks

Re: Kafka and Flink integration

Nico Kruber
In reply to this post by nragon
No, this is only necessary if you want to register a custom serializer itself
[1]. Also, in case you are wondering about registerKryoType() - this is only
needed as a performance optimisation.

What exactly is your problem? What are you trying to solve?
(I can't read JFR files here, and from what I read at Oracle's site, this
requires a commercial license, too...)


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/custom_serializers.html

On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote:

> Do I need to use registerTypeWithKryoSerializer() in my execution
> environment?
> My serialization into kafka is done with the following snippet
> [...]

RE: Kafka and Flink integration

nragon

I believe there is some performance impact while de/serializing, which is “normal”. What I’m trying to understand is whether there are any tips to improve this process. For instance, tuples vs. general class types. Do you know if it’s worth mapping a custom object into a tuple just for the de/serialization process?

According to JFR analysis, Kryo methods are hit a lot.
 

-----Original Message-----
From: Nico Kruber [mailto:[hidden email]]
Sent: 20 June 2017 16:04
To: [hidden email]
Cc: Nuno Rafael Goncalves <[hidden email]>
Subject: Re: Kafka and Flink integration

No, this is only necessary if you want to register a custom serializer itself [1]. Also, in case you are wondering about registerKryoType() - this is only needed as a performance optimisation.

[...]

Re: Kafka and Flink integration

Nico Kruber
I can only repeat what Gordon wrote on Friday: "It’s usually always
recommended to register your classes with Kryo [using registerKryoType()], to
avoid the somewhat inefficient classname writing.
Also, depending on the case, to decrease serialization overhead, nothing
really beats specific custom serialization. So, you can also register specific
serializers for Kryo to use for the type."

I would also guess that this depends heavily on your actual use case and, in
particular, on the class you are trying to de/serialize.
Unfortunately, your image is too small to read, but does your performance
improve when registering the class as a Kryo type?

Also, I don't think mapping it to a tuple will improve performance, since Kryo
would have to do something similar anyway. Instead, you could write your own
de/serializer and go from "Class (<-> Tuple) <-> Kryo <-> bytes" directly
to "Class <-> bytes".

Nico
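
The "Class <-> bytes" route could look roughly like the following sketch. It uses only `java.io`, and `Event`, its three fields and `EventCodec` are hypothetical names chosen for illustration. In a real job this logic would typically sit inside a Kryo serializer registered through registerTypeWithKryoSerializer(), or inside the Kafka deserialization schema; that wiring is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical custom object; the fields are assumptions for this example. */
final class Event {
    final long id;
    final String source;
    final double value;

    Event(long id, String source, double value) {
        this.id = id;
        this.source = source;
        this.value = value;
    }
}

/** Hand-written codec: fields are written in a fixed order, with no class tag at all. */
final class EventCodec {
    static byte[] toBytes(Event event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(event.id);
            out.writeUTF(event.source);
            out.writeDouble(event.value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Event fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            // Fields must be read in exactly the order they were written.
            long id = in.readLong();
            String source = in.readUTF();
            double value = in.readDouble();
            return new Event(id, source, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The trade-off is that the field order becomes the format: adding or reordering fields later breaks old data unless the codec is versioned.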

On Tuesday, 20 June 2017 17:20:38 CEST Nuno Rafael Goncalves wrote:

> I believe there are some performance impact while de/serializing, which is
> "normal". What I'm trying to understand is if there are any tips to improve
> this process. For instance, tuples vs general class types. Do you know if
> it's worth it to map a custom object into tuple just for de/serialization
> process?
>
> According to jfr analysis, kryo methods are hit a lot.
>
> [cid:image003.jpg@01D2E9E1.26D2D370]

RE: Kafka and Flink integration

Tzu-Li Tai
In reply to this post by nragon
Hi Nuno,

In general, if it is possible, it is recommended that you map your generic classes to Tuples / POJOs [1].
For Tuples / POJOs, Flink will create specialized serializers for them, whereas for generic classes (i.e. types which cannot be treated as POJOs) Flink simply falls back to using Kryo.
The actual performance gain may depend a bit on what the original generic class type looked like.

Another thing worth looking at is enabling object reuse for de-/serialization. However, the user code needs to be written with this in mind; otherwise it may lead to unexpected errors.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types
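
On the object reuse caveat: in Flink the switch is `env.getConfig().enableObjectReuse()`. The sketch below does not use Flink at all; it only simulates, in plain Java, a runtime that hands the same mutable instance to user code for every record, which is the kind of situation that leads to the unexpected errors mentioned above. `ObjectReuseDemo` and `Reading` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class ObjectReuseDemo {
    /** Hypothetical mutable record type. */
    static final class Reading {
        long value;
    }

    /** User code that buffers the incoming reference: wrong when instances are reused. */
    static List<Reading> bufferReferences(long[] inputs) {
        Reading reused = new Reading();          // the one instance the "runtime" recycles
        List<Reading> buffered = new ArrayList<>();
        for (long input : inputs) {
            reused.value = input;                // runtime overwrites the instance in place
            buffered.add(reused);                // user code keeps a reference, not a copy
        }
        return buffered;
    }

    /** User code that copies before buffering: safe under reuse. */
    static List<Reading> bufferCopies(long[] inputs) {
        Reading reused = new Reading();
        List<Reading> buffered = new ArrayList<>();
        for (long input : inputs) {
            reused.value = input;
            Reading copy = new Reading();
            copy.value = reused.value;           // defensive copy of the current contents
            buffered.add(copy);
        }
        return buffered;
    }
}
```

With inputs 1, 2, 3 the first variant ends up holding three references to one object whose value is 3, while the second keeps 1, 2 and 3. Code that stores, caches or compares records across invocations needs the second pattern once reuse is enabled.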

On 20 June 2017 at 11:24:03 PM, Nuno Rafael Goncalves ([hidden email]) wrote:

I believe there are some performance impact while de/serializing, which is “normal”. What I’m trying to understand is if there are any tips to improve this process. For instance, tuples vs general class types. Do you know if it’s worth it to map a custom object into tuple just for de/serialization process?

According to jfr analysis, kryo methods are hit a lot.

RE: Kafka and Flink integration

nragon
In reply to this post by Nico Kruber
Nico,

I'll try some different approaches and will be back here, hopefully with some results :)
Thanks for this brainstorming :)

-----Original Message-----
From: Nico Kruber [mailto:[hidden email]]
Sent: 20 June 2017 16:44
To: Nuno Rafael Goncalves <[hidden email]>
Cc: [hidden email]
Subject: Re: Kafka and Flink integration

I can only repeat what Gordon wrote on Friday: "It’s usually always recommended to register your classes with Kryo [using registerKryoType()], to avoid the somewhat inefficient classname writing.
Also, depending on the case, to decrease serialization overhead, nothing really beats specific custom serialization. So, you can also register specific serializers for Kryo to use for the type."

[...]

RE: Kafka and Flink integration

nragon
In reply to this post by Tzu-Li Tai
Can I have a POJO that is composed of other POJOs?
My custom object has many dependencies, and in order to refactor it I must change another 5 classes as well.

RE: Kafka and Flink integration

Tzu-Li (Gordon) Tai
Yes, POJOs can contain other nested POJO types. You just have to make sure that the nested field is either public, or has corresponding public getter and setter methods that follow the JavaBeans naming conventions.
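
As a concrete shape for this, here is a minimal sketch of an outer POJO that holds a nested POJO, plus a simplified reflective check of the rules named above (public class, public no-argument constructor, each field public or covered by a bean-style getter and setter). `Order`, `Customer`, `Opaque` and `looksLikePojo` are hypothetical, and the check is only an approximation for illustration; it is not Flink's own type analysis and does not recurse into field types.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class PojoRules {
    /** Nested POJO with public fields. */
    public static class Customer {
        public String name;
        public int age;
        public Customer() {}
    }

    /** Outer POJO: private fields with bean-style accessors, one of them another POJO. */
    public static class Order {
        private long id;
        private Customer customer;
        public Order() {}
        public long getId() { return id; }
        public void setId(long id) { this.id = id; }
        public Customer getCustomer() { return customer; }
        public void setCustomer(Customer customer) { this.customer = customer; }
    }

    /** Fails the check: a private field with no getter or setter. */
    public static class Opaque {
        private byte[] secret = new byte[0];
        public Opaque() {}
    }

    /** Simplified approximation of the POJO rules discussed in the thread. */
    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor();               // public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue;
            }
            if (!Modifier.isPublic(mods) && !hasAccessors(type, field)) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasAccessors(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            boolean getterMatches = type.getMethod("get" + suffix).getReturnType() == field.getType();
            type.getMethod("set" + suffix, field.getType());
            return getterMatches;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

Under this check `Order` and `Customer` pass and `Opaque` does not, which mirrors the refactoring question: each of the dependent classes needs the same treatment for the whole object graph to avoid the generic fallback.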


On 21 June 2017 at 12:20:31 AM, nragon ([hidden email]) wrote:

Can i have pojo has composition of other pojo?
My custom object has many dependencies and in order to refactor it I must
also change another 5 classes as well.




RE: Kafka and Flink integration

nragon
Thanks, I'll try to refactor into POJOs.

RE: Kafka and Flink integration

nragon
In reply to this post by Tzu-Li (Gordon) Tai
Just one more question :).
Considering I'm producing into Kafka from an application other than Flink, which serializer should I use in order to get POJO types when consuming those same messages (now in Flink)?

Re: Kafka and Flink integration

Stephan Ewen
Hi!

For general data exchange between systems, it is often good to have a more standard format.
Being able to evolve the schema of types is very helpful if you evolve the data pipeline (which almost always happens eventually).

For that reason, Avro and Thrift are very popular for that type of data exchange.

While they are not as fast as Kryo, they are more "robust" in the sense that the format is stable.
Kryo is a good choice for intermediate data that is not persistent or at least not leaving one specific system.

Greetings,
Stephan
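
To make the schema-evolution point concrete, here is a small hand-rolled sketch of a format that survives the addition of a field. It is deliberately not Avro or Thrift: Avro, for instance, resolves a writer schema against a reader schema with field defaults instead of using a version byte. `VersionedCodec`, `Event`, the `region` field and its "unknown" default are all assumptions invented for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class VersionedCodec {
    /** Hypothetical event; "region" was added in version 2 of the format. */
    static final class Event {
        final long id;
        final String region;

        Event(long id, String region) {
            this.id = id;
            this.region = region;
        }
    }

    /** What an old producer still writes: version byte 1, then the id. */
    static byte[] writeV1(long id) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(1);
            out.writeLong(id);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** What a new producer writes: version byte 2, the id, then the new field. */
    static byte[] writeV2(long id, String region) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(2);
            out.writeLong(id);
            out.writeUTF(region);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** A consumer that understands both versions and fills a default for old data. */
    static Event read(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int version = in.readByte();
            long id = in.readLong();
            switch (version) {
                case 1:
                    return new Event(id, "unknown");
                case 2:
                    return new Event(id, in.readUTF());
                default:
                    throw new IllegalArgumentException("unsupported version " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A format without any notion of versions or schemas leaves the consumer unable to tell old records from new ones once the class changes, which is why a stable, evolvable format matters for data that leaves a single system.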



On Tue, Jun 20, 2017 at 7:22 PM, nragon <[hidden email]> wrote:
Just one more question :).
Considering I'm producing into kafka with other application other than
flink, which serializer should i use in order to use pojo types when
consuming those same messages (now in flink)?




Re: Kafka and Flink integration

nragon
So,
 1 - Serialization between producer application -> Kafka -> Flink Kafka consumer will use Avro, Thrift or Kryo, right?
 2 - Should I use AbstractDeserializationSchema or TypeExtractor.getForClass(<MyCustomerObject>.class) in my DeserializationSchema?
 3 - The remaining pipeline can just use standard POJO serialization, which would be better?
 4 - How can I monitor which serialization mechanism Flink is using? How can I force it not to use Kryo or Avro during job runtime?
 5 - Should I always define TypeInformation, for instance using a factory?

Re: Kafka and Flink integration

Greg Hogan
The recommendation has been to avoid Kryo where possible.

General data exchange: Avro or Thrift.

Flink internal data exchange: POJO (or Tuple, which are slightly faster though less readable, and there is an outstanding PR to narrow or close the performance gap).

Kryo is useful for types which cannot be modified to be a POJO. There are also cases where Kryo must be used because Flink has insufficient TypeInformation, such as when returning an interface or abstract type when the actual concrete type can be known.



> On Jun 21, 2017, at 3:19 AM, nragon <[hidden email]> wrote:
>
> So, serialization between producer application -> kafka -> flink kafka
> consumer will use avro, thrift or kryo right? From there, the remaining
> pipeline can just use standard pojo serialization, which would be better?

Re: Kafka and Flink integration

Ted Yu
In reply to this post by nragon
Greg:
Can you clarify the last part?
Should it be: the concrete type cannot be known?

-------- Original message --------
From: Greg Hogan <[hidden email]>
Date: 6/21/17 3:10 AM (GMT-08:00)
To: nragon <[hidden email]>
Subject: Re: Kafka and Flink integration

The recommendation has been to avoid Kryo where possible.

[...]

Re: Kafka and Flink integration

Greg Hogan
If the concrete type cannot be known then a proper TypeInformation cannot be created and Kryo must be used.

There may be a few cases where the TypeInformation can be deduced by the developer but not by the TypeExtractor, and the returns TypeInformation must then be given explicitly to prevent the use of Kryo.

A recent example in Gelly was a Function whose input and output types are the same generic interface bound to different parameters. The implementation outputs the same concrete class as the input, but this programmatic structure cannot be deduced by the TypeExtractor, so a returns TypeInformation was specified.

Greg
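
The gap between what reflection can see and what the developer knows can be shown without Flink. In the sketch below a function declares an interface as its output type but in fact always returns the concrete object it was given; reflection only recovers the declared interface. `Shape`, `Circle`, `PassThrough` and `declaredOutputType` are hypothetical names, and `java.util.function.Function` stands in for a Flink function interface. This is a simplified analogue, not the Gelly case itself.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

final class DeclaredTypeDemo {
    interface Shape {}

    static final class Circle implements Shape {}

    /** Always returns the very object it received, but only declares Shape as output. */
    static final class PassThrough implements Function<Shape, Shape> {
        @Override
        public Shape apply(Shape input) {
            return input;
        }
    }

    /** Reads the declared output type argument of a Function implementation via reflection. */
    static Class<?> declaredOutputType(Class<?> functionClass) {
        for (Type candidate : functionClass.getGenericInterfaces()) {
            if (candidate instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) candidate;
                Type output = parameterized.getActualTypeArguments()[1];
                if (parameterized.getRawType() == Function.class && output instanceof Class) {
                    return (Class<?>) output;
                }
            }
        }
        throw new IllegalArgumentException("no concrete Function<I, O> signature found");
    }
}
```

Static analysis of `PassThrough` yields `Shape`, an interface, even though every value flowing out at runtime is a `Circle`. In Flink that mismatch is what an explicit returns type hint resolves, since the developer can state the concrete type that reflection cannot infer.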


> On Jun 21, 2017, at 6:21 AM, Ted Yu <[hidden email]> wrote:
>
> Greg:
> Can you clarify he last part?
> Should it be: the concrete type cannot be known ?

Re: Kafka and Flink integration

Urs Schoenenberger
In reply to this post by Greg Hogan
Hi Greg,

do you have a link where I could read up on the rationale behind
avoiding Kryo? I'm currently facing a similar decision and would like to
get some more background on this.

Thank you very much,
Urs

On 21.06.2017 12:10, Greg Hogan wrote:

> The recommendation has been to avoid Kryo where possible.
>
> [...]

--
Urs Schönenberger - [hidden email]

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082