Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?


Salva Alcántara
Given:


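```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction

import scala.collection.mutable.HashMap

// ModelDef, Data, Prediction and Model (with a toBytes method) are application types.
class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Byte]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // This raises an exception when there is no active key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // Descriptor types now match the declared MapState[(String, String), Array[Byte]]
    modelsBytes = context.getKeyedStateStore.getMapState(
      new MapStateDescriptor[(String, String), Array[Byte]](
        "modelsBytes",
        createTypeInformation[(String, String)],
        createTypeInformation[Array[Byte]]
      )
    )

    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }

}
```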

It turns out that `modelsBytes.clear()` raises an exception when there is no active key. This happens when I start the application from scratch without any data on the input streams, so when the first checkpoint is triggered I get this error:

`java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.`

However, when the input streams contain data, checkpoints work just fine. This confuses me, because `snapshotState` does not provide a keyed context (unlike `processElement1` and `processElement2`, where the current key is accessible via `ctx.getCurrentKey`). So it seems to me that the calls to `clear` and `put` within `snapshotState` should always fail, since they are supposed to work only within a keyed context. Can anyone clarify whether this is actually the expected behaviour?


Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Congxian Qiu
Hi

The exception `No key set. This method should not be called outside of a keyed context.` means that the current key passed in is null. In my opinion, something is wrong if an exception is thrown when no data has arrived. Could you please share the whole stack trace and a minimal reproducible job for this issue?

Best,
Congxian


On Sun, Dec 1, 2019 at 3:01 PM, Salva Alcántara <[hidden email]> wrote:


Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Yun Tang

Hi Salva

The root cause is that you have not distinguished between keyed state and operator state.

There is no ‘currentKey’ in operator state, which means PartitionableListState#clear() clears the whole state. Keyed state, however, always has a ‘currentKey’, which means ‘state#clear()’ only removes the entry scoped to the current runtime key. In your example code, the state being cleared is a MapState (not a list state) and therefore must be keyed state. If your job has not processed any record, no ‘currentKey’ has been set [1] for the ‘modelsBytes’ state, which leads to the NPE when calling ‘state#clear()’.
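The scoping difference described above can be illustrated with a small, self-contained toy model in plain Scala. This is not Flink's implementation: `ToyKeyedMapState` and `ToyOperatorListState` are hypothetical stand-ins that only mimic the behaviour discussed here, namely that keyed state needs a current key (and `clear()` only touches that key's entries), while operator list state has no key scope at all.

```scala
import scala.collection.mutable

// Hypothetical toy model (NOT Flink's implementation): keyed MapState entries
// are stored per (current key, user key) pair.
final class ToyKeyedMapState[K, UK, UV] {
  private val table = mutable.Map.empty[(K, UK), UV]
  private var currentKey: Option[K] = None

  // Flink's runtime sets the key before each processElement call.
  def setCurrentKey(k: K): Unit = currentKey = Some(k)

  private def key: K = currentKey.getOrElse(throw new NullPointerException(
    "No key set. This method should not be called outside of a keyed context."))

  def put(uk: UK, uv: UV): Unit = table((key, uk)) = uv

  // Removes only the entries that belong to the current key.
  def clear(): Unit = {
    val k = key
    table.keysIterator.filter(_._1 == k).toList.foreach(e => table.remove(e))
  }

  def contents: Map[(K, UK), UV] = table.toMap
}

// Hypothetical toy model of operator list state: no key scope at all.
final class ToyOperatorListState[T] {
  private val items = mutable.ListBuffer.empty[T]
  def add(t: T): Unit = items += t
  def clear(): Unit = items.clear() // empties the whole state of this instance
  def get: List[T] = items.toList
}

object StateScopeDemo {
  def main(args: Array[String]): Unit = {
    val keyed = new ToyKeyedMapState[String, String, Int]

    // No record processed yet, hence no current key: clear() throws.
    try keyed.clear()
    catch { case e: NullPointerException => println(s"NPE: ${e.getMessage}") }

    keyed.setCurrentKey("a"); keyed.put("m1", 1)
    keyed.setCurrentKey("b"); keyed.put("m2", 2)
    keyed.clear()           // key "b" is current: only ("b", "m2") is removed
    println(keyed.contents) // Map((a,m1) -> 1)

    val op = new ToyOperatorListState[String]
    op.add("m1"); op.add("m2")
    op.clear()              // no key involved: everything is removed
    println(op.get)         // List()
  }
}
```

In this toy, as in the thread, whether `clear()` fails depends only on whether a key has ever been set, not on whether the call happens inside `snapshotState`.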

Moreover, the ‘snapshotState’ and ‘initializeState’ interfaces are used mainly to snapshot and initialize operator state.

Last but not least, even if you could ensure that at least one record is processed before ‘snapshotState’ is called, the logic of your program would still be unclear: you cannot control which entries in your state get cleared, because you cannot control the current key, which is set while processing records.

You could refer to TwoPhaseCommitSinkFunction [2] to see what kind of state can be cleared during ‘snapshotState’.
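As a rough sketch of the operator-state alternative (TwoPhaseCommitSinkFunction clears and rewrites an operator ListState in its snapshotState), the following plain-Scala toy overwrites a non-keyed list with the whole in-memory model cache on every snapshot and rebuilds the cache on restore. `ToyListState` and `ModelCache` are hypothetical names; the sketch does not use Flink's ListState API and does not model how operator state is redistributed when parallelism changes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an operator ListState: one list per parallel
// instance and no key scope, so clear() empties all of it.
final class ToyListState[T] {
  private val items = mutable.ListBuffer.empty[T]
  def clear(): Unit = items.clear()
  def add(t: T): Unit = items += t
  def get: List[T] = items.toList
}

// Snapshot/restore of an in-memory cache through non-keyed state.
// K is the model key and V a serialized model (both illustrative).
final class ModelCache[K, V](state: ToyListState[(K, V)]) {
  val models = mutable.HashMap.empty[K, V]

  // Roughly what snapshotState would do: replace the list with the cache.
  // No current key is needed, so this also works before any record arrives.
  def snapshotState(): Unit = {
    state.clear()
    models.foreach(entry => state.add(entry))
  }

  // Roughly what initializeState would do when context.isRestored is true.
  def restore(): Unit = {
    models.clear()
    state.get.foreach { case (k, v) => models(k) = v }
  }
}

object OperatorStateDemo {
  def main(args: Array[String]): Unit = {
    val state = new ToyListState[((String, String), String)]
    val before = new ModelCache[(String, String), String](state)
    before.snapshotState() // empty cache, no records yet: no exception

    before.models(("a", "m1")) = "bytes-1"
    before.models(("b", "m2")) = "bytes-2"
    before.snapshotState()

    val after = new ModelCache[(String, String), String](state) // "restarted"
    after.restore()
    println(after.models == before.models) // true
  }
}
```

Because the list is cleared and fully rewritten on each snapshot, repeated checkpoints do not accumulate duplicates, and nothing depends on which key happened to be current.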

[1] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java#L136

[2] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L324

Best
Yun Tang

From: Congxian Qiu <[hidden email]>
Date: Monday, December 2, 2019 at 10:41 AM
To: Salva Alcántara <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Salva Alcántara
In reply to this post by Congxian Qiu
Thanks Congxian. From what I've read, it seems that using keyed state in `snapshotState` is incorrect. What confuses me is that if I do something like this

```scala
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    if (models.nonEmpty) {
      modelsBytes.clear() // This raises an exception when there is no active key set
      for ((k, model) <- models) {
        modelsBytes.put(k, model.toBytes)
      }
    }
  }
```

Then, when there is data (`models` is populated within `processElement1`), the `clear` and subsequent calls to `put` work just fine. This seems like a bug to me, as others have pointed out in this somewhat extended version of the question posted on Stack Overflow:

https://stackoverflow.com/questions/59123188/state-handling-on-keyedcoprocessfunction-serving-ml-models

Do you think the fact that `clear` works within `snapshotState` under certain circumstances is indeed a bug?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Salva Alcántara
In reply to this post by Yun Tang
Hi Yun,

Thanks for your reply. You mention that

"‘snapshotState’ and ‘initializeState’ interfaces are used mainly to snapshot and initialize for operator state"

but "mainly" is not "exclusively", right? So my question is really whether doing something like this is valid and makes sense:

```scala
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    if (models.nonEmpty) {
      modelsBytes.clear()
      for ((k, model) <- models) {
        modelsBytes.put(k, model.toBytes)
      }
    }
  }
```

Indeed, the above code seems to work well, so it looks like a bug that `clear` sometimes works and sometimes does not, as I noted in my reply to Congxian and as others have noted in this extended question posted on Stack Overflow:

https://stackoverflow.com/questions/59123188/state-handling-on-keyedcoprocessfunction-serving-ml-models




Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Yun Tang
Hi Salva

As I pointed out, the logic of your program is unclear if you call 'state.clear()' within 'snapshotState', because you do not know what the current key is at that moment. Hence, I don't think that approach makes sense.

From my point of view, the fact that 'clear' sometimes works in your code is not a bug in the current Flink framework. Currently, the current key is set when a record is processed, but Flink does not reset it to null afterwards, since there is no such life cycle for the current key at the moment. Introducing one would bring no obvious benefit and might cause a performance regression, as it would require extra steps.
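To see why the guarded version "works" yet still misbehaves, the following hypothetical plain-Scala replay (not Flink code) keeps the current key set to whatever record was processed last, as described above. It mimics the thread's pattern: `processElement1` fills `models`, and `snapshotState` (guarded by `models.nonEmpty`) clears and rewrites the keyed state. `StickyKeyState` and `StickyKeyDemo` are illustrative names, and models are keyed by a plain `String` for brevity.

```scala
import scala.collection.mutable

// Hypothetical toy model (NOT Flink code): keyed map state whose current key
// is set per record and never reset afterwards.
final class StickyKeyState {
  val table = mutable.Map.empty[(String, String), String]
  var currentKey: Option[String] = None

  private def key: String =
    currentKey.getOrElse(throw new NullPointerException("No key set."))

  def put(uk: String, uv: String): Unit = table((key, uk)) = uv

  // Only entries of the current key are removed.
  def clear(): Unit = {
    val k = key
    table.keysIterator.filter(_._1 == k).toList.foreach(e => table.remove(e))
  }
}

// Replays the guarded snapshotState pattern from the thread.
object StickyKeyDemo {
  def run(): Map[(String, String), String] = {
    val state = new StickyKeyState
    val models = mutable.LinkedHashMap.empty[String, String]

    def processElement1(key: String, model: String, bytes: String): Unit = {
      state.currentKey = Some(key) // the runtime sets the key for each record
      models(model) = bytes
    }

    def snapshotState(): Unit =
      if (models.nonEmpty) {
        state.clear() // clears only the entries of the last key seen
        for ((m, bytes) <- models) state.put(m, bytes)
      }

    processElement1("a", "m1", "v1")
    snapshotState() // runs under key "a"
    processElement1("b", "m2", "v2")
    snapshotState() // runs under key "b"; key "a" keeps its old copy
    state.table.toMap
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

The second snapshot runs under key "b", so `clear()` leaves key "a"'s entries alone and every model is rewritten under "b". With more keys, copies pile up under whichever keys happened to be current at checkpoint time, which is the unclear program logic referred to above.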

Best
Yun Tang


On 12/2/19, 9:29 PM, "Salva Alcántara" <[hidden email]> wrote:


Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Salva Alcántara