Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi,

I am currently updating our Kafka cluster to use a larger `transaction.max.timeout.ms`. The
original setting was Kafka's default value (15 minutes), and I tried to raise it to 3 hours.

While doing a rolling restart of my brokers, the following exception occurred on the next checkpoint
after I restarted the broker hosting the active controller:

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why this happened, and I didn't find any error logs on the brokers. Has anyone seen
this exception before? How can I prevent it when restarting the Kafka cluster?
Does this exception mean that I will lose data from some of these transactions?

Flink cluster version: 1.8.1
Kafka cluster version: 1.0.1
Flink Kafka producer version: universal
Producer transaction timeout: 15 minutes
Checkpoint interval: 5 minutes
Number of concurrent checkpoints: 1
Max checkpoint duration before and after the exception occurred: < 2 minutes
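As a rough sanity check of the settings above, here is a minimal sketch under stated assumptions. Kafka rejects a producer whose `transaction.timeout.ms` exceeds the broker's `transaction.max.timeout.ms`. With Flink's two-phase-commit sink, a transaction begun when one checkpoint is snapshotted is only committed after the following checkpoint completes, so in normal operation it stays open for roughly one checkpoint interval plus one checkpoint duration. The class and method names (`TxnTimeoutCheck`, `estimatedTxnLifetime`, `producerTimeoutAccepted`, `lifetimeFitsTimeout`) are hypothetical and not part of Flink or Kafka; the estimate ignores commit-notification latency and any downtime during recovery.

```java
import java.time.Duration;

/**
 * Hypothetical helper (not a Flink or Kafka API) that compares checkpoint
 * timing against Kafka transaction timeouts.
 */
final class TxnTimeoutCheck {

    private TxnTimeoutCheck() {}

    /**
     * Rough estimate of how long one transaction stays open in normal operation:
     * begun at the snapshot of checkpoint N-1, committed after checkpoint N completes.
     * Assumption: ignores notification latency and recovery downtime.
     */
    static Duration estimatedTxnLifetime(Duration checkpointInterval, Duration maxCheckpointDuration) {
        return checkpointInterval.plus(maxCheckpointDuration);
    }

    /** The broker refuses a producer transaction timeout larger than its transaction.max.timeout.ms. */
    static boolean producerTimeoutAccepted(Duration producerTxnTimeout, Duration brokerMaxTxnTimeout) {
        return producerTxnTimeout.compareTo(brokerMaxTxnTimeout) <= 0;
    }

    /** True if the estimated transaction lifetime stays strictly below the producer's timeout. */
    static boolean lifetimeFitsTimeout(Duration lifetime, Duration producerTxnTimeout) {
        return lifetime.compareTo(producerTxnTimeout) < 0;
    }

    public static void main(String[] args) {
        // Values taken from the settings listed in the post.
        Duration interval = Duration.ofMinutes(5);
        Duration maxCheckpoint = Duration.ofMinutes(2);  // reported as "< 2 minutes"; used as an upper bound
        Duration producerTimeout = Duration.ofMinutes(15);
        Duration brokerMax = Duration.ofMinutes(15);     // Kafka default, before the change to 3 hours

        Duration lifetime = estimatedTxnLifetime(interval, maxCheckpoint);
        System.out.println("Estimated transaction lifetime: " + lifetime.toMinutes() + " min");
        System.out.println("Producer timeout accepted by broker: "
                + producerTimeoutAccepted(producerTimeout, brokerMax));
        System.out.println("Lifetime fits within producer timeout: "
                + lifetimeFitsTimeout(lifetime, producerTimeout));
    }
}
```

With the reported numbers, both checks pass (an estimated lifetime of about 7 minutes against a 15-minute timeout that does not exceed either broker setting), which suggests, as an inference from these figures rather than a diagnosis, that a plain transaction timeout is not the obvious explanation for this failure.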

Best,
Tony Wei

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Fabian Hueske-2
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

In reply to Tony Wei's message of Wed, 14 Aug 2019, 07:00.