Kafka producer failed with InvalidTxnStateException when performing commit transaction


Tony Wei
Hi,

I am trying to update our Kafka cluster to use a larger `transaction.max.timeout.ms`. The
original setting was Kafka's default value (15 minutes), and I tried to raise it to 3 hours.

While I was doing a rolling restart of my brokers, this exception appeared at the next checkpoint
after I restarted the broker hosting the active controller.

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state

I have no idea why it happened, and I didn't find any error logs on the brokers. Has anyone seen
this exception before? How can I prevent it when restarting the Kafka cluster?
Does this exception mean that I will lose data in some of these transactions?

flink cluster version: 1.8.1
kafka cluster version: 1.0.1
flink kafka producer version: universal
producer transaction timeout: 15 minutes
checkpoint interval: 5 minutes
number of concurrent checkpoints: 1
max checkpoint duration before and after the exception occurred: < 2 minutes

Best,
Tony Wei
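
A minimal sketch of the arithmetic behind the settings in this post, assuming (this is not stated in the thread) that a transaction stays open for roughly one checkpoint interval plus one checkpoint duration. `transaction.timeout.ms` is the real producer-side key that the broker caps with `transaction.max.timeout.ms`; the class `TxnTimeoutCheck` and its methods are hypothetical helpers for illustration, not Flink or Kafka API.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Flink or Kafka.
final class TxnTimeoutCheck {

    // Real Kafka producer config key. The broker-side cap is transaction.max.timeout.ms.
    static final String PRODUCER_TXN_TIMEOUT = "transaction.timeout.ms";

    /** Builds producer properties that carry the given transaction timeout. */
    static Properties producerProps(Duration txnTimeout) {
        Properties props = new Properties();
        props.setProperty(PRODUCER_TXN_TIMEOUT, Long.toString(txnTimeout.toMillis()));
        return props;
    }

    /** True if the producer timeout does not exceed the broker's configured maximum. */
    static boolean acceptedByBroker(Properties props, Duration brokerMaxTimeout) {
        long producerMs = Long.parseLong(props.getProperty(PRODUCER_TXN_TIMEOUT));
        return producerMs <= brokerMaxTimeout.toMillis();
    }

    /** Assumed worst case: one checkpoint interval plus one checkpoint duration. */
    static Duration maxOpenTime(Duration checkpointInterval, Duration maxCheckpointDuration) {
        return checkpointInterval.plus(maxCheckpointDuration);
    }

    /** True if the producer timeout is longer than the assumed worst-case open time. */
    static boolean coversCheckpointWindow(Properties props, Duration interval, Duration duration) {
        long producerMs = Long.parseLong(props.getProperty(PRODUCER_TXN_TIMEOUT));
        return maxOpenTime(interval, duration).toMillis() < producerMs;
    }

    public static void main(String[] args) {
        // Values from the post: 15 min producer timeout, 5 min interval, < 2 min duration.
        Properties props = producerProps(Duration.ofMinutes(15));
        System.out.println("accepted with broker max of 3 h: "
                + acceptedByBroker(props, Duration.ofHours(3)));
        System.out.println("covers checkpoint window: "
                + coversCheckpointWindow(props, Duration.ofMinutes(5), Duration.ofMinutes(2)));
    }
}
```

Under that assumption, the worst-case open time is about 7 minutes, well inside the 15-minute producer timeout, which suggests that a plain transaction timeout is not the obvious explanation for the failure.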

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Fabian Hueske-2
Hi Tony,

I'm sorry I cannot help you with this issue, but Becket (in CC) might have an idea what went wrong here.

Best, Fabian

On Wed, Aug 14, 2019 at 07:00, Tony Wei <[hidden email]> wrote:

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Tony Wei
Hi,

Has anyone run into the same problem? I have increased my producer transaction timeout to 1.5 hours,
but the problem still happened when I restarted the broker hosting the active controller. So it is
probably not caused by a long checkpoint duration leading to a transaction timeout. I have no more
clues about what is wrong with my Kafka producer. Could someone help me, please?

Best,
Tony Wei

On Fri, Aug 16, 2019 at 4:10 PM, Fabian Hueske <[hidden email]> wrote:

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

Becket Qin
Hi Tony,

From the symptoms, it is not clear to me what may be causing this issue. The TransactionCoordinator is supposed to be independent of the active controller, so bouncing the active controller should not have any special impact on transactions (at least not every time). If this is reliably reproducible, could you turn on debug-level logging for kafka.coordinator.transaction.TransactionCoordinator to see what the broker says?

Thanks,

Jiangjie (Becket) Qin
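
A minimal sketch of the logging change suggested above, assuming the brokers use a log4j 1.x properties file (typically `config/log4j.properties` in the Kafka distribution) and that the logger name matches the class name given in the message. The file location and the appenders that receive the output depend on the installation, so treat this as a starting point rather than a verified configuration.

```properties
# Assumption: added to the broker's log4j properties file; a broker restart
# is typically needed for the change to take effect.
log4j.logger.kafka.coordinator.transaction.TransactionCoordinator=DEBUG
```

With this line in place, the coordinator's debug output should appear wherever the broker's existing appenders write, which makes it possible to compare the broker's view of the transaction state with the time of the failed commit.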

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <[hidden email]> wrote: