Flink (Local) Environment Thread Leaks?


Flink (Local) Environment Thread Leaks?

Theo Diefenthal
I included a Solr end-to-end test in my project, inheriting from the JUnit 4 SolrCloudTestCase.

The solr-test-framework for JUnit 4 uses com.carrotsearch.randomizedtesting, which automatically checks for leaked threads when a test ends. In my other projects, that tool doesn't cause any problems.
When used in a test together with a Flink LocalExecutionEnvironment, it prevents the test from succeeding due to the following error in the shutdown phase:

com.carrotsearch.randomizedtesting.ThreadLeakError: 3 threads leaked from SUITE scope at somepackage.E2ETest:
   1) Thread[id=170, name=FlinkCompletableFutureDelayScheduler-thread-1, state=TIMED_WAITING, group=TGRP-E2ETest]
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   2) Thread[id=29, name=metrics-meter-tick-thread-2, state=WAITING, group=TGRP-E2ETest]
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   3) Thread[id=28, name=metrics-meter-tick-thread-1, state=TIMED_WAITING, group=TGRP-E2ETest]
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

    at __randomizedtesting.SeedInfo.seed([CC6ED531AFECBAF6]:0)

Note that I can easily suppress the error by annotating my tests with @ThreadLeakScope(ThreadLeakScope.Scope.NONE), but I wanted to point out the possible thread leaks here on the mailing list. Since the first thread is named FlinkCompletableFutureDelayScheduler, I suspect that Flink does not cleanly shut down some of its many threads in a local execution environment. My question: is this an actual thread leak in Flink, or just a false alarm?
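
For readers unfamiliar with this kind of check, here is a minimal JDK-only sketch of the idea: snapshot the live threads before a component starts, then see which new threads are still alive afterwards. It is not how randomizedtesting or Flink implement anything; DelayScheduler and the thread name are hypothetical stand-ins for a component that owns a scheduler thread parked in ScheduledThreadPoolExecutor, as in the stack traces above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ThreadLeakSketch {

    /** Snapshot of all threads that are alive right now. */
    static Set<Thread> liveThreads() {
        Set<Thread> live = new HashSet<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive()) {
                live.add(t);
            }
        }
        return live;
    }

    /** True if a thread with this name is alive now but was not in the snapshot. */
    static boolean leakedSince(Set<Thread> before, String threadName) {
        for (Thread t : liveThreads()) {
            if (!before.contains(t) && t.getName().equals(threadName)) {
                return true;
            }
        }
        return false;
    }

    /** Hypothetical component that owns a single scheduler thread. */
    static final class DelayScheduler {
        private final ScheduledExecutorService executor;
        private volatile Thread worker;

        DelayScheduler(String threadName) {
            executor = Executors.newSingleThreadScheduledExecutor(r -> {
                worker = new Thread(r, threadName);
                return worker;
            });
            // Scheduling a task starts the worker, which then parks waiting for the delay.
            executor.schedule(() -> { }, 1, TimeUnit.HOURS);
        }

        /** Stops the executor and waits (up to 5 s) for the worker thread to exit. */
        void stop() {
            executor.shutdownNow();
            try {
                Thread w = worker;
                if (w != null) {
                    w.join(5_000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        String name = "demo-delay-scheduler-thread-1"; // hypothetical thread name
        Set<Thread> before = liveThreads();
        DelayScheduler scheduler = new DelayScheduler(name);
        System.out.println("leaked before stop: " + leakedSince(before, name));
        scheduler.stop();
        System.out.println("leaked after stop: " + leakedSince(before, name));
    }
}
```

If stop() is never called, the worker stays parked and a check like leakedSince keeps reporting it, which is the situation the error above describes.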




Re: Flink (Local) Environment Thread Leaks?

vino yang
Hi Theo,

If you think there is a thread leak, you can create a JIRA issue with a detailed description.

Pinging [hidden email] and [hidden email]: could you help locate and analyze this problem?

Best,
Vino

In reply to Theo Diefenthal <[hidden email]> (Thu, Nov 14, 2019, 3:16 AM)

Re: Flink (Local) Environment Thread Leaks?

Zili Chen
We ran into this issue previously.

In our case, the source of the leaked thread is tracked in https://issues.apache.org/jira/browse/FLINK-14565

Best,
tison.


In reply to vino yang <[hidden email]> (Thu, Nov 14, 2019, 10:15 AM)

Re: Flink (Local) Environment Thread Leaks?

Zili Chen
This happens because MiniCluster starts a SystemResourcesCounter to gather metrics but has no
logic to shut it down, so the thread leaks when the cluster exits.
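
The general pattern being described can be sketched as follows. This is only an illustration under stated assumptions, not Flink's code or the fix tracked in the JIRA issue: ResourceSampler and SamplerOwner are hypothetical names for a background metrics thread and the component that starts it. The point is that whoever starts the thread also stops it and waits for it to exit.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ShutdownSketch {
    public static void main(String[] args) {
        SamplerOwner owner = new SamplerOwner();
        System.out.println("alive while running: " + owner.samplerAlive());
        owner.close();
        System.out.println("alive after close: " + owner.samplerAlive());
    }
}

/** Hypothetical background sampler, standing in for a metrics-gathering thread. */
final class ResourceSampler extends Thread {
    private final long intervalMillis;
    private final AtomicLong samples = new AtomicLong();
    private volatile boolean running = true;

    ResourceSampler(long intervalMillis) {
        super("resource-sampler");
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        while (running) {
            samples.incrementAndGet(); // placeholder for reading real system metrics
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // shutdown() interrupts the sleep; the loop condition decides whether to exit.
            }
        }
    }

    long sampleCount() {
        return samples.get();
    }

    /** Stops the loop, wakes the thread if it is sleeping, and waits for it to exit. */
    void shutdown() {
        running = false;
        interrupt();
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

/** Hypothetical owner: it starts the sampler, so it is also responsible for stopping it. */
final class SamplerOwner implements AutoCloseable {
    private final ResourceSampler sampler = new ResourceSampler(50);

    SamplerOwner() {
        sampler.start();
    }

    boolean samplerAlive() {
        return sampler.isAlive();
    }

    @Override
    public void close() {
        sampler.shutdown();
    }
}
```

Without the call in close(), the sampler would keep running after its owner is done, which is the kind of leftover thread a leak checker flags.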

Best,
tison.


In reply to tison <[hidden email]> (Thu, Nov 14, 2019, 10:21 AM)