Flink doesn't free YARN slots after restarting

4 messages
Flink doesn't free YARN slots after restarting

bowen.li
Hi guys,
    I was running a Flink job (parallelism 12) on an EMR cluster with 48 YARN slots. When the job started, the Flink UI showed the job taking 12 slots, with 36 slots left available.

    I would expect that when the job fails, it restarts from a checkpoint, taking another 12 slots and freeing the original 12. Instead, I observed that the job took new slots but never freed the original ones. The Flink job ended up being killed by YARN because no slots were available anymore.

     Here's the command I used to run the Flink job:

     ```
     # -yn: 6 TaskManager containers, -ys: 8 slots each (6 × 8 = 48 slots total),
     # -ytm: 40000 MB of memory per TaskManager
     flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000 xxx.jar
     ```

     Does anyone know what's going wrong?

Thanks,
Bowen

Re: Flink doesn't free YARN slots after restarting

Till Rohrmann
Hi Bowen,

if I'm not mistaken, Flink's current YARN implementation does not actively release containers. The `YarnFlinkResourceManager` is started with a fixed number of containers that it always tries to maintain. If a container dies, it will request a new one.

In case of a failure, all slots should be freed and then become available for rescheduling the new tasks. Thus, it is not necessarily the case that 12 new slots will be used, unless the old slots are no longer available (e.g. after a TaskManager failure). Therefore, what you are describing sounds like a bug. Could you share the logs with us?
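To make the behavior concrete, here is a toy model of the fixed-target container logic described above (purely illustrative; the class and method names are made up for this sketch and are not Flink's actual `YarnFlinkResourceManager` code):

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of a resource manager that maintains a fixed target number
// of containers: it never releases healthy containers, and it requests
// a replacement whenever a container dies.
public class FixedTargetResourceManager {
    private final int targetContainers;
    private final Set<String> running = new HashSet<>();
    private int pendingRequests;

    public FixedTargetResourceManager(int targetContainers) {
        this.targetContainers = targetContainers;
        this.pendingRequests = targetContainers; // initial acquisition
    }

    // YARN granted us a container.
    public void onContainerAllocated(String containerId) {
        pendingRequests--;
        running.add(containerId);
    }

    // A container died; request a replacement to stay at the target.
    public void onContainerCompleted(String containerId) {
        if (running.remove(containerId)) {
            pendingRequests++;
        }
    }

    public int runningCount() { return running.size(); }
    public int pendingCount() { return pendingRequests; }
}
```

In this model the running-container count always trends back to the target, so the total slot count should stay constant across failures; the behavior Bowen observed would mean slots are being double-counted or never returned.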

Cheers,
Till



Re: Flink doesn't free YARN slots after restarting

bowen.li
Hi Till,
    Thanks for looking into this issue.

    We are not comfortable sending logs to a mailing list this open. I'll send the logs to you directly.

Thanks,
Bowen





Re: Flink doesn't free YARN slots after restarting

bowen.li
Hi Till,
    Any idea why this happened? I've tried different configurations for our Flink cluster, but the cluster always fails after 4 or 5 hours.

    According to the log, it looks like the total number of slots becomes 0 at the end, and YarnClusterClient shuts down the application master as a result. Why are the slots not released? Or did they actually crash and thus become no longer available?
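As a toy illustration of how the total could drift to zero (purely hypothetical; `SlotRegistry` and its methods are invented for this sketch and are not Flink classes): if each failed TaskManager's slots are deregistered but replacement TaskManagers never manage to register theirs, the count eventually reaches 0 and the application master shuts down:

```java
import java.util.HashMap;
import java.util.Map;

// Toy slot accounting: each TaskManager contributes a fixed number of
// slots; when a TM fails, its slots are deregistered. If replacement
// TMs repeatedly fail to register, the total drifts toward zero.
public class SlotRegistry {
    private final Map<String, Integer> slotsPerTm = new HashMap<>();

    public void registerTaskManager(String tmId, int slots) {
        slotsPerTm.put(tmId, slots);
    }

    public void taskManagerFailed(String tmId) {
        slotsPerTm.remove(tmId);
    }

    public int totalSlots() {
        int total = 0;
        for (int s : slotsPerTm.values()) {
            total += s;
        }
        return total;
    }
}
```

With 6 TaskManagers of 8 slots each, one-by-one failures without successful re-registration would take the total from 48 down to 0, matching what the log appears to show.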

I'm trying to deploy the first Flink cluster within our company, and this issue is slowing us down from proving that Flink actually works for us. We'd appreciate your help!

Thanks,
Bowen



