Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Andrea Spina
Good afternoon dear Community,

For a few days now I have been struggling to understand the cause of this KryoException. Here is the stack trace:

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at xbenchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.get(ArrayList.java:429)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 11 more
2017-06-07 10:18:52,594 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
2017-06-07 10:18:52,841 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.get(ArrayList.java:429)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 11 more


What I am doing is basically a matrix product: I load the matrices in COO format, and the Block class is the following (heavily inspired by https://issues.apache.org/jira/browse/FLINK-3920):


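import breeze.linalg.{Matrix => BreezeMatrix}
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.{Matrix, SparseMatrix}

// MatrixLayout and the companion object that provides Block.apply are defined
// elsewhere in the project (not shown in this post).
class Block(val blockData: Matrix) extends MatrixLayout with Serializable {

  def data: Matrix = blockData

  def toBreeze: BreezeMatrix[Double] = blockData.asBreeze

  def numRows: Int = data.numRows

  def numCols: Int = data.numCols

  // Block product, computed via Breeze; the inner dimensions must agree.
  def *(other: Block): Block = {

    require(this.numCols == other.numRows)

    Block((blockData.asBreeze * other.toBreeze).fromBreeze)
  }

  // Element-wise sum, computed via Breeze.
  def +(other: Block): Block =
    Block((blockData.asBreeze + other.toBreeze).fromBreeze)

  def unary_+(other: Block): Block = this + other

  override def equals(other: Any): Boolean = {
    other match {
      case block: Block => this.blockData.equalsMatrix(block.blockData)
      case _ => false
    }
  }

  // equals is overridden, so hashCode must be overridden too; matrices that are
  // equal have equal dimensions, which keeps this consistent with equals.
  override def hashCode: Int = (numRows, numCols).hashCode

}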
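The post does not show how the COO entries are cut into blocks. As a minimal sketch, assuming square b x b blocks, entry (r, c) belongs to block (r / b, c / b) at block-local position (r % b, c % b). The names CooBlocking, Entry and toBlocks are hypothetical, and plain Scala collections stand in for Flink DataSets:

```scala
object CooBlocking {
  // Hypothetical COO entry: global row index, global column index, value.
  final case class Entry(row: Int, col: Int, value: Double)

  /** Groups COO entries into b x b blocks keyed by (blockRow, blockCol).
    * Each block keeps its entries with block-local coordinates. */
  def toBlocks(entries: Seq[Entry], b: Int): Map[(Int, Int), Seq[Entry]] = {
    require(b > 0, "block size must be positive")
    entries
      .groupBy(e => (e.row / b, e.col / b))
      .map { case (blockId, es) =>
        blockId -> es.map(e => Entry(e.row % b, e.col % b, e.value))
      }
  }
}
```

With b = 2, the entry (3, 5) lands in block (1, 2) at local position (1, 1), while (0, 0) and (1, 1) both land in block (0, 0).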

The block matrix is a matrix of blocks; the group reduce function involved is the last step of the product:

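import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// BlockID and BlockMapper are project classes not shown in this post.
class SumGroupOfBlocks(blockMapper: BlockMapper)
  extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {

  override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))], out: Collector[(BlockID, Block)])
    : Unit = {

    // Multiply every (left, right) pair of the group; the product keeps the row
    // index of the left block and the column index of the right block.
    val multipliedGroup: Seq[(Int, Int, Block)] = blocks.asScala.collect {
      case ((i, j, left), (x, y, right)) => (i, y, left * right)
    }.toSeq

    // Sum all partial products into the single output block.
    val reducedGroup = multipliedGroup.reduce((left, right) => {
      val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)

      (i, j, leftBlock + rightBlock)
    })

    // Emit an explicit tuple instead of relying on auto-tupling of two arguments.
    out.collect((blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3))
  }
}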
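SumGroupOfBlocks performs the last step of the usual block product, C(I,J) = sum over K of A(I,K) * B(K,J): each group is expected to hold the pairs (A(I,K), B(K,J)) for one output block (the grouping key itself is not shown in the post). The following sketch reproduces the same multiply-then-sum logic with plain dense arrays in place of flink-ml Matrix and Breeze; BlockProductSketch and its helpers are hypothetical names:

```scala
object BlockProductSketch {
  type Dense = Array[Array[Double]]

  // Naive dense product of an (m x k) block and a (k x n) block.
  def multiply(a: Dense, b: Dense): Dense = {
    require(a.nonEmpty && b.nonEmpty && a.head.length == b.length, "inner dimensions must agree")
    val m = a.length
    val k = b.length
    val n = b.head.length
    Array.tabulate(m, n) { (i, j) =>
      var s = 0.0
      var p = 0
      while (p < k) { s += a(i)(p) * b(p)(j); p += 1 }
      s
    }
  }

  // Element-wise sum of two blocks of equal shape.
  def add(a: Dense, b: Dense): Dense =
    Array.tabulate(a.length, a.head.length)((i, j) => a(i)(j) + b(i)(j))

  /** Mirrors the group-reduce step: every pair ((i, k, A_ik), (k, j, B_kj)) of one
    * group is multiplied, and the products are summed into output block (i, j). */
  def reduceGroup(pairs: Seq[((Int, Int, Dense), (Int, Int, Dense))]): (Int, Int, Dense) =
    pairs
      .map { case ((i, _, left), (_, j, right)) => (i, j, multiply(left, right)) }
      .reduce((l, r) => (l._1, l._2, add(l._3, r._3)))
}
```

For A = [[1, 2], [3, 4]] and B = [[5, 6], [7, 8]] split into 1x1 blocks, the group for output block (0, 0) contains the pairs (1, 5) and (2, 7), and reduceGroup returns (0, 0, [[19.0]]), which matches the top-left entry of A * B.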

The exception above appears as soon as I increase the matrix size to 2000x2000 (rows x cols): my code works with 1000x1000 matrices, but fails with 2000x2000 matrices and larger.
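The size dependence can be made concrete with some arithmetic. Assuming an n x n product with dense b x b blocks (g = n / b blocks per side) and that the group reduce receives every pair (A(I,K), B(K,J)), as SumGroupOfBlocks consumes, there are g^3 pairs, each carrying two blocks of 8-byte doubles: 16 * g^3 * b^2 = 16 * g * n^2 bytes of raw payload, before any serialization overhead. The block size is not stated in the post, so the numbers below are illustrative only, and ShuffleVolumeSketch is a hypothetical name:

```scala
object ShuffleVolumeSketch {
  /** Raw payload in bytes of all (A_IK, B_KJ) block pairs for an n x n product
    * with dense b x b blocks (n must be a multiple of b). Ignores serialization overhead. */
  def pairPayloadBytes(n: Long, b: Long): Long = {
    require(b > 0 && n % b == 0, "n must be a multiple of the block size b")
    val g = n / b                 // blocks per row / column
    val pairs = g * g * g         // one pair per (I, K, J)
    val bytesPerBlock = 8L * b * b
    pairs * 2L * bytesPerBlock
  }
}
```

With a fixed block size of 500, pairPayloadBytes(1000, 500) is 32,000,000 bytes while pairPayloadBytes(2000, 500) is 256,000,000 bytes: doubling n multiplies the data the group reduce has to sort by eight. Sparse blocks of a fixed density are smaller but scale the same way.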

It is also worth mentioning that the IndexOutOfBoundsException always refers to index 109 (across different matrix sizes), while the reported size of the array varies between 5 and 7. It looks as if the serialized messages are somehow truncated right before delivery.
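The innermost frames (Kryo.readReferenceOrNull, MapReferenceResolver.getReadObject, ArrayList.get) point at Kryo's reference tracking. While writing, each new object gets the next integer ID and a repeated object is written as a back-reference to that ID; while reading, objects are appended to a list and a back-reference is resolved by list index. "Index: 109, Size: 5" therefore most likely means the reader met a back-reference to object #109 after registering only 5 objects, i.e. the bytes being read do not line up with what the writer produced for that record. The simplified model below is not Kryo's code; RefWriter and RefReader are hypothetical names that only reproduce this failure mode:

```scala
import java.util.{ArrayList, IdentityHashMap}

/** Simplified model of reference tracking (not Kryo's implementation). */
object ReferenceTrackingSketch {

  /** Writer side: the first time an object is written it gets the next ID;
    * later occurrences are emitted as back-references to that ID. */
  final class RefWriter {
    private val ids = new IdentityHashMap[AnyRef, Integer]()

    /** Returns Left(id) for a back-reference, Right(id) for a newly registered object. */
    def write(obj: AnyRef): Either[Int, Int] = {
      val existing = ids.get(obj)
      if (existing != null) Left(existing.intValue)
      else {
        val id = ids.size
        ids.put(obj, Integer.valueOf(id))
        Right(id)
      }
    }
  }

  /** Reader side: newly read objects are appended in order, and a back-reference
    * is resolved by list index (compare ArrayList.get in the stack trace). */
  final class RefReader {
    private val seen = new ArrayList[AnyRef]()
    def registerNew(obj: AnyRef): Unit = { seen.add(obj); () }
    def resolve(id: Int): AnyRef = seen.get(id) // IndexOutOfBoundsException if out of sync
    def size: Int = seen.size
  }
}
```

Writing 110 distinct objects and then repeating the last one yields the back-reference Left(109); a reader that has registered only 5 objects fails on resolve(109) with an IndexOutOfBoundsException, the same shape as the error in the trace.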

I have tried the following, in no particular order, and none of them worked:

- using flink-1.2.0 and flink-1.3.0
- updating Flink's Kryo library to 3.0.3
- running with parallelism 1
- explicitly registering my custom classes with Kryo
- varying the size of my blocks
- increasing akka.framesize

I run this job on a three-node cluster with 2 vCPUs per node: two TaskManagers (TMs), two task slots per TM, and a 6 GB TaskManager heap.
I also set 16384 numOfBuffers and 16384 networkBufferSize.

If I run the code on my laptop with 2000x2000 matrices, it works, presumably because no remote serialization takes place.
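The trace itself suggests the record may not have crossed the network at all: KryoSerializer.copy is called from NormalizedKeySorter.writeToOutput inside UnilateralSortMerger$SpillingThread, which appears to be the sorter copying records out of its in-memory sort buffer into a spill file. Spilling typically happens only once the sort buffer fills up, which would fit the size dependence described above. A cheap way to check a serializer in isolation is a round trip that also verifies that reading consumed exactly the bytes that were written. The sketch uses plain java.io streams and a hypothetical flat encoding of a CSC sparse block (SparseBlock, writeBlock, readBlock and roundTrip are assumptions, not project or Flink code); in Flink, such an encoding could be wrapped in a custom Kryo Serializer and registered with ExecutionConfig.registerTypeWithKryoSerializer, which is not shown here:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object RoundTripSketch {
  /** Hypothetical CSC (compressed sparse column) block. */
  final case class SparseBlock(numRows: Int, numCols: Int,
                               rowIndices: Array[Int], colPtrs: Array[Int], data: Array[Double]) {
    def sameAs(o: SparseBlock): Boolean =
      numRows == o.numRows && numCols == o.numCols &&
        rowIndices.sameElements(o.rowIndices) && colPtrs.sameElements(o.colPtrs) &&
        data.sameElements(o.data)
  }

  // Flat encoding: dimensions, then each array prefixed by its length. No object references.
  def writeBlock(b: SparseBlock, out: DataOutputStream): Unit = {
    out.writeInt(b.numRows); out.writeInt(b.numCols)
    out.writeInt(b.rowIndices.length); b.rowIndices.foreach(out.writeInt)
    out.writeInt(b.colPtrs.length); b.colPtrs.foreach(out.writeInt)
    out.writeInt(b.data.length); b.data.foreach(out.writeDouble)
  }

  def readBlock(in: DataInputStream): SparseBlock = {
    val rows = in.readInt()
    val cols = in.readInt()
    val ri = Array.fill(in.readInt())(in.readInt())
    val cp = Array.fill(in.readInt())(in.readInt())
    val d = Array.fill(in.readInt())(in.readDouble())
    SparseBlock(rows, cols, ri, cp, d)
  }

  /** Serializes and deserializes one block; returns (values match, bytes left unread). */
  def roundTrip(b: SparseBlock): (Boolean, Int) = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    writeBlock(b, out)
    out.flush()
    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val back = readBlock(in)
    (back.sameAs(b), in.available()) // leftover bytes > 0 would signal a writer/reader mismatch
  }
}
```

For a well-behaved encoding, roundTrip returns (true, 0); any other result means reader and writer disagree about the record layout.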

I really hope someone could help here. It's becoming really painful...

Thank you so much.

Cheers, Andrea

Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Tzu-Li (Gordon) Tai
Hi Andrea,

I did a quick search, and this seems to be a frequently reported issue in Kryo: https://github.com/EsotericSoftware/kryo/issues/428.

I can’t be sure at the moment if the resolution / workaround mentioned in there makes sense, I’ll have to investigate a bit more.

Also, to clarify: from the stack trace, it seems you are simply using whatever serializer Kryo defaults to (i.e. FieldSerializer) and not registering your own; is that correct?

In the meantime, could you also try the following, rebuild Flink, and test whether it works?

Cheers,
Gordon


On 7 June 2017 at 3:39:55 PM, Andrea Spina ([hidden email]) wrote:


--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Aljoscha Krettek
@Flavio, doesn’t this look like the exception you often encountered a while back? If I remember correctly that was fixed by Kurt, right?

Best,
Aljoscha

On 7. Jun 2017, at 18:11, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Hi Andrea,

I did some quick issue searching, and it seems like this is a frequently asked issue on Kryo: https://github.com/EsotericSoftware/kryo/issues/428.

I can’t be sure at the moment if the resolution / workaround mentioned in there makes sense, I’ll have to investigate a bit more.

Also, to clarify: from the stack trace, it seems like you’re simply using whatever serializer Kryo defaults to (i.e. FieldSerializer), and not registering your own, is that correct?

In the meanwhile, could you also try the following and rebuild Flink, and test to see if it works?:

Cheers,
Gordon


On 7 June 2017 at 3:39:55 PM, Andrea Spina ([hidden email]) wrote:

Good afternoon dear Community, 

Since few days I'm really struggling to understand the reason behind this 
KryoException. Here the stack trace. 

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask  
- Error in task code: CHAIN GroupReduce (GroupReduce at 
my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) 
-> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplicat 
ion$.main(MatrixMultiplication.scala:46)) (1/1) 
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
(GroupReduce at xbenchmarks.matrices.flink.distributed.BlockMatrix.$times(B 
lockMatrix.scala:103)) -> Map (Map at 
my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' 
, caused an error: E 
rror obtaining the sorted input: Thread 'SortMerger spilling thread' 
terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 
109, Size: 5 
Serialization trace: 
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) 
at 
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465) 
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
Thread 'SortMerger spilling thread' terminated due to an exception: 
java.lang.IndexOu 
tOfBoundsException: Index: 109, Size: 5 
Serialization trace: 
blockData (my.org.path.benchmarks.matrices.flink\.distributed.Block) 
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) 
at 
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095) 
at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99) 
at 
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460) 
... 3 more 
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 
109, Size: 5 
Serialization trace: 
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) 
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799) 
Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 
Serialization trace: 
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block) 
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) 
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) 
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250) 
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264) 
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274) 
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98) 
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98) 
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98) 
at 
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519) 
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344) 
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) 
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5 
at java.util.ArrayList.rangeCheck(ArrayList.java:653) 
at java.util.ArrayList.get(ArrayList.java:429) 
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) 
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) 
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677) 
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) 
... 11 more 
2017-06-07 10:18:52,594 INFO  
org.apache.flink.runtime.taskmanager.TaskManager - Memory usage 
stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)] 
2017-06-07 10:18:52,766 INFO  
org.apache.flink.runtime.taskmanager.TaskManager - Direct 
memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281 
2017-06-07 10:18:52,766 INFO  
org.apache.flink.runtime.taskmanager.TaskManager - Off-heap 
pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 
57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB 
(used/committed/max)] 
2017-06-07 10:18:52,766 INFO  
org.apache.flink.runtime.taskmanager.TaskManager - Garbage 
collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], 
[G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1] 
2017-06-07 10:18:52,841 INFO org.apache.flink.runtime.taskmanager.Task  
- CHAIN GroupReduce (GroupReduce at 
my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) 
-> Map (Map at 
my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) 
(1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED. 
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.get(ArrayList.java:429)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 11 more


What I'm basically doing is a product between matrices: I load the matrices
in COO format, and the Block class is the following (heavily inspired by
https://issues.apache.org/jira/browse/FLINK-3920).


import breeze.linalg.{Matrix => BreezeMatrix}
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.{Matrix, SparseMatrix}

class Block(val blockData: Matrix) extends MatrixLayout with Serializable {

  def data: Matrix = blockData

  def toBreeze: BreezeMatrix[Double] = blockData.asBreeze

  def numRows: Int = data.numRows

  def numCols: Int = data.numCols

  def *(other: Block): Block = {

    require(this.numCols == other.numRows)

    Block((blockData.asBreeze * other.toBreeze).fromBreeze)
  }

  def +(other: Block): Block =
    Block((blockData.asBreeze + other.toBreeze).fromBreeze)

  def unary_+(other: Block): Block = this + other

  override def equals(other: Any): Boolean = {
    other match {
      case block: Block => this.blockData.equalsMatrix(block.blockData)
      case _ => false
    }
  }

}
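Since the matrices are loaded in COO format and Block wraps a Flink ML Matrix (a SparseMatrix in the serializer discussed later in this thread), it may help to see how COO triples map onto the compressed sparse column (CSC) arrays that SparseMatrix stores: colPtrs, rowIndices and data. The following is a minimal, stdlib-only sketch; the CooToCsc object is hypothetical (not part of Flink) and assumes in-range indices and no duplicate (row, col) entries. In a real job, Flink ML's SparseMatrix companion object offers a fromCOO factory for this conversion.

```scala
object CooToCsc {

  /** Converts COO triples (row, col, value) into CSC arrays (colPtrs, rowIndices, data).
    * Hypothetical helper: assumes in-range indices and no duplicate (row, col) pairs. */
  def convert(numRows: Int, numCols: Int,
              entries: Seq[(Int, Int, Double)]): (Array[Int], Array[Int], Array[Double]) = {
    require(entries.forall { case (r, c, _) => r >= 0 && r < numRows && c >= 0 && c < numCols },
      "index out of range")

    // Column-major order: sort by column first, then by row inside each column.
    val sorted = entries.sortBy { case (r, c, _) => (c, r) }

    // colPtrs(c + 1) first counts the entries of column c ...
    val colPtrs = new Array[Int](numCols + 1)
    sorted.foreach { case (_, c, _) => colPtrs(c + 1) += 1 }
    // ... then a prefix sum turns counts into offsets: column c occupies [colPtrs(c), colPtrs(c + 1)).
    for (c <- 0 until numCols) colPtrs(c + 1) += colPtrs(c)

    val rowIndices = sorted.map(_._1).toArray
    val data = sorted.map(_._3).toArray
    (colPtrs, rowIndices, data)
  }
}
```

For a 3x3 matrix with entries (1, 2, 3.0), (0, 0, 1.0) and (2, 0, 2.0), this yields colPtrs = [0, 2, 2, 3], rowIndices = [0, 2, 1] and data = [1.0, 2.0, 3.0].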

The block matrix is a matrix of blocks; the group reduce function involved
here is the last step of the product function:

import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

class SumGroupOfBlocks(blockMapper: BlockMapper)
  extends RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {

  override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
                      out: Collector[(BlockID, Block)]): Unit = {

    // multiply every (left, right) pair of blocks in the group
    val multipliedGroup: Seq[(Int, Int, Block)] = blocks.asScala.collect {
      case ((i, j, left), (x, y, right)) => (i, y, left * right)
    }.toSeq

    // sum the partial products into a single output block
    val reducedGroup = multipliedGroup.reduce((left, right) => {
      val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)

      (i, j, leftBlock + rightBlock)
    })

    out.collect((blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3))
  }
}
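To make the arithmetic of this last step concrete: for one output block (i, j), the group holds the pairs ((i, k, A_ik), (k, j, B_kj)), and the reduce computes the sum over k of A_ik * B_kj. The following stdlib-only sketch mirrors that shape with plain Array[Array[Double]] blocks standing in for Block/Breeze; BlockProductSketch is a hypothetical name, and the sketch says nothing about how Flink groups or serializes the data.

```scala
object BlockProductSketch {
  type Dense = Array[Array[Double]]

  // Plain dense block product: result(i)(j) = sum over k of a(i)(k) * b(k)(j).
  def multiply(a: Dense, b: Dense): Dense = {
    require(a.head.length == b.length, "inner block dimensions must agree")
    Array.tabulate(a.length, b.head.length) { (i, j) =>
      b.indices.map(k => a(i)(k) * b(k)(j)).sum
    }
  }

  // Element-wise sum of two blocks of the same shape.
  def add(a: Dense, b: Dense): Dense =
    Array.tabulate(a.length, a.head.length)((i, j) => a(i)(j) + b(i)(j))

  /** Same shape as SumGroupOfBlocks.reduce: multiply each (left, right) pair of the
    * group, then sum the partial products into the output block (i, j). */
  def reduceGroup(group: Seq[((Int, Int, Dense), (Int, Int, Dense))]): (Int, Int, Dense) = {
    val products = group.map { case ((i, _, left), (_, j, right)) => (i, j, multiply(left, right)) }
    products.reduce((l, r) => (l._1, l._2, add(l._3, r._3)))
  }
}
```

For example, splitting [[1, 2], [3, 4]] x [[5, 6], [7, 8]] into 1x1 blocks, the group for output block (0, 0) is ((0, 0, [[1]]), (0, 0, [[5]])) and ((0, 1, [[2]]), (1, 0, [[7]])), which reduces to 1*5 + 2*7 = 19.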

The exception described above occurs when I increase the matrix size to
2000x2000 (rows x cols) or more: my code works with 1000x1000 matrices, but
not with 2000x2000 matrices and above.

I think it is also worth mentioning that the IndexOutOfBoundsException always
asks for index 109 (across different matrix sizes), while the size of the
array varies in a small range (5-7). It looks as if the serialized messages
were somehow truncated right before their delivery.

I tried several solutions; in no particular order, here is what has not worked:

- using flink-1.2.0 and flink-1.3.0
- updating Flink's Kryo library to 3.0.3
- running with parallelism 1
- explicitly registering my custom classes with Kryo
- varying the size of my blocks
- increasing akka.framesize

I run this job on a three-node cluster of 2-vCPU machines, with two
TaskManagers (TMs) and two task slots per TM, a 6 GB TaskManager heap size,
numOfBuffers set to 16384 and networkBufferSize set to 16384.

If I run the code on 2000x2000 matrices on my laptop, it works, probably
because remote serialization is skipped there.

I really hope someone can help here; it's becoming really painful...

Thank you so much. 

Cheers, Andrea 



-- 
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558.html 
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Flavio Pompermaier
Yes, it looks very similar to the exception I experienced (https://issues.apache.org/jira/browse/FLINK-6398), but my error was more related to Row serialization/deserialization (see [1]), while this one looks more like something related to Kryo. Moreover, from what I understood, the error also appears with Flink 1.3.0, so it can't be the same problem.

I once had a very similar problem (see [2] and [3]) and was able to avoid it by removing the reuse of input and output within KryoSerializer (as discussed in [3]).
I hope that helps.

Best,
Flavio

On Thu, Jun 8, 2017 at 11:39 AM, Aljoscha Krettek <[hidden email]> wrote:
@Flavio, doesn’t this look like the exception you often encountered a while back? If I remember correctly that was fixed by Kurt, right?

Best,
Aljoscha

On 7. Jun 2017, at 18:11, Tzu-Li (Gordon) Tai <[hidden email]> wrote:

Hi Andrea,

I did some quick issue searching, and it seems like this is a frequently asked issue on Kryo: https://github.com/EsotericSoftware/kryo/issues/428.

I can’t be sure at the moment whether the resolution / workaround mentioned there makes sense; I’ll have to investigate a bit more.

Also, to clarify: from the stack trace, it seems like you’re simply using whatever serializer Kryo defaults to (i.e. FieldSerializer), and not registering your own, is that correct?

In the meantime, could you also try the following, rebuild Flink, and test whether it works?

Cheers,
Gordon


On 7 June 2017 at 3:39:55 PM, Andrea Spina ([hidden email]) wrote:




Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Andrea Spina
Hi guys,

thank you for your interest. Yes @Flavio, I tried both the 1.2.0 and 1.3.0 versions.
Following Gordon's suggestion, I tried setting Kryo references to false (setReferences), but sadly it didn't help. So I then declared a custom serializer as follows:

class BlockSerializer extends Serializer[Block] with Serializable {

  override def read(kryo: Kryo, input: Input, block: Class[Block]): Block = {
    val serializer = new SparseMatrixSerializer

    val blockData = kryo.readObject(input, classOf[SparseMatrix], serializer)
    new Block(blockData)
  }

  override def write(kryo: Kryo, output: Output, block: Block): Unit = {
    val serializer = new SparseMatrixSerializer

    kryo.register(classOf[SparseMatrix], serializer)
    kryo.writeObject(output, block.blockData, serializer)

    output.close()
  }

}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def read(kryo: Kryo, input: Input, sparse: Class[SparseMatrix]): SparseMatrix = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    val numRows = input.readInt
    val numCols = input.readInt
    val colPtrs = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val rowIndices = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val data = kryo.readObject(input, classOf[java.util.ArrayList[Double]], collectionDoubleSerializer).asScala.toArray

    input.close()

    new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs = colPtrs, rowIndices = rowIndices, data = data)
  }

  override def write(kryo: Kryo, output: Output, sparseMatrix: SparseMatrix): Unit = {

    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)

    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    kryo.register(classOf[java.util.ArrayList[Int]], collectionIntSerializer)
    kryo.register(classOf[java.util.ArrayList[Double]], collectionDoubleSerializer)

    output.writeInt(sparseMatrix.numRows)
    output.writeInt(sparseMatrix.numCols)
    kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.data.toList.asJava, collectionDoubleSerializer)

    output.close()
  }

}

What I got is the same exception as before, but with a different index and size.

Caused by: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.set(ArrayList.java:444)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:680)
        at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:85)
        at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:80)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)

Does this help somehow?

Thank you again,

Andrea
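A possible variant of the serializer above, offered as an untested sketch rather than a confirmed fix. Two things in the posted version stand out: it calls close() on the Output/Input it is handed (in Flink these belong to the framework's KryoSerializer and are reused across records), and it calls kryo.register from inside write, so registrations made at runtime on the writing side are not visible to the Kryo instance that later reads the data. Whether either explains this particular exception is an assumption. The sketch below targets the Kryo 2.x Serializer API bundled with Flink 1.2/1.3, writes the CSC arrays of Flink ML's SparseMatrix directly with length prefixes, and avoids both patterns.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.ml.math.SparseMatrix

// Untested sketch: never closes the Output/Input (the caller owns them) and never
// registers classes on the Kryo instance while (de)serializing.
class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  private def writeIntArray(output: Output, values: Array[Int]): Unit = {
    output.writeInt(values.length, true) // variable-length, non-negative length prefix
    values.foreach(v => output.writeInt(v))
  }

  private def readIntArray(input: Input): Array[Int] = {
    val length = input.readInt(true)
    Array.fill(length)(input.readInt())
  }

  override def write(kryo: Kryo, output: Output, matrix: SparseMatrix): Unit = {
    output.writeInt(matrix.numRows)
    output.writeInt(matrix.numCols)
    writeIntArray(output, matrix.colPtrs)
    writeIntArray(output, matrix.rowIndices)
    output.writeInt(matrix.data.length, true)
    matrix.data.foreach(v => output.writeDouble(v))
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[SparseMatrix]): SparseMatrix = {
    // fields are read back in exactly the order write() emitted them
    val numRows = input.readInt()
    val numCols = input.readInt()
    val colPtrs = readIntArray(input)
    val rowIndices = readIntArray(input)
    val data = Array.fill(input.readInt(true))(input.readDouble())
    new SparseMatrix(numRows = numRows, numCols = numCols,
      rowIndices = rowIndices, colPtrs = colPtrs, data = data)
  }
}
```

Such a serializer would be registered once, before the job runs, e.g. with env.getConfig.registerTypeWithKryoSerializer(classOf[SparseMatrix], classOf[SparseMatrixSerializer]). If Block itself is left on Kryo's default FieldSerializer, its blockData field should then be written with this serializer whenever it holds a SparseMatrix; a DenseMatrix result would still need its own handling.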

Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Tzu-Li (Gordon) Tai
Hi Andrea,

I’ve come back to this and wanted to check on the status. Did you manage to solve this in the end, or is it still a problem for you?

If it’s still a problem, would you be able to provide a complete runnable example job that can reproduce the problem (ideally via a git branch I can clone and run :))?
This would help me with digging a bit more into the issue. Thanks a lot!

Best,
Gordon


On 8 June 2017 at 6:58:46 PM, Andrea Spina ([hidden email]) wrote:

Hi guys,

thank you for your interest. Yes @Flavio, I tried both 1.2.0 and 1.3.0
versions.
Following Gordon suggestion I tried to put setReference to false but sadly
it didn't help. What I did then was to declare a custom serializer as the
following:

class BlockSerializer extends Serializer[Block] with Serializable {

override def read(kryo: Kryo, input: Input, block: Class[Block]): Block
= {
val serializer = new SparseMatrixSerializer

val blockData = kryo.readObject(input, classOf[SparseMatrix],
serializer)
new Block(blockData)
}

override def write(kryo: Kryo, output: Output, block: Block): Unit = {
val serializer = new SparseMatrixSerializer

kryo.register(classOf[SparseMatrix], serializer)
kryo.writeObject(output, block.blockData, serializer)

output.close()
}

}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with
Serializable {

override def read(kryo: Kryo, input: Input, sparse:
Class[SparseMatrix]): SparseMatrix = {
val collectionIntSerializer = new CollectionSerializer()
collectionIntSerializer.setElementClass(classOf[Int], new
IntSerializer)
val collectionDoubleSerializer = new CollectionSerializer()
collectionDoubleSerializer.setElementClass(classOf[Double], new
DoubleSerializer)

val numRows = input.readInt
val numCols = input.readInt
val colPtrs = kryo.readObject(input,
classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
val rowIndices = kryo.readObject(input,
classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
val data = kryo.readObject(input,
classOf[java.util.ArrayList[Double]],
collectionDoubleSerializer).asScala.toArray

input.close()

new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs =
colPtrs, rowIndices = rowIndices, data = data)
}

override def write(kryo: Kryo, output: Output, sparseMatrix:
SparseMatrix): Unit = {

val collectionIntSerializer = new CollectionSerializer()
collectionIntSerializer.setElementClass(classOf[Int], new
IntSerializer)

val collectionDoubleSerializer = new CollectionSerializer()
collectionDoubleSerializer.setElementClass(classOf[Double], new
DoubleSerializer)

kryo.register(classOf[java.util.ArrayList[Int]],
collectionIntSerializer)
kryo.register(classOf[java.util.ArrayList[Double]],
collectionDoubleSerializer)

output.writeInt(sparseMatrix.numRows)
output.writeInt(sparseMatrix.numCols)
kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava,
collectionIntSerializer)
kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava,
collectionIntSerializer)
kryo.writeObject(output, sparseMatrix.data.toList.asJava,
collectionDoubleSerializer)

output.close()
}

}

What I obtained is the same previous exception but on different accessed
index and size.

Caused by: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:653)
        at java.util.ArrayList.set(ArrayList.java:444)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:680)
        at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:85)
        at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:80)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)

Might this help somehow?

Thank you again,

Andrea



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Painful-KryoException-java-lang-IndexOutOfBoundsException-on-Flink-Batch-Api-scala-tp13558p13596.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
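For comparison with the `write()` fragment in the post above, here is a minimal sketch of a serializer whose `read()` mirrors its `write()` field by field. It assumes a hypothetical `SparseMatrix` case class carrying the `numRows`, `numCols`, `colPtrs`, `rowIndices` and `data` fields used in the post, and the Kryo 2.x API bundled with Flink 1.x (Kryo 5 changed the `read` signature to take `Class[_ <: T]`). The arrays are written as length-prefixed primitives instead of nested `kryo.writeObject` calls, so no Kryo reference bookkeeping is involved, and nothing is registered inside `write()`. `SparseMatrixSerializer` is likewise a hypothetical name; this is an illustration, not a confirmed fix for the exception.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical stand-in for the matrix type in the post; only the fields
// touched by the original write() are modelled.
case class SparseMatrix(
    numRows: Int,
    numCols: Int,
    colPtrs: Array[Int],
    rowIndices: Array[Int],
    data: Array[Double])

// Writes and reads the fields in exactly the same order using only primitive
// calls, so no nested objects or Kryo references are written.
class SparseMatrixSerializer extends Serializer[SparseMatrix] {

  private def writeIntArray(output: Output, values: Array[Int]): Unit = {
    output.writeInt(values.length) // length prefix
    values.foreach(v => output.writeInt(v))
  }

  private def readIntArray(input: Input): Array[Int] = {
    val n = input.readInt()
    Array.fill(n)(input.readInt())
  }

  override def write(kryo: Kryo, output: Output, m: SparseMatrix): Unit = {
    output.writeInt(m.numRows)
    output.writeInt(m.numCols)
    writeIntArray(output, m.colPtrs)
    writeIntArray(output, m.rowIndices)
    output.writeInt(m.data.length)
    m.data.foreach(d => output.writeDouble(d))
    // Deliberately no output.close(): the stream belongs to the caller.
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[SparseMatrix]): SparseMatrix = {
    val numRows = input.readInt()
    val numCols = input.readInt()
    val colPtrs = readIntArray(input)
    val rowIndices = readIntArray(input)
    val n = input.readInt()
    val data = Array.fill(n)(input.readDouble())
    SparseMatrix(numRows, numCols, colPtrs, rowIndices, data)
  }
}
```

With a serializer like this, registration would happen once up front, for example with `env.getConfig.registerTypeWithKryoSerializer(classOf[SparseMatrix], classOf[SparseMatrixSerializer])`, rather than inside the serializer itself.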

Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Andrea Spina
Hi Gordon, sadly no news since my last message.

In the end I set the issue aside, since I was not able to solve it. I'll try to provide a runnable example ASAP.

Thank you.

Andrea
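As a possible starting point for such a runnable example: the trace earlier in the thread shows the failure inside `Kryo.readObject` called from `BlockSerializer.read`, so one way to isolate it outside Flink is to round-trip a single value through Kryo and check that the reader consumes exactly the bytes the writer produced. A minimal sketch, assuming Kryo 2.x on the classpath (where class registration is not required by default); `KryoRoundTrip` and `roundTrip` are hypothetical names. It uses `writeClassAndObject`/`readClassAndObject`, mirroring the `readClassAndObject` call visible in the trace.

```scala
import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical helper: serialize one value and read it straight back.
object KryoRoundTrip {
  def roundTrip(kryo: Kryo, value: AnyRef): AnyRef = {
    val buffer = new ByteArrayOutputStream()
    val output = new Output(buffer)
    kryo.writeClassAndObject(output, value)
    output.flush() // push Kryo's internal buffer into `buffer`

    val bytes = buffer.toByteArray
    val input = new Input(bytes)
    val result = kryo.readClassAndObject(input)

    // If the reader stops before the end, write() and read() are likely
    // asymmetric; reading past the end would already have thrown.
    require(input.position() == bytes.length,
      s"reader consumed ${input.position()} of ${bytes.length} bytes")
    result
  }
}
```

Registering the custom `BlockSerializer` on `kryo` before calling `roundTrip` with a `Block` value would exercise the same read path as in the trace.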

Re: Painful KryoException: java.lang.IndexOutOfBoundsException on Flink Batch Api scala

Tzu-Li (Gordon) Tai
Thanks a lot Andrea!


On 21 June 2017 at 8:36:32 PM, Andrea Spina ([hidden email]) wrote:

Hi Gordon, sadly no news since my last message.

In the end I set the issue aside, since I was not able to solve it. I'll try to
provide a runnable example ASAP.

Thank you.

Andrea