Could not extract key Exception only on runtime not in dev environment

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Could not extract key Exception only on runtime not in dev environment

Avi Levi
I am running flink locally on my machine , I am getting the exception below when reading from kafka topic. when running from the ide (intellij) it is running perfectly. however when I deploy my jar to flink runtime (locally) using 
/bin/flink run ~MyApp-1.0-SNAPSHOT.jar
my class looks like this 
case class Foo(id: String, value: String, timestamp: Long, counter: Int) 
I am getting this exception 
java.lang.RuntimeException: Could not extract key from Foo("some-uuid","text",1540348398,1)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Could not extract key from Foo("some-uuid","text",1540348398,1)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	... 22 more
Caused by: java.lang.NullPointerException
	at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:41)
	at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:40)
	at org.apache.flink.streaming.api.scala.DataStream$$anon$2.getKey(DataStream.scala:411)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
	... 26 more

my key partition is simple (partitionFactor = some number) 
.keyBy{ r =>
        val h = fastHash(r.id) % partitionFactor
        math.abs(h)
    }
again, this happens only on runtime not when I run it from intellij 
this so frustrating, any advice ?

Reply | Threaded
Open this post in threaded view
|

Re: Could not extract key Exception only on runtime not in dev environment

miki haiat
What r.id  Value ? 
Are you sure that is not null ?

Miki.


On Tue, 20 Nov 2018, 17:26 Avi Levi <[hidden email] wrote:
I am running flink locally on my machine , I am getting the exception below when reading from kafka topic. when running from the ide (intellij) it is running perfectly. however when I deploy my jar to flink runtime (locally) using 
/bin/flink run ~MyApp-1.0-SNAPSHOT.jar
my class looks like this 
case class Foo(id: String, value: String, timestamp: Long, counter: Int) 
I am getting this exception 
java.lang.RuntimeException: Could not extract key from Foo("some-uuid","text",1540348398,1)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Could not extract key from Foo("some-uuid","text",1540348398,1)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	... 22 more
Caused by: java.lang.NullPointerException
	at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:41)
	at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:40)
	at org.apache.flink.streaming.api.scala.DataStream$$anon$2.getKey(DataStream.scala:411)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
	... 26 more

my key partition is simple (partitionFactor = some number) 
.keyBy{ r =>
        val h = fastHash(r.id) % partitionFactor
        math.abs(h)
    }
again, this happens only on runtime not when I run it from intellij 
this so frustrating, any advice ?

Reply | Threaded
Open this post in threaded view
|

Re: Could not extract key Exception only on runtime not in dev environment

Avi Levi
yes you can also see it in the message (and also it should have crushed also on the ide) further more to be sure I added a filter that looks like 
env
    .addSource(kafka_source)
    .filter(_.id != null)
    .keyBy{ r =>
        val h = fastHash(r.id) % partitionFactor
        math.abs(h)
    }
    .map(...)

and still the same 

On Tue, Nov 20, 2018 at 5:31 PM miki haiat <[hidden email]> wrote:
What r.id  Value ? 
Are you sure that is not null ?

Miki.


On Tue, 20 Nov 2018, 17:26 Avi Levi <[hidden email] wrote:
I am running flink locally on my machine , I am getting the exception below when reading from kafka topic. when running from the ide (intellij) it is running perfectly. however when I deploy my jar to flink runtime (locally) using 
/bin/flink run ~MyApp-1.0-SNAPSHOT.jar
my class looks like this 
case class Foo(id: String, value: String, timestamp: Long, counter: Int) 
I am getting this exception 
java.lang.RuntimeException: Could not extract key from Foo("some-uuid","text",1540348398,1)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Could not extract key from Foo("some-uuid","text",1540348398,1)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	... 22 more
Caused by: java.lang.NullPointerException
	at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:41)
	at com.bluevoyant.StreamingJob$$anonfun$3.apply(StreamingJob.scala:40)
	at org.apache.flink.streaming.api.scala.DataStream$$anon$2.getKey(DataStream.scala:411)
	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
	... 26 more

my key partition is simple (partitionFactor = some number) 
.keyBy{ r =>
        val h = fastHash(r.id) % partitionFactor
        math.abs(h)
    }
again, this happens only on runtime not when I run it from intellij 
this so frustrating, any advice ?