problem with avro serialization


problem with avro serialization

Debasish Ghosh
Hello -

I'm facing an issue with Avro serialization of Scala case classes generated through avrohugger.
The case classes generated by avrohugger have the Avro schema in the companion object. This is a sample generated class (details elided):

case class Data(var id: Int, var name: String) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(0, "")
  def get(field$: Int): AnyRef = {
    //..
  }
  def put(field$: Int, value: Any): Unit = {
    //..
  }
  def getSchema(): org.apache.avro.Schema = Data.SCHEMA$
}
object Data {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"pipelines.flink.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}")
}

The Flink 1.8 Avro serializer fails on this, because Avro looks for a SCHEMA$ field on the class itself and cannot use Java reflection to find the SCHEMA$ defined in the companion object. The exception that I get is the following:

java.lang.RuntimeException: Serializing the source elements failed: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class pipelines.flink.avro.Data
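
As far as I can tell, the failing lookup boils down to something like the following. This is only a rough sketch of what Avro's SpecificData does reflectively (not the actual Avro or Flink source), run against the generated Data class above:

import org.apache.avro.Schema

// Avro expects a *static* SCHEMA$ field on the record class itself. For the
// avrohugger output, getDeclaredField throws NoSuchFieldException because the
// companion's SCHEMA$ compiles to a field on the synthetic Data$ class, not on
// Data, and Avro then reports "Not a Specific class".
def schemaViaReflection(c: Class[_]): Schema =
  c.getDeclaredField("SCHEMA$").get(null).asInstanceOf[Schema]

// schemaViaReflection(classOf[Data])   // fails as described above
// Data.SCHEMA$                         // works from Scala, but is invisible to the field lookup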

Any help or workaround would be appreciated.

regards.

Re: problem with avro serialization

Tzu-Li (Gordon) Tai
Hi,

Aljoscha opened a JIRA just recently for this issue: https://issues.apache.org/jira/browse/FLINK-12501.

Do you know if this is a regression from previous Flink versions?
I'm asking just to double check, since from my understanding of the issue, the problem should have already existed before.

Thanks,
Gordon


Re: problem with avro serialization

Debasish Ghosh
Hi Gordon -

I have been trying out Flink 1.8 only recently, but this problem looks to have existed for a long time. It's related to the way Flink handles Avro serialization, which I guess has not changed in recent times.
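
One possible workaround, just an untested sketch (the object name and sample elements below are only illustrative), would be to side-step the Avro-specific serializer and let Flink's Scala API derive a plain case-class TypeInformation for Data:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

object WorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // createTypeInformation is the macro from the Scala API package object; for a
    // case class it should derive a CaseClassTypeInfo, so the reflective SCHEMA$
    // lookup never runs when the source elements are serialized.
    val dataTypeInfo: TypeInformation[Data] = createTypeInformation[Data]

    env.fromElements[Data](Data(1, "a"), Data(2, "b"))(dataTypeInfo).print()
    env.execute("avrohugger-schema-workaround")
  }
}

Of course this only avoids the failing SpecificData lookup rather than fixing it, and it gives up Avro-based serialization for Data, so whether it is acceptable depends on the schema evolution requirements of the job.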

regards.
