number of files in checkpoint directory grows endlessly

number of files in checkpoint directory grows endlessly

Bernd.Winterstein
I have a flink job running with the following settings:
  • CheckpointingMode.EXACTLY_ONCE
  • RocksDB backend (Modified with TtlDB usage)
  • CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
  • 60 sec interval
  • Async snapshots
  • Incremental checkpoints
  • Queryable State enabled
 
After a few days I have ~5000 files in the checkpoints shared directory. It grows by ~1000 files/day.
 
Now I have the following questions:
  1. What kind of information is stored in the shared subdirectory?
  2. Is it safe to delete old files, or do we have to restart the job from a savepoint?
 
chk-9798:  139
shared:    4840
taskowned: 0
 
Regards
 
Bernd
 


Re: number of files in checkpoint directory grows endlessly

Kostas Kloudas
Hi Bernd,

I think Till, Stefan, or Stephan (cc'ed) are best placed to answer your question.

Cheers,
Kostas

Re: number of files in checkpoint directory grows endlessly

Andrey Zagrebin
Hi Bernd,

Did you change 'state.checkpoints.num-retained' in flink-conf.yaml? By default, only one checkpoint should be retained.

Which version of Flink do you use?
Can you check the Job Master logs for a warning like this:
`Fail to subsume the old checkpoint`?
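For reference, the setting would look like this if set explicitly in flink-conf.yaml (1 is also the default):

    state.checkpoints.num-retained: 1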

Best,
Andrey


AW: number of files in checkpoint directory grows endlessly

Bernd.Winterstein
Hi
We use Flink 1.6.2. As for the checkpoint directory, there is only one chk-xxx directory. Therefore I would expect only one checkpoint to remain.
The value of 'state.checkpoints.num-retained' is not set explicitly.

The problem is not the number of checkpoints but the number of files in the "shared" directory next to the chk-xxx directory.



Re: number of files in checkpoint directory grows endlessly

Andrey Zagrebin
If you use incremental checkpoints, the state backend stores raw RocksDB SST files which represent all state data. Each checkpoint adds only the SST files with new updates that are not present in the previous checkpoint, i.e. their difference.

One of the following could be happening:
- old keys are neither explicitly deleted nor expired (depending on how TtlDB is used)
- compaction is too slow to drop older SST files from the latest checkpoint, so they cannot be deleted together with the previous checkpoints
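For orientation, the externalized checkpoint directory is laid out roughly like this (the job id and checkpoint number are placeholders):

    checkpoints/<job-id>/
        chk-<n>/      state exclusive to checkpoint n, plus its metadata
        shared/       state reused across incremental checkpoints (the SST files above)
        taskowned/    state owned by the task managers, never deleted by the job manager

So the shared directory is expected to hold SST files from several checkpoints; the question is why old ones are not being released.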



Re: number of files in checkpoint directory grows endlessly

Andrey Zagrebin
Could you share the logs so we can check for possible failures to subsume or remove previous checkpoints?
What are the sizes of the files? That can help to understand how compaction goes.
Could you also provide more details on how you set up TtlDB with Flink?

Best,
Andrey

On 29 Nov 2018, at 11:34, Andrey Zagrebin <[hidden email]> wrote:

Compaction merges SST files in the background using native threads. While merging, it filters out removed and expired data. In general, the idea is that there are enough resources for compaction to keep up with the DB update rate and reduce storage; it can be quite IO intensive. Compaction has a lot of tuning knobs and statistics to monitor the process [1], which are usually outside the scope of Flink and depend on the application's state access pattern. You can create and set a RocksDBStateBackend for your application in Flink and configure it with custom RocksDB/column-family-specific options.

[1] https://github.com/facebook/rocksdb/wiki/Compaction
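As a minimal sketch of such custom options on the Flink 1.6 OptionsFactory API (the class name and the concrete option values here are illustrative assumptions, not recommendations):

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    // Hypothetical factory that gives compaction more headroom on slow IO.
    public class CompactionTuningOptionsFactory implements OptionsFactory {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // allow more parallel background compactions
            return currentOptions.setMaxBackgroundCompactions(4);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // start compacting level 0 after fewer files, so small SSTs are merged sooner
            return currentOptions.setLevel0FileNumCompactionTrigger(2);
        }
    }

It would be registered via RocksDBStateBackend#setOptions.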

On 29 Nov 2018, at 11:20, <[hidden email]> <[hidden email]> wrote:

We use TtlDB because the state contents should expire automatically after 24 hours. Therefore we only changed the state backend to use TtlDB instead of RocksDB, with a fixed retention time.

Our IO is slow because we only have SAN volumes available. Can you clarify further what the problem with slow compaction would be?

Regards,

Bernd



AW: number of files in checkpoint directory grows endlessly

Bernd.Winterstein

Sorry for the late answer. I haven’t been in the office.

 

The logs show no problems.

The files that remain in the shared subfolder are almost all 1121 bytes, except for the files from the latest checkpoint (30 files across all operators).

For each historic checkpoint, six files remain (parallelism is 6):

 

checkpoints/stp/2b160fc9e5eba47d1906d04f36b399bf/shared/

-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 046b16e5-0edc-4ca5-9757-d89aa86f5d5c
-rw-r--r--. 1 flink ice    308383 Dec  4 10:23 dab0f801-8907-4b10-ae8b-5f5f71c28524
-rw-r--r--. 1 flink ice    101035 Dec  4 10:23 e4915253-7025-4e36-8671-58a371f202ff
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 613a9fd1-4545-4785-82ef-92f8c4818bc0
-rw-r--r--. 1 flink ice    308270 Dec  4 10:23 23771709-03ef-4417-acd2-1c27c8b0785e
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 59643ae2-57b8-4d9e-9f2e-977545db9238
-rw-r--r--. 1 flink ice    102444 Dec  4 10:23 f9a70bb7-d4f3-4d94-af31-8e94276001b1
-rw-r--r--. 1 flink ice    308346 Dec  4 10:23 bafc1a20-e5a4-4b06-93fe-db00f3f3913a
-rw-r--r--. 1 flink ice     96035 Dec  4 10:23 a8b3aa75-fa44-4fc2-ab49-437d49410acd
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 961f1472-b61c-4c04-b544-4549c3ce4038
-rw-r--r--. 1 flink ice    308387 Dec  4 10:23 7245dcfe-62e8-42a3-94a9-0698bdf9fb4d
-rw-r--r--. 1 flink ice     99209 Dec  4 10:23 5ad87ff2-604c-40d9-9262-76ac6369453d
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 ccaae280-837a-4bc9-95cd-3612c4cd435c
-rw-r--r--. 1 flink ice    308451 Dec  4 10:23 69b650dd-7969-4891-b973-9bd168e2f40e
-rw-r--r--. 1 flink ice    105638 Dec  4 10:23 16f3092d-76eb-4fd7-87b4-2eb28ca4696c
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 eccb19a0-4570-4e49-be97-707248690fe8
-rw-r--r--. 1 flink ice    308513 Dec  4 10:23 47259326-cb62-4abb-ad0b-5e2deda97685
-rw-r--r--. 1 flink ice    109918 Dec  4 10:23 aebc4390-3467-4596-a5fa-0d0a6dc74f54
-rw-r--r--. 1 flink ice 259444946 Dec  4 10:22 3e97ea93-4bc9-4404-aeca-eed31e96a14b
-rw-r--r--. 1 flink ice 247501755 Dec  4 10:22 b23d2c3a-94eb-45f0-bd29-457d8796624d
-rw-r--r--. 1 flink ice 247754788 Dec  4 10:22 4eb66b02-6758-4cff-9de2-fb7399b2ac0b
-rw-r--r--. 1 flink ice 247281033 Dec  4 10:22 2aeeb9ad-2714-481c-a7a4-04148f72671d
-rw-r--r--. 1 flink ice 247345955 Dec  4 10:22 5ccf700a-bd83-4e02-93a5-db46ca71e47a
-rw-r--r--. 1 flink ice 259312070 Dec  4 10:22 3bebe32d-0ad3-4f4e-a3aa-21719fa62c87
-rw-r--r--. 1 flink ice     97551 Dec  4 10:22 5a4f8a3a-0f26-46e7-883e-d6fafd733183
-rw-r--r--. 1 flink ice    104198 Dec  4 10:22 cdbb4913-7dd0-4614-8b81-276a4cdf62cc
-rw-r--r--. 1 flink ice    101466 Dec  4 10:22 ce7f0fea-8cd3-4827-9ef1-ceba569c2989
-rw-r--r--. 1 flink ice    108561 Dec  4 10:22 5bd3f681-c131-4c41-9fdc-6a39b9954aa7
-rw-r--r--. 1 flink ice     98649 Dec  4 10:22 d5d8eb16-3bd8-4695-91a0-9d9089ca9510
-rw-r--r--. 1 flink ice    102071 Dec  4 10:22 f8e34ef1-60d6-4c0a-954b-64c8a0320834
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 8fda9911-f63e-45a6-b95a-5c93fe99d0fd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 82545c1b-69d6-499b-a9fd-62e227b820c6
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 f9fa3bba-c92d-4dda-b16e-0ba417edf5d2
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 844fa51d-bb74-4bec-ab15-e52d37703d24
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 2115654a-4544-41cc-bbee-a36d0d80d8eb
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 acfc1566-5f14-47d7-ae54-7aa1dfb3859c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 b0144120-cce0-4b4d-9f8c-1564b9abedd9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 8ab4ddab-3665-4307-a581-ab413e1e2080
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 0f8c4b1a-df5d-47f7-b960-e671cfc3c666
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 40baf147-400e-455f-aea3-074355a77031
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 47d2deca-1703-4dd3-9fea-e027087d553e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 aa336ce0-3689-4b7d-a472-b0a3ed2f5eb9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 ee15f1e0-d23c-4add-86b4-e4ab51bb2a20
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 f440b5cf-8f62-4532-a886-a2cedc9a043e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 de423c46-4288-464b-97cb-6f7764b88dfd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 273a15cb-8c9f-4412-b5d2-68397ba461c9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 bb38b011-070d-4c21-b04a-4e923f85de86
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 969abc07-d313-4d79-8119-6e1f3886be48
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 eb0b2591-653c-47bd-a6b2-9f6634ff4f0a
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 20b7e49a-ace5-4ef7-987f-0d328f47c56f
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 a25c2bd9-7fe9-4558-b9dd-30b525a0b435
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 dcd0852f-58dc-467e-93db-5700cd4f606e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 400e5038-2913-4aea-932d-92f508bd38f7
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 10ce727b-9389-4911-b0d4-1b342dd3232c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 daec0dcb-384a-4d86-a423-7e2b0482b70e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 c787d58a-4bd5-4d9a-a4d8-47d9618552ff
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b2c1383a-8452-4ec6-9064-a5e8f56e6f21
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 65c0c908-b604-4ac9-a72f-4bb87676df11
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 42d59072-95ec-42d5-81ff-b9447cb39fd0
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b40c2577-e604-482d-86a1-ddb785e3b799

 

The TtlDB state backend is mostly a copy of the current flink-statebackend-rocksdb module, which uses the TtlDB class instead of the RocksDB class, with the following configuration:

 

 

    protected void buildDefaultStateBackend(StreamExecutionEnvironment executionEnvironment) throws IOException {
        FsStateBackend fsStateBackend = new FsStateBackend(configuration.getCheckpointDirectory(), true);
        RocksDBStateBackend rocksDBStateBackend = new de.helaba.rtts.ice.rocksdb.ttl.RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE, configuration.getStateTimeToLive());
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        rocksDBStateBackend.setOptions(new StpRocksDbOptions());
        executionEnvironment.setStateBackend(rocksDBStateBackend);
    }

    protected void configureCheckpointing(StreamExecutionEnvironment executionEnvironment) {
        CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(configuration.getCheckpointInterval());
        checkpointConfig.setMinPauseBetweenCheckpoints(configuration.getCheckpointMinPause());
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getConfig().setUseSnapshotCompression(true);
    }

 

 

 



Re: number of files in checkpoint directory grows endlessly

Yun Tang
Hi Bernd,

RocksDBStateBackend does not use the default column family; by default it puts each state's entries into a newly created column family. Would you please check whether you used the db.createColumnFamilyWithTtl method instead of db.createColumnFamily within RocksDBKeyedStateBackend#tryRegisterKvStateInformation?

In my opinion, the default db.createColumnFamily method would set the TTL to 0, which means infinity, for that column family. You can check RocksDB's TtlDB implementation for the details.
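For illustration, a sketch of the difference on the RocksJava TtlDB API (the column family names here are made up):

    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.TtlDB;

    static void illustrateTtl(TtlDB db) throws RocksDBException {
        // registers the column family with a TTL: entries expire ~24h after their last update
        ColumnFamilyHandle withTtl = db.createColumnFamilyWithTtl(
            new ColumnFamilyDescriptor("my-state".getBytes()), 24 * 60 * 60);

        // the inherited createColumnFamily registers the family with ttl = 0, i.e. no expiry
        ColumnFamilyHandle noTtl = db.createColumnFamily(
            new ColumnFamilyDescriptor("other-state".getBytes()));
    }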

Best
Yun Tang



AW: number of files in checkpoint directory grows endlessly

Bernd.Winterstein

All calls to createColumnFamily were replaced with createColumnFamilyWithTtl:

 

 

 

    private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
            StateDescriptor<?, S> stateDesc,
            TypeSerializer<N> namespaceSerializer,
            @Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {

        Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
            kvStateInformation.get(stateDesc.getName());

        RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
        if (stateInfo != null) {
            StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());

            Preconditions.checkState(
                restoredMetaInfoSnapshot != null,
                "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
                    " but its corresponding restored snapshot cannot be found.");

            newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
                restoredMetaInfoSnapshot,
                namespaceSerializer,
                stateDesc,
                snapshotTransformer);

            stateInfo.f1 = newMetaInfo;
        } else {
            String stateName = stateDesc.getName();

            newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
                stateDesc.getType(),
                stateName,
                namespaceSerializer,
                stateDesc.getSerializer(),
                snapshotTransformer);

            // modified: createColumnFamilyWithTtl instead of createColumnFamily
            ColumnFamilyHandle columnFamily = createColumnFamilyWithTtl(stateName);

            stateInfo = Tuple2.of(columnFamily, newMetaInfo);
            kvStateInformation.put(stateDesc.getName(), stateInfo);
        }

        return Tuple2.of(stateInfo.f0, newMetaInfo);
    }

 


Reply | Threaded
Open this post in threaded view
|

Re: number of files in checkpoint directory grows endlessly

Andrey Zagrebin
Hi Bernd,

Thanks for sharing the code.

Could you try the same job without TtlDb, with the original RocksDb state backend?
Even if data does not expire every day, the overall size will grow, but the number of files should not, because of compaction.

Do you set some more specific db/column options in the options factory StpRocksDbOptions?
Could you share its source code and the modified RocksDBKeyedStateBackend, especially createColumnFamilyWithTtl?

Do you always have only the last 1-2 'chk-*' directories at a time, or are there more checkpoint directories that were not cleaned up?

You have a 1-minute checkpoint interval, but the list does not contain files for some consecutive minutes. Are files missing for some minutes/checkpoints, like 10:10 or 10:05, or do checkpoints just take long, including the CheckpointMinPause?

Could you check whether those 1121-byte files are identical? Could you look inside or post one of them here?

Best,
Andrey
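
As a sketch of that last check, the files in the shared directory could be grouped by content digest with plain JDK code (the directory path is hypothetical):

    import java.nio.file.*;
    import java.security.MessageDigest;
    import java.util.*;
    import java.util.stream.Stream;

    public class GroupSharedFiles {
        public static void main(String[] args) throws Exception {
            // Hypothetical shared checkpoint directory; adjust to your setup.
            Path shared = Paths.get("/appdata/.../checkpoints/stp/<job-id>/shared");
            Map<String, List<Path>> byDigest = new HashMap<>();
            try (Stream<Path> files = Files.list(shared)) {
                for (Path f : (Iterable<Path>) files::iterator) {
                    byte[] digest = MessageDigest.getInstance("SHA-256")
                            .digest(Files.readAllBytes(f));
                    byDigest.computeIfAbsent(Base64.getEncoder().encodeToString(digest),
                            k -> new ArrayList<>()).add(f);
                }
            }
            // Groups with more than one entry are byte-identical files.
            byDigest.values().stream().filter(ps -> ps.size() > 1)
                    .forEach(ps -> System.out.println(ps.size() + " identical: " + ps));
        }
    }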


AW: number of files in checkpoint directory grows endlessly

Bernd.Winterstein

Hi Andrey

We need the TTL feature because we store about 10 million state entries per 24 hours. Each entry has to expire after 24 hours. We couldn’t do the expiry with timers due to bad checkpoint performance, so I switched to TtlDB.

It seems that per-minute files are only available for the last incremental period. After that, the files are five minutes apart.

 

As for the chk-* directories, there is only one directory left.

 

I attached the code for the options and the KeyedStateBackend and also one of the shared files.

 

Many thanks for your help.

 

 

Bernd

 

cat /appdata/rtts/ice/shared/jobmanager/checkpoints/stp/a73a132912c8efaaab4a8e6331bdcf47/shared/6d64a7cc-8cdf-4f51-aa2f-bb7c8e7d4ce3

 

¦>¦¦}d¦¦/20181205-HELADEF1TSK-TX-9ffXfg1DZCHEe2JGqpC6dDJ

        ¦¦$rocksdb.block.based.table.index.type*rocksdb.block.based.table.prefix.filtering0-rocksdb.block.based.table.whole.key.filtering1rocksdb.column.family.id#rocksdb.column.family.name_timer_state/processing_user-timersrocksdb.comparatorleveldb.BytewiseComparatorrocksdb.compressionSnappyrocksdb.creation.time¦¦¦¦rocksdb.data.sizeQrocksdb.deleted.keysrocksdb.filter.policyrocksdb.BuiltinBloomFilterrocksdb.filter.sizerocksdb.fixed.key.lengthrocksdb.format.versionrocksdb.index.sizeocksdb.merge.operands

                                                                                                                                                                                                                                                       rocksdb.merge.operatorMerge By TTLrocksdb.num.data.blocksrocksdb.num.entriesrocksdb.prefix.extractor.namenullptrocksdb.property.collectors[]rocksdb.raw.key.sizeArocksdb.raw.value.size+Y¦¦¦1Nc{¦¦¦¦*Oj¦¦¦¦¦V!filter.rocksdb.BuiltinBloomFilterQrocksdb.propertiesh¦&¦¦2?    }¦¦¦¦¦¦¦L¦¦¦JW¦¦¦$uG¦
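
The dump suggests these small files are essentially SST metadata/properties blocks of a near-empty column family (note the 'Merge By TTL' merge operator). A hedged way to confirm this, assuming a standalone rocksdbjni version recent enough to ship SstFileReader (the version bundled with Flink 1.6 may not have it) and using an example file path:

    import org.rocksdb.*;

    public class InspectSharedFile {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options();
                 SstFileReader reader = new SstFileReader(options)) {
                // Example path of one of the 1121-byte files from the shared directory.
                reader.open("/appdata/.../shared/6d64a7cc-8cdf-4f51-aa2f-bb7c8e7d4ce3");
                TableProperties props = reader.getTableProperties();
                // A metadata-only SST reports (almost) no entries and a tiny data size.
                System.out.println("entries:   " + props.getNumEntries());
                System.out.println("data size: " + props.getDataSize());
            }
        }
    }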

 

 

 

StpRocksDbOptions.java (1K)
RocksDBKeyedStateBackend.java (143K)
shared_file.bin.txt (1K)

Re: number of files in checkpoint directory grows endlessly

Andrey Zagrebin
Hi Bernd,

Thanks for sharing the code.

I understand your TTL requirement. It definitely makes sense for your application. 
My recommendation is still to try running the job with the original backend, without the TtlDb modification, to narrow down the problem and understand where these small files come from. They look like some meta information which seems to accumulate over time.

Could you also investigate the temporary local directories on the task executors?
By default, they are usually in System.getProperty("java.io.tmpdir") (e.g. /tmp on Linux), or there is also a config option for them [1].
Do you see an explosion of similar files there? They can be named differently. The interesting question is whether the smaller files end with ‘.sst’ or not.

Best,
Andrey

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#io-tmp-dirs
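
To make that check concrete, a small sketch along these lines could count files per extension and sum their sizes under a task executor's temp directory (the starting directory is an assumption; point it at your io.tmp.dirs location):

    import java.io.IOException;
    import java.nio.file.*;
    import java.util.*;
    import java.util.stream.Stream;

    public class TmpDirCensus {
        public static void main(String[] args) throws IOException {
            // Hypothetical starting point; see io.tmp.dirs / java.io.tmpdir.
            Path tmp = Paths.get(System.getProperty("java.io.tmpdir"));
            Map<String, long[]> stats = new TreeMap<>(); // extension -> {count, bytes}
            try (Stream<Path> files = Files.walk(tmp)) {
                files.filter(Files::isRegularFile).forEach(f -> {
                    String name = f.getFileName().toString();
                    int dot = name.lastIndexOf('.');
                    String ext = dot >= 0 ? name.substring(dot) : "(none)";
                    try {
                        long[] s = stats.computeIfAbsent(ext, k -> new long[2]);
                        s[0]++;
                        s[1] += Files.size(f);
                    } catch (IOException ignored) { /* file may have vanished */ }
                });
            }
            // Many small '.sst' files here would point at the local RocksDB instances.
            stats.forEach((ext, s) ->
                    System.out.printf("%-10s %6d files %14d bytes%n", ext, s[0], s[1]));
        }
    }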


Re: number of files in checkpoint directory grows endlessly

Yun Tang
Hi Bernd

RocksDB would not delete expired entries until compaction triggered. I saw your code set the 'readOnly' as false already, and from your description, the files in shared directory continue to increase each day. I think the mechanism of TTL for rocksDB should work fine.

If you want to verify that TTL is configured correctly, please visit the working directory of RocksDB on each task manager, which is located under the 'java.io.tmpdir' folder. In a YARN environment, this folder is by default located in the container's folder and named 'flink-io-[uuid]'. You can find the 'LOG' file under the 'db' folder. Open the 'LOG' file (or simply grep it) and check whether the following column family configuration exists:
Options.compaction_filter_factory: TtlCompactionFilterFactory

If the above steps verify that TTL is configured correctly, I think Andrey's suggestion to narrow down the problem could help find other possible causes.

Best
Yun Tang


From: Andrey Zagrebin <[hidden email]>
Sent: Thursday, December 6, 2018 19:07
To: [hidden email]
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,

Thanks for sharing the code.

I understand your TTL requirement. It definitely makes sense for your application. 
My recommendation is still to try running the job with the original backend, without the TtlDb modification, to narrow down the problem and understand where these small files come from. They might be some meta information that accumulates over time.

Could you also investigate temporary local directories in task executors?
By default, they are usually in System.getProperty("java.io.tmpdir") (e.g. /tmp in Linux) or there is also a config option for them [1].
Do you see a similar explosion of files there? They can be named differently. The interesting question is whether the smaller files end with '.sst' or not.
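
(If it helps, a tiny throwaway scan for such files could look like the sketch below; the default path and the 4 KB threshold are only assumptions, point it at your configured io.tmp.dirs:)

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.stream.Stream;

    public class SmallSstScan {
        public static void main(String[] args) throws IOException {
            // directory to scan; pass the task executor's io.tmp.dirs here
            Path tmp = Paths.get(args.length > 0 ? args[0] : "/tmp");
            try (Stream<Path> files = Files.walk(tmp)) {
                files.filter(p -> p.toString().endsWith(".sst"))
                     .filter(p -> p.toFile().length() < 4096) // suspiciously small files
                     .forEach(p -> System.out.println(p + " " + p.toFile().length() + " bytes"));
            }
        }
    }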

Best,
Andrey

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#io-tmp-dirs

On 6 Dec 2018, at 09:16, <[hidden email]> <[hidden email]> wrote:

Hi Andrey
We need the TTL feature because we store about 10 million state entries per 24 hours, and each entry has to expire after 24 hours. We couldn't do the expiry with timers due to bad checkpoint performance, so I switched to TtlDB.

It seems that per-minute files are only present for the most recent incremental period. After that, the files are five minutes apart.
 
As for the chk-* directories, there is only one directory left.
 
I attached the code for the options and the KeyedStateBackend and also one of the shared files.
 
Many thanks for your help.
 
 
Bernd
 
cat /appdata/rtts/ice/shared/jobmanager/checkpoints/stp/a73a132912c8efaaab4a8e6331bdcf47/shared/6d64a7cc-8cdf-4f51-aa2f-bb7c8e7d4ce3
 
¦>¦¦}d¦¦/20181205-HELADEF1TSK-TX-9ffXfg1DZCHEe2JGqpC6dDJ
        ¦¦$rocksdb.block.based.table.index.type*rocksdb.block.based.table.prefix.filtering0-rocksdb.block.based.table.whole.key.filtering1rocksdb.column.family.id#rocksdb.column.family.name_timer_state/processing_user-timersrocksdb.comparatorleveldb.BytewiseComparatorrocksdb.compressionSnappyrocksdb.creation.time¦¦¦¦rocksdb.data.sizeQrocksdb.deleted.keysrocksdb.filter.policyrocksdb.BuiltinBloomFilterrocksdb.filter.sizerocksdb.fixed.key.lengthrocksdb.format.versionrocksdb.index.sizeocksdb.merge.operands
                                                                                                                                                                                                                                                       rocksdb.merge.operatorMerge By TTLrocksdb.num.data.blocksrocksdb.num.entriesrocksdb.prefix.extractor.namenullptrocksdb.property.collectors[]rocksdb.raw.key.sizeArocksdb.raw.value.size+Y¦¦¦1Nc{¦¦¦¦*Oj¦¦¦¦¦V!filter.rocksdb.BuiltinBloomFilterQrocksdb.propertiesh¦&¦¦2?    }¦¦¦¦¦¦¦L¦¦¦JW¦¦¦$uG¦
 
 
 
From: Andrey Zagrebin [[hidden email]]
Sent: Wednesday, 5 December 2018 15:25
To: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,
 
Thanks for sharing the code.
 
Could you try the same job without TtlDb, with the original RocksDb state backend?
Even if data did not expire every day, the overall size would grow, but the number of files should not, thanks to compaction.
 
Do you set some more specific db/column options in this option factory StpRocksDbOptions?
Could you share its source code and the modified RocksDBKeyedStateBackend, especially createColumnFamilyWithTtl?
 
Do you always have only the last 1-2 'chk-*' directories at a time, or are there more checkpoint directories that were not cleaned up?

You have a 1-minute checkpoint interval, but the list does not contain files for some consecutive minutes. Are files missing for some minutes/checkpoints, like 10:10 or 10:05, or do checkpoints just take longer, including the CheckpointMinPause?

Could you check whether those '1121'-byte files are identical? Could you look inside or post one of them here?
 
Best,
Andrey

 

On 4 Dec 2018, at 14:10, <[hidden email]> <[hidden email]> wrote:
 

All calls to createColumnFamily were replaced by createColumnFamilyWithTtl:

 
 
 
     private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
                StateDescriptor<?, S> stateDesc,
                TypeSerializer<N> namespaceSerializer,
                @Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {
 
          Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
                kvStateInformation.get(stateDesc.getName());
 
          RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
          if (stateInfo != null) {
 
                StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());
 
                Preconditions.checkState(
                     restoredMetaInfoSnapshot != null,
                     "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
                          " but its corresponding restored snapshot cannot be found.");
 
                newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
                     restoredMetaInfoSnapshot,
                     namespaceSerializer,
                     stateDesc,
                     snapshotTransformer);
 
                stateInfo.f1 = newMetaInfo;
          } else {
                String stateName = stateDesc.getName();
 
                newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
                     stateDesc.getType(),
                     stateName,
                     namespaceSerializer,
                     stateDesc.getSerializer(),
                     snapshotTransformer);
 
                ColumnFamilyHandle columnFamily = createColumnFamilyWithTtl(stateName);
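                // note: this is the modified call - the new column family is registered
                // with a TTL, so TtlDB's compaction filter can drop its expired entries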
 
                stateInfo = Tuple2.of(columnFamily, newMetaInfo);
                kvStateInformation.put(stateDesc.getName(), stateInfo);
          }
 
          return Tuple2.of(stateInfo.f0, newMetaInfo);
     }
 
From: Yun Tang [[hidden email]]
Sent: Tuesday, 4 December 2018 13:55
To: Winterstein, Bernd; [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,
 
RocksDBStateBackend does not use the default column family, but puts each state's entries into a newly created column family. Would you please check whether you used the db.createColumnFamilyWithTtl method instead of db.createColumnFamily within RocksDBKeyedStateBackend#tryRegisterKvStateInformation?
 
In my opinion, the default db.createColumnFamily method sets the ttl to 0, which means infinity, for this column family. You could view RocksDB's implementation here.
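
(For illustration, a minimal sketch of the difference, assuming RocksDB's Java API and an already opened TtlDB handle; the class and helper names and the TTL value are made up:)

    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.TtlDB;

    class TtlColumnFamilies {

        // Registers a state's column family with a TTL in seconds; expired entries
        // are then dropped by TtlDB's compaction filter during compactions.
        static ColumnFamilyHandle createWithTtl(TtlDB db, String stateName, int ttlSeconds)
                throws RocksDBException {
            // db.createColumnFamily(descriptor) would implicitly use ttl = 0 instead,
            // i.e. entries in that column family would never expire
            return db.createColumnFamilyWithTtl(
                new ColumnFamilyDescriptor(stateName.getBytes()), ttlSeconds);
        }
    }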
 
Best
Yun Tang

From: [hidden email] <[hidden email]>
Sent: Tuesday, December 4, 2018 17:40
To: [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: AW: number of files in checkpoint directory grows endlessly
 
Sorry for the late answer. I haven’t been in the office.
 
The logs show no problems.
The files that remain in the shared subfolder are almost all 1121 bytes, except the files from the latest checkpoint (30 files across all operators).
For each historic checkpoint, six files remain (parallelism is 6).
 
checkpoints/stp/2b160fc9e5eba47d1906d04f36b399bf/shared/
 
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 046b16e5-0edc-4ca5-9757-d89aa86f5d5c
-rw-r--r--. 1 flink ice    308383 Dec  4 10:23 dab0f801-8907-4b10-ae8b-5f5f71c28524
-rw-r--r--. 1 flink ice    101035 Dec  4 10:23 e4915253-7025-4e36-8671-58a371f202ff
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 613a9fd1-4545-4785-82ef-92f8c4818bc0
-rw-r--r--. 1 flink ice    308270 Dec  4 10:23 23771709-03ef-4417-acd2-1c27c8b0785e
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 59643ae2-57b8-4d9e-9f2e-977545db9238
-rw-r--r--. 1 flink ice    102444 Dec  4 10:23 f9a70bb7-d4f3-4d94-af31-8e94276001b1
-rw-r--r--. 1 flink ice    308346 Dec  4 10:23 bafc1a20-e5a4-4b06-93fe-db00f3f3913a
-rw-r--r--. 1 flink ice     96035 Dec  4 10:23 a8b3aa75-fa44-4fc2-ab49-437d49410acd
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 961f1472-b61c-4c04-b544-4549c3ce4038
-rw-r--r--. 1 flink ice    308387 Dec  4 10:23 7245dcfe-62e8-42a3-94a9-0698bdf9fb4d
-rw-r--r--. 1 flink ice     99209 Dec  4 10:23 5ad87ff2-604c-40d9-9262-76ac6369453d
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 ccaae280-837a-4bc9-95cd-3612c4cd435c
-rw-r--r--. 1 flink ice    308451 Dec  4 10:23 69b650dd-7969-4891-b973-9bd168e2f40e
-rw-r--r--. 1 flink ice    105638 Dec  4 10:23 16f3092d-76eb-4fd7-87b4-2eb28ca4696c
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 eccb19a0-4570-4e49-be97-707248690fe8
-rw-r--r--. 1 flink ice    308513 Dec  4 10:23 47259326-cb62-4abb-ad0b-5e2deda97685
-rw-r--r--. 1 flink ice    109918 Dec  4 10:23 aebc4390-3467-4596-a5fa-0d0a6dc74f54
-rw-r--r--. 1 flink ice 259444946 Dec  4 10:22 3e97ea93-4bc9-4404-aeca-eed31e96a14b
-rw-r--r--. 1 flink ice 247501755 Dec  4 10:22 b23d2c3a-94eb-45f0-bd29-457d8796624d
-rw-r--r--. 1 flink ice 247754788 Dec  4 10:22 4eb66b02-6758-4cff-9de2-fb7399b2ac0b
-rw-r--r--. 1 flink ice 247281033 Dec  4 10:22 2aeeb9ad-2714-481c-a7a4-04148f72671d
-rw-r--r--. 1 flink ice 247345955 Dec  4 10:22 5ccf700a-bd83-4e02-93a5-db46ca71e47a
-rw-r--r--. 1 flink ice 259312070 Dec  4 10:22 3bebe32d-0ad3-4f4e-a3aa-21719fa62c87
-rw-r--r--. 1 flink ice     97551 Dec  4 10:22 5a4f8a3a-0f26-46e7-883e-d6fafd733183
-rw-r--r--. 1 flink ice    104198 Dec  4 10:22 cdbb4913-7dd0-4614-8b81-276a4cdf62cc
-rw-r--r--. 1 flink ice    101466 Dec  4 10:22 ce7f0fea-8cd3-4827-9ef1-ceba569c2989
-rw-r--r--. 1 flink ice    108561 Dec  4 10:22 5bd3f681-c131-4c41-9fdc-6a39b9954aa7
-rw-r--r--. 1 flink ice     98649 Dec  4 10:22 d5d8eb16-3bd8-4695-91a0-9d9089ca9510
-rw-r--r--. 1 flink ice    102071 Dec  4 10:22 f8e34ef1-60d6-4c0a-954b-64c8a0320834
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 8fda9911-f63e-45a6-b95a-5c93fe99d0fd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 82545c1b-69d6-499b-a9fd-62e227b820c6
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 f9fa3bba-c92d-4dda-b16e-0ba417edf5d2
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 844fa51d-bb74-4bec-ab15-e52d37703d24
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 2115654a-4544-41cc-bbee-a36d0d80d8eb
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 acfc1566-5f14-47d7-ae54-7aa1dfb3859c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 b0144120-cce0-4b4d-9f8c-1564b9abedd9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 8ab4ddab-3665-4307-a581-ab413e1e2080
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 0f8c4b1a-df5d-47f7-b960-e671cfc3c666
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 40baf147-400e-455f-aea3-074355a77031
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 47d2deca-1703-4dd3-9fea-e027087d553e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 aa336ce0-3689-4b7d-a472-b0a3ed2f5eb9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 ee15f1e0-d23c-4add-86b4-e4ab51bb2a20
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 f440b5cf-8f62-4532-a886-a2cedc9a043e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 de423c46-4288-464b-97cb-6f7764b88dfd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 273a15cb-8c9f-4412-b5d2-68397ba461c9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 bb38b011-070d-4c21-b04a-4e923f85de86
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 969abc07-d313-4d79-8119-6e1f3886be48
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 eb0b2591-653c-47bd-a6b2-9f6634ff4f0a
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 20b7e49a-ace5-4ef7-987f-0d328f47c56f
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 a25c2bd9-7fe9-4558-b9dd-30b525a0b435
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 dcd0852f-58dc-467e-93db-5700cd4f606e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 400e5038-2913-4aea-932d-92f508bd38f7
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 10ce727b-9389-4911-b0d4-1b342dd3232c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 daec0dcb-384a-4d86-a423-7e2b0482b70e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 c787d58a-4bd5-4d9a-a4d8-47d9618552ff
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b2c1383a-8452-4ec6-9064-a5e8f56e6f21
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 65c0c908-b604-4ac9-a72f-4bb87676df11
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 42d59072-95ec-42d5-81ff-b9447cb39fd0
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b40c2577-e604-482d-86a1-ddb785e3b799
 
The TtlDB stream backend is mostly a copy of the current flink-statebackend-rocksdb which uses the TtlDB class instead of RocksDB, with the following configuration:
 
 
    protected void buildDefaultStateBackend(StreamExecutionEnvironment executionEnvironment) throws IOException {
        FsStateBackend fsStateBackend = new FsStateBackend(configuration.getCheckpointDirectory(), true);
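        // note (assumption): TernaryBoolean.TRUE enables incremental checkpoints and the
        // extra argument is the retention time handed to TtlDB in this modified backend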
        RocksDBStateBackend rocksDBStateBackend = new de.helaba.rtts.ice.rocksdb.ttl.RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE, configuration.getStateTimeToLive());
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        rocksDBStateBackend.setOptions(new StpRocksDbOptions());
        executionEnvironment.setStateBackend(rocksDBStateBackend);
    }
 
    protected void configureCheckpointing(StreamExecutionEnvironment executionEnvironment) {
        CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(configuration.getCheckpointInterval());
        checkpointConfig.setMinPauseBetweenCheckpoints(configuration.getCheckpointMinPause());
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getConfig().setUseSnapshotCompression(true);
    }
 
 
 
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 15:38
To: Winterstein, Bernd
Cc: Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly
 
Could you share the logs to check possible failures to subsume or remove previous checkpoints?
What are the sizes of the files? That can help to understand how compaction goes.
Could you also provide more details on how you set up TtlDb with Flink?
 
Best,
Andrey

 

On 29 Nov 2018, at 11:34, Andrey Zagrebin <[hidden email]> wrote:
 
Compaction merges SST files in the background using native threads. While merging, it filters out removed and expired data. In general, the idea is that there are enough resources for compaction to keep up with the DB update rate and reduce storage. It can be quite IO intensive. Compaction has a lot of tuning knobs and statistics to monitor the process [1]; these are usually out of the scope of Flink, as they depend on the state access pattern of the application. You can create and set a RocksDBStateBackend for your application in Flink and configure it with custom RocksDb/column-specific options.
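
(As an illustrative sketch only: in Flink 1.6 such custom options can be supplied through an OptionsFactory, registered via rocksDBStateBackend.setOptions(...) like the StpRocksDbOptions in the configuration shown above. The class name and values below are hypothetical, not tuning recommendations.)

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    public class TunedRocksDbOptions implements OptionsFactory {

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // more background threads, so compaction can keep up on slow IO
            return currentOptions.setIncreaseParallelism(4);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // start level-0 compaction earlier, so small SST files get merged sooner
            return currentOptions.setLevel0FileNumCompactionTrigger(2);
        }
    }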
 

 

On 29 Nov 2018, at 11:20, <[hidden email]> <[hidden email]> wrote:
 

We use TtlDB because the state contents should expire automatically after 24 hours. Therefore we only changed the state backend to use TtlDb instead of RocksDB with a fixed retention time.

We have slow IO because we only have SAN volumes available. Can you further clarify the problem with slow compaction?

Regards,

Bernd


-----Original Message-----
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 11:01
To: Winterstein, Bernd
Cc: Kostas Kloudas; user; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly

If you use incremental checkpoints, the state backend stores raw RocksDB SST files which represent all state data. Each checkpoint adds the SST files with new updates that are not present in the previous checkpoint, basically their difference. For example, if checkpoint N already uploaded two SST files, checkpoint N+1 uploads only the newly created SST files and references the shared older ones.

One of the following could be happening:
- old keys are neither explicitly deleted nor expired (depending on how TtlDb is used)
- compaction is too slow to drop older SST files for the latest checkpoint so that they can be deleted with the previous checkpoints



AW: number of files in checkpoint directory grows endlessly

Bernd.Winterstein
In reply to this post by Andrey Zagrebin

Hi Andrey

I have the same pattern of files in the taskmanager tmp directory. There are matching .sst files with the same file size and time.

 

I attached the rocksdb LOG file.

 

/var/lib/taskmanager/temp1/flink-io-dadc410c-ba0e-40b0-9f0d-e91758eadf15/job_a73a132912c8efaaab4a8e6331bdcf47_op_KeyedProcessOperator_1390d90f0116bdd25ec5582aa781368c__5_6__uuid_bda6b235-18e3-4c5b-9564-120fedfa6462/db:

total 141048
-rw-r--r--. 1 flink flink        0 Dec  5 08:58 000003.log
-rw-r--r--. 1 flink flink     1121 Dec  5 09:04 000019.sst
-rw-r--r--. 1 flink flink     1121 Dec  5 09:09 000026.sst
-rw-r--r--. 1 flink flink     1121 Dec  5 09:14 000033.sst
-rw-r--r--. 1 flink flink     1121 Dec  5 09:19 000041.sst
-rw-r--r--. 1 flink flink     1121 Dec  5 09:24 000048.sst
-rw-r--r--. 1 flink flink     1121 Dec  5 09:29 000055.sst
-rw-r--r--. 1 flink flink     1121 Dec  5 09:34 000062.sst
-rw-r--r--. 1 flink flink     1121 Dec  5 09:39 000070.sst
-rw-r--r--. 1 flink flink     1121 Dec  6 11:54 002353.sst
-rw-r--r--. 1 flink flink     1121 Dec  6 11:59 002361.sst
-rw-r--r--. 1 flink flink     1121 Dec  6 12:04 002368.sst
-rw-r--r--. 1 flink flink     1121 Dec  6 12:09 002375.sst
-rw-r--r--. 1 flink flink     1121 Dec  6 12:14 002382.sst
-rw-r--r--. 1 flink flink     1121 Dec  6 12:19 002390.sst
-rw-r--r--. 1 flink flink     1121 Dec  6 12:24 002397.sst
-rw-r--r--. 1 flink flink     1121 Dec  6 12:29 002404.sst
-rw-r--r--. 1 flink flink 97152399 Dec  6 12:31 002407.sst
-rw-r--r--. 1 flink flink    70640 Dec  6 12:32 002408.sst
-rw-r--r--. 1 flink flink    73598 Dec  6 12:33 002409.sst
-rw-r--r--. 1 flink flink       16 Dec  5 08:58 CURRENT
-rw-r--r--. 1 flink flink       37 Dec  5 08:58 IDENTITY
-rw-r--r--. 1 flink flink        0 Dec  5 08:58 LOCK
-rw-r--r--. 1 flink flink 41480256 Dec  6 12:33 LOG
-rw-r--r--. 1 flink flink   424380 Dec  6 12:33 MANIFEST-000006
-rw-r--r--. 1 flink flink     9034 Dec  5 08:58 OPTIONS-000010
-rw-r--r--. 1 flink flink    11318 Dec  5 08:58 OPTIONS-000012

 

 


LOG.zip (4M) Download Attachment

AW: number of files in checkpoint directory grows endlessly

Bernd.Winterstein
In reply to this post by Andrey Zagrebin

It seems that some file deletion is disabled by default; there are corresponding log entries in the attached LOG file.

 

Von: Andrey Zagrebin [mailto:[hidden email]]
Gesendet: Donnerstag, 6. Dezember 2018 12:07
An: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Betreff: Re: number of files in checkpoint directory grows endlessly

 

Hi Bernd,

 

Thanks for sharing the code.

 

I understand your TTL requirement. It definitely makes sense for your application. 

My recommendation is still to try running job with original backend without TtlDb modification to narrow down the problem and understand where these small files come from. They might look like some meta information which seems to accumulate over time.

 

Could you also investigate temporary local directories in task executors?

By default, they are usually in System.getProperty("java.io.tmpdir") (e.g. /tmp in Linux) or there is also a config option for them [1].

Do you see there explosion of similar files? They can be named differently. The interesting would be whether the smaller files end with ‘.sst’ or not.

 

Best,

Andrey

 

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#io-tmp-dirs



On 6 Dec 2018, at 09:16, <[hidden email]> <[hidden email]> wrote:

 

Hi Andrey

We need the TTL feature, because we store about 10 million state entries per 24 hours. Each entry has to expire after 24 hours. We couldn’t do the expiry with timers due to bad checkpoint performance, so I switched to TtlDB.

 

It seems that per minute files are only available for the last incremental period. After this time the files have five minute distance.

 

As for the chk-* directories, there is only one directory left.

 

I attached the code for the options and the KeyedStateBackend and also one of the shared files.

 

Many thanks for your help.

 

 

Bernd

 

cat /appdata/rtts/ice/shared/jobmanager/checkpoints/stp/a73a132912c8efaaab4a8e6331bdcf47/shared/6d64a7cc-8cdf-4f51-aa2f-bb7c8e7d4ce3

 

¦>¦¦}d¦¦/20181205-HELADEF1TSK-TX-9ffXfg1DZCHEe2JGqpC6dDJ

        ¦¦$rocksdb.block.based.table.index.type*rocksdb.block.based.table.prefix.filtering0-rocksdb.block.based.table.whole.key.filtering1rocksdb.column.family.id#rocksdb.column.family.name_timer_state/processing_user-timersrocksdb.comparatorleveldb.BytewiseComparatorrocksdb.compressionSnappyrocksdb.creation.time¦¦¦¦rocksdb.data.sizeQrocksdb.deleted.keysrocksdb.filter.policyrocksdb.BuiltinBloomFilterrocksdb.filter.sizerocksdb.fixed.key.lengthrocksdb.format.versionrocksdb.index.sizeocksdb.merge.operands

                                                                                                                                                                                                                                                       rocksdb.merge.operatorMerge By TTLrocksdb.num.data.blocksrocksdb.num.entriesrocksdb.prefix.extractor.namenullptrocksdb.property.collectors[]rocksdb.raw.key.sizeArocksdb.raw.value.size+Y¦¦¦1Nc{¦¦¦¦*Oj¦¦¦¦¦V!filter.rocksdb.BuiltinBloomFilterQrocksdb.propertiesh¦&¦¦2?    }¦¦¦¦¦¦¦L¦¦¦JW¦¦¦$uG¦

 

 

 

Von: Andrey Zagrebin [[hidden email]] 
Gesendet: Mittwoch, 5. Dezember 2018 15:25
An: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; [hidden email]
Betreff: Re: number of files in checkpoint directory grows endlessly

 

Hi Bernd,

 

Thanks for sharing the code.

 

Could you try the same job without TtlDb, with original RocksDb state backend?

Even if data does not expire every day, the overall size will grow but the number of files should not because of compaction.

 

Do you set some more specific db/column options in this option factory StpRocksDbOptions?

Could you share the source code of it and modified RocksDBKeyedStateBackend, especially createColumnFamilyWithTtl?

 

Do you always have only 1-2 last 'chk-*’ directories at a time or there are more not cleaned up checkpoint directories?

 

You have 1 minute checkpoint interval but the list of files does not have files for some minutes consecutively. Do the files miss for some minutes/checkpoints, like 10:10 or 10:05, or checkpoints just take long including CheckpointMinPause?

 

Could you check whether those ‘1121’ files identical? look inside or post one of them here?

 

Best,

Andrey

 

On 4 Dec 2018, at 14:10, <[hidden email]> <[hidden email]> wrote:

 

All calls to createColumnFamily were exchanged by createColumnFamilyWithTtl

 

 

 

     private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(

                StateDescriptor<?, S> stateDesc,

                TypeSerializer<N> namespaceSerializer,

                @Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {

 

          Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =

                kvStateInformation.get(stateDesc.getName());

 

          RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;

          if (stateInfo != null) {

 

                StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());

 

                Preconditions.checkState(

                     restoredMetaInfoSnapshot != null,

                     "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +

                          " but its corresponding restored snapshot cannot be found.");

 

                newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(

                     restoredMetaInfoSnapshot,

                     namespaceSerializer,

                     stateDesc,

                     snapshotTransformer);

 

                stateInfo.f1 = newMetaInfo;

          } else {

                String stateName = stateDesc.getName();

 

                newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(

                     stateDesc.getType(),

                     stateName,

                     namespaceSerializer,

                     stateDesc.getSerializer(),

                     snapshotTransformer);

 

                ColumnFamilyHandle columnFamily = createColumnFamilyWithTtl(stateName);

 

                stateInfo = Tuple2.of(columnFamily, newMetaInfo);

                kvStateInformation.put(stateDesc.getName(), stateInfo);

          }

 

          return Tuple2.of(stateInfo.f0, newMetaInfo);

     }

 

Von: Yun Tang [[hidden email]] 
Gesendet: Dienstag, 4. Dezember 2018 13:55
An: Winterstein, Bernd; [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Betreff: Re: number of files in checkpoint directory grows endlessly

 

Hi Bernd,

 

RocksDBStateBackend would not use default column family, but put state's entries into its newly-created column family by default. Would you please check whether having used db.createColumnFamilyWithTtl method instead ofdb.createColumnFamily within RocksDBKeyedStateBackend#tryRegisterKvStateInformation?

 

In my opinion, the default method of db.createColumnFamily would set ttl as 0, which means infinity, for this column family. You could view RocksDB's implementation here.

 

Best

Yun Tang


From: [hidden email] <[hidden email]>
Sent: Tuesday, December 4, 2018 17:40
To: [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: AW: number of files in checkpoint directory grows endlessly

 

Sorry for the late answer. I haven’t been in the office.

 

The logs show no problems.

The files that remain in the shared subfolder are almost all 1121 bytes. Except the files from the latest checkpoint (30 files for all operators)

For each historic checkpoint six files remain (parallelism is 6)

 

checkpoints/stp/2b160fc9e5eba47d1906d04f36b399bf/shared/

 

-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 046b16e5-0edc-4ca5-9757-d89aa86f5d5c

-rw-r--r--. 1 flink ice    308383 Dec  4 10:23 dab0f801-8907-4b10-ae8b-5f5f71c28524

-rw-r--r--. 1 flink ice    101035 Dec  4 10:23 e4915253-7025-4e36-8671-58a371f202ff

-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 613a9fd1-4545-4785-82ef-92f8c4818bc0

-rw-r--r--. 1 flink ice    308270 Dec  4 10:23 23771709-03ef-4417-acd2-1c27c8b0785e

-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 59643ae2-57b8-4d9e-9f2e-977545db9238

-rw-r--r--. 1 flink ice    102444 Dec  4 10:23 f9a70bb7-d4f3-4d94-af31-8e94276001b1

-rw-r--r--. 1 flink ice    308346 Dec  4 10:23 bafc1a20-e5a4-4b06-93fe-db00f3f3913a

-rw-r--r--. 1 flink ice     96035 Dec  4 10:23 a8b3aa75-fa44-4fc2-ab49-437d49410acd

-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 961f1472-b61c-4c04-b544-4549c3ce4038

-rw-r--r--. 1 flink ice    308387 Dec  4 10:23 7245dcfe-62e8-42a3-94a9-0698bdf9fb4d

-rw-r--r--. 1 flink ice     99209 Dec  4 10:23 5ad87ff2-604c-40d9-9262-76ac6369453d

-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 ccaae280-837a-4bc9-95cd-3612c4cd435c

-rw-r--r--. 1 flink ice    308451 Dec  4 10:23 69b650dd-7969-4891-b973-9bd168e2f40e

-rw-r--r--. 1 flink ice    105638 Dec  4 10:23 16f3092d-76eb-4fd7-87b4-2eb28ca4696c

-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 eccb19a0-4570-4e49-be97-707248690fe8

-rw-r--r--. 1 flink ice    308513 Dec  4 10:23 47259326-cb62-4abb-ad0b-5e2deda97685

-rw-r--r--. 1 flink ice    109918 Dec  4 10:23 aebc4390-3467-4596-a5fa-0d0a6dc74f54

-rw-r--r--. 1 flink ice 259444946 Dec  4 10:22 3e97ea93-4bc9-4404-aeca-eed31e96a14b

-rw-r--r--. 1 flink ice 247501755 Dec  4 10:22 b23d2c3a-94eb-45f0-bd29-457d8796624d

-rw-r--r--. 1 flink ice 247754788 Dec  4 10:22 4eb66b02-6758-4cff-9de2-fb7399b2ac0b

-rw-r--r--. 1 flink ice 247281033 Dec  4 10:22 2aeeb9ad-2714-481c-a7a4-04148f72671d

-rw-r--r--. 1 flink ice 247345955 Dec  4 10:22 5ccf700a-bd83-4e02-93a5-db46ca71e47a

-rw-r--r--. 1 flink ice 259312070 Dec  4 10:22 3bebe32d-0ad3-4f4e-a3aa-21719fa62c87

-rw-r--r--. 1 flink ice     97551 Dec  4 10:22 5a4f8a3a-0f26-46e7-883e-d6fafd733183

-rw-r--r--. 1 flink ice    104198 Dec  4 10:22 cdbb4913-7dd0-4614-8b81-276a4cdf62cc

-rw-r--r--. 1 flink ice    101466 Dec  4 10:22 ce7f0fea-8cd3-4827-9ef1-ceba569c2989

-rw-r--r--. 1 flink ice    108561 Dec  4 10:22 5bd3f681-c131-4c41-9fdc-6a39b9954aa7

-rw-r--r--. 1 flink ice     98649 Dec  4 10:22 d5d8eb16-3bd8-4695-91a0-9d9089ca9510

-rw-r--r--. 1 flink ice    102071 Dec  4 10:22 f8e34ef1-60d6-4c0a-954b-64c8a0320834

-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 8fda9911-f63e-45a6-b95a-5c93fe99d0fd

-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 82545c1b-69d6-499b-a9fd-62e227b820c6

-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 f9fa3bba-c92d-4dda-b16e-0ba417edf5d2

-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 844fa51d-bb74-4bec-ab15-e52d37703d24

-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 2115654a-4544-41cc-bbee-a36d0d80d8eb

-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 acfc1566-5f14-47d7-ae54-7aa1dfb3859c

-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 b0144120-cce0-4b4d-9f8c-1564b9abedd9

-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 8ab4ddab-3665-4307-a581-ab413e1e2080

-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 0f8c4b1a-df5d-47f7-b960-e671cfc3c666

-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 40baf147-400e-455f-aea3-074355a77031

-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 47d2deca-1703-4dd3-9fea-e027087d553e

-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 aa336ce0-3689-4b7d-a472-b0a3ed2f5eb9

-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 ee15f1e0-d23c-4add-86b4-e4ab51bb2a20

-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 f440b5cf-8f62-4532-a886-a2cedc9a043e

-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 de423c46-4288-464b-97cb-6f7764b88dfd

-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 273a15cb-8c9f-4412-b5d2-68397ba461c9

-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 bb38b011-070d-4c21-b04a-4e923f85de86

-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 969abc07-d313-4d79-8119-6e1f3886be48

-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 eb0b2591-653c-47bd-a6b2-9f6634ff4f0a

-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 20b7e49a-ace5-4ef7-987f-0d328f47c56f

-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 a25c2bd9-7fe9-4558-b9dd-30b525a0b435

-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 dcd0852f-58dc-467e-93db-5700cd4f606e

-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 400e5038-2913-4aea-932d-92f508bd38f7

-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 10ce727b-9389-4911-b0d4-1b342dd3232c

-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 daec0dcb-384a-4d86-a423-7e2b0482b70e

-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 c787d58a-4bd5-4d9a-a4d8-47d9618552ff

-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b2c1383a-8452-4ec6-9064-a5e8f56e6f21

-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 65c0c908-b604-4ac9-a72f-4bb87676df11

-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 42d59072-95ec-42d5-81ff-b9447cb39fd0

-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b40c2577-e604-482d-86a1-ddb785e3b799

 

The ttldb stream backend is mostly a copy of the current flink-statebackend-rocksb which uses the TtlDB instead of the RockDB class with the following configuration:

 

 

    protected void buildDefaultStateBackend(StreamExecutionEnvironment executionEnvironment) throws IOException {
        FsStateBackend fsStateBackend = new FsStateBackend(configuration.getCheckpointDirectory(), true);
        RocksDBStateBackend rocksDBStateBackend = new de.helaba.rtts.ice.rocksdb.ttl.RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE, configuration.getStateTimeToLive());
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        rocksDBStateBackend.setOptions(new StpRocksDbOptions());
        executionEnvironment.setStateBackend(rocksDBStateBackend);
    }

    protected void configureCheckpointing(StreamExecutionEnvironment executionEnvironment) {
        CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(configuration.getCheckpointInterval());
        checkpointConfig.setMinPauseBetweenCheckpoints(configuration.getCheckpointMinPause());
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getConfig().setUseSnapshotCompression(true);
    }

 

 

 

From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 15:38
To: Winterstein, Bernd
Cc: Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly

 

Could you share the logs to check possible failures to subsume or remove previous checkpoints?

What are the sizes of the files? That can help in understanding how compaction goes.

Could you also provide more details on how you set up TtlDb with Flink?

 

Best,

Andrey

 

On 29 Nov 2018, at 11:34, Andrey Zagrebin <[hidden email]> wrote:

 

Compaction merges SST files in the background using native threads. While merging, it filters out removed and expired data. In general, the idea is that there are enough resources for compaction to keep up with the DB update rate and reduce storage. It can be quite IO-intensive. Compaction has a lot of tuning knobs and statistics to monitor the process [1], which are usually out of the scope of Flink and depend on the state access pattern of the application. You can create and set a RocksDBStateBackend for your application in Flink and configure it with custom RocksDB/column-specific options.
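
For illustration, a minimal sketch of such an options factory (assuming the Flink 1.6 OptionsFactory interface; the class name and values are examples only, not a recommendation):

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    // Sketch: custom RocksDB options to help compaction keep up on slow disks.
    public class CompactionTuningOptions implements OptionsFactory {

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // More background threads so compaction can keep up with updates.
            return currentOptions.setIncreaseParallelism(4);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // Use fixed level target sizes instead of the dynamic sizing that
            // PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM enables.
            return currentOptions.setLevelCompactionDynamicLevelBytes(false);
        }
    }

It would be registered via rocksDBStateBackend.setOptions(new CompactionTuningOptions()).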

 

 

On 29 Nov 2018, at 11:20, <[hidden email]> <[hidden email]> wrote:

 

We use TtlDB because the state contents should expire automatically after 24 hours. Therefore we only changed the state backend to use TtlDb instead of RocksDB with a fixed retention time.

We have slow I/O because we only have SAN volumes available. Can you clarify further what the problem with slow compaction would be?

Regards,

Bernd


-----Original Message-----
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 11:01
To: Winterstein, Bernd
Cc: Kostas Kloudas; user; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly

If you use incremental checkpoints, the state backend stores raw RocksDB SST files which represent all state data. Each checkpoint adds the SST files with new updates that are not present in the previous checkpoint, basically their difference.

One of the following could be happening:
- old keys are not explicitly deleted or expire (depending on how TtlDb is used)
- compaction is too slow to drop older SST files for the latest checkpoint so that they can be deleted with the previous checkpoints


On 29 Nov 2018, at 10:48, <[hidden email]> <[hidden email]> wrote:

Hi
We use Flink 1.6.2. As for the checkpoint directory, there is only one chk-xxx directory. Therefore I would expect that only one checkpoint remains.
The value of 'state.checkpoints.num-retained’ is not set explicitly.

The problem is not the number of checkpoints but the number of files in the "shared" directory next to the chk-xxx directory.


-----Original Message-----
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 10:39
To: Kostas Kloudas
Cc: Winterstein, Bernd; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly

Hi Bernd,

Did you change 'state.checkpoints.num-retained’ in flink-conf.yaml? By default, only one checkpoint should be retained.
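
For reference, the corresponding flink-conf.yaml entry, shown here with its documented default:

    state.checkpoints.num-retained: 1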

Which version of Flink do you use?
Can you check the Job Master logs for a warning like this:
`Fail to subsume the old checkpoint`?

Best,
Andrey


On 29 Nov 2018, at 10:18, Kostas Kloudas <[hidden email]> wrote:

Hi Bernd,

I think the Till, Stefan or Stephan (cc'ed) are the best to answer your question.

Cheers,
Kostas



 

<StpRocksDbOptions.java><RocksDBKeyedStateBackend.java><shared_file.bin.txt>

 

Reply | Threaded
Open this post in threaded view
|

Re: number of files in checkpoint directory grows endlessly

Andrey Zagrebin
Hi Bernd,

Does this directory contain approximately 300 files on the task executor?

The 'transactionState' seems to be ok and compacted. It takes most of the data size, up to 100 MB.

Do you use any Flink timers? They are also kept in RocksDB and take very little space, just about 300 KB.

It might need more investigation, but my idea is the following.
The column family of Flink user timers has a lot of small files with just 1-2 new records, checkpointed all the time. Most of each small file's contents is just meta info. It seems to include both creation and deletion entries. As timers are always new entries, the SST files either do not overlap or are too small. As a result they are not merged upon compaction but just forwarded to the last level [1]. The dynamic level sizing set by SPINNING_DISK_OPTIMIZED_HIGH_MEM makes a level's target size whatever it currently is, basically disabling compaction for that column. Such files can stay around for a very long time. I cannot check all 300 files, but it seems that for 19 of 26 files removal has not happened the way it does for 'transactionState'. RocksDB suggests using a different TTL concept to force compaction [2]. Compaction can also be triggered manually from time to time, but this is not part of the standard Flink distribution. Note that TtlDb also cleans up expired entries only during compaction.

Best,
Andrey

[2] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#ttl
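
As an illustration of the manual trigger mentioned above, a minimal sketch using the RocksDB Java API (the db and column family handle are hypothetical references into the backend, not something Flink exposes out of the box):

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    // Sketch: force a full compaction of one column family so that small,
    // stale SST files are merged and their deletions/expirations applied.
    public final class ManualCompaction {
        public static void compactTimerColumn(RocksDB db, ColumnFamilyHandle timerColumn)
                throws RocksDBException {
            db.compactRange(timerColumn); // compacts the whole key range
        }
    }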

On 6 Dec 2018, at 12:53, <[hidden email]> <[hidden email]> wrote:

It seems that some file deletion is disabled by default. There are some log entries in the file.
 
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 6 December 2018 12:07
To: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,
 
Thanks for sharing the code.
 
I understand your TTL requirement. It definitely makes sense for your application. 
My recommendation is still to try running the job with the original backend, without the TtlDb modification, to narrow down the problem and understand where these small files come from. They look like some meta information which seems to accumulate over time.
 
Could you also investigate the temporary local directories on the task executors?
By default, they are usually in System.getProperty("java.io.tmpdir") (e.g. /tmp in Linux) or there is also a config option for them [1].
Do you see an explosion of similar files there? They can be named differently. The interesting thing would be whether the smaller files end with ‘.sst’ or not.
 
Best,
Andrey
 


On 6 Dec 2018, at 09:16, <[hidden email]> <[hidden email]> wrote:
 
Hi Andrey
We need the TTL feature, because we store about 10 million state entries per 24 hours. Each entry has to expire after 24 hours. We couldn’t do the expiry with timers due to bad checkpoint performance, so I switched to TtlDB.
 
It seems that per-minute files are only available for the last incremental period. After that time, the files are five minutes apart.
 
As for the chk-* directories, there is only one directory left.
 
I attached the code for the options and the KeyedStateBackend and also one of the shared files.
 
Many thanks for your help.
 
 
Bernd
 
cat /appdata/rtts/ice/shared/jobmanager/checkpoints/stp/a73a132912c8efaaab4a8e6331bdcf47/shared/6d64a7cc-8cdf-4f51-aa2f-bb7c8e7d4ce3
 
¦>¦¦}d¦¦/20181205-HELADEF1TSK-TX-9ffXfg1DZCHEe2JGqpC6dDJ
        ¦¦$rocksdb.block.based.table.index.type*rocksdb.block.based.table.prefix.filtering0-rocksdb.block.based.table.whole.key.filtering1rocksdb.column.family.id#rocksdb.column.family.name_timer_state/processing_user-timersrocksdb.comparatorleveldb.BytewiseComparatorrocksdb.compressionSnappyrocksdb.creation.time¦¦¦¦rocksdb.data.sizeQrocksdb.deleted.keysrocksdb.filter.policyrocksdb.BuiltinBloomFilterrocksdb.filter.sizerocksdb.fixed.key.lengthrocksdb.format.versionrocksdb.index.sizeocksdb.merge.operands
                                                                                                                                                                                                                                                       rocksdb.merge.operatorMerge By TTLrocksdb.num.data.blocksrocksdb.num.entriesrocksdb.prefix.extractor.namenullptrocksdb.property.collectors[]rocksdb.raw.key.sizeArocksdb.raw.value.size+Y¦¦¦1Nc{¦¦¦¦*Oj¦¦¦¦¦V!filter.rocksdb.BuiltinBloomFilterQrocksdb.propertiesh¦&¦¦2?    }¦¦¦¦¦¦¦L¦¦¦JW¦¦¦$uG¦
 
 
 
From: Andrey Zagrebin [[hidden email]]
Sent: Wednesday, 5 December 2018 15:25
To: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,
 
Thanks for sharing the code.
 
Could you try the same job without TtlDb, with the original RocksDB state backend?
Even if data does not expire every day, the overall size will grow, but the number of files should not, because of compaction.
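
For comparison, a minimal sketch of the unmodified backend with incremental checkpoints (Flink 1.6 API; the checkpoint URI is a placeholder):

    import java.io.IOException;

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Sketch: the stock RocksDB backend, no TtlDB modification.
    public final class StockBackendSetup {
        static void configure(StreamExecutionEnvironment env) throws IOException {
            RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true); // true = incremental
            env.setStateBackend(backend);
        }
    }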
 
Do you set any more specific DB/column options in this options factory StpRocksDbOptions?
Could you share its source code and the modified RocksDBKeyedStateBackend, especially createColumnFamilyWithTtl?
 
Do you always have only the last 1-2 ‘chk-*’ directories at a time, or are there more checkpoint directories that were not cleaned up?
 
You have a 1-minute checkpoint interval, but the list does not have files for some consecutive minutes. Are files missing for some minutes/checkpoints, like 10:10 or 10:05, or do checkpoints just take long, including the CheckpointMinPause?
 
Could you check whether those 1121-byte files are identical? Could you look inside or post one of them here?
 
Best,
Andrey

 

On 4 Dec 2018, at 14:10, <[hidden email]> <[hidden email]> wrote:
 

All calls to createColumnFamily were replaced with createColumnFamilyWithTtl.

 
 
 
     private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
                StateDescriptor<?, S> stateDesc,
                TypeSerializer<N> namespaceSerializer,
                @Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {
 
          Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
                kvStateInformation.get(stateDesc.getName());
 
          RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
          if (stateInfo != null) {
 
                StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());
 
                Preconditions.checkState(
                     restoredMetaInfoSnapshot != null,
                     "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
                          " but its corresponding restored snapshot cannot be found.");
 
                newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
                     restoredMetaInfoSnapshot,
                     namespaceSerializer,
                     stateDesc,
                     snapshotTransformer);
 
                stateInfo.f1 = newMetaInfo;
          } else {
                String stateName = stateDesc.getName();
 
                newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
                     stateDesc.getType(),
                     stateName,
                     namespaceSerializer,
                     stateDesc.getSerializer(),
                     snapshotTransformer);
 
                ColumnFamilyHandle columnFamily = createColumnFamilyWithTtl(stateName);
 
                stateInfo = Tuple2.of(columnFamily, newMetaInfo);
                kvStateInformation.put(stateDesc.getName(), stateInfo);
          }
 
          return Tuple2.of(stateInfo.f0, newMetaInfo);
     }
 
From: Yun Tang [[hidden email]]
Sent: Tuesday, 4 December 2018 13:55
To: Winterstein, Bernd; [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,
 
RocksDBStateBackend would not use the default column family, but puts state entries into a newly-created column family by default. Would you please check whether you used the db.createColumnFamilyWithTtl method instead of db.createColumnFamily within RocksDBKeyedStateBackend#tryRegisterKvStateInformation?
 
In my opinion, the default method db.createColumnFamily would set the ttl to 0, which means infinity, for this column family. You could view RocksDB's implementation here.
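
For illustration, a minimal sketch of that difference with the RocksDB Java API (path and TTL values are placeholders; 86400 seconds = 24 hours):

    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.TtlDB;

    // Sketch: a column family created via createColumnFamilyWithTtl expires
    // entries after the given TTL; plain createColumnFamily leaves ttl = 0,
    // i.e. entries never expire.
    public final class TtlColumnFamilyExample {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 TtlDB db = TtlDB.open(options, "/tmp/ttl-db", 86400, false)) {
                ColumnFamilyHandle withTtl = db.createColumnFamilyWithTtl(
                    new ColumnFamilyDescriptor("state-24h".getBytes()), 86400);
                withTtl.close();
            }
        }
    }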
 
Best
Yun Tang

From: [hidden email] <[hidden email]>
Sent: Tuesday, December 4, 2018 17:40
To: [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: AW: number of files in checkpoint directory grows endlessly
 
Sorry for the late answer. I haven’t been in the office.
 
The logs show no problems.
The files that remain in the shared subfolder are almost all 1121 bytes, except the files from the latest checkpoint (30 files across all operators).
For each historic checkpoint, six files remain (parallelism is 6).
 
checkpoints/stp/2b160fc9e5eba47d1906d04f36b399bf/shared/
 
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 046b16e5-0edc-4ca5-9757-d89aa86f5d5c
-rw-r--r--. 1 flink ice    308383 Dec  4 10:23 dab0f801-8907-4b10-ae8b-5f5f71c28524
-rw-r--r--. 1 flink ice    101035 Dec  4 10:23 e4915253-7025-4e36-8671-58a371f202ff
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 613a9fd1-4545-4785-82ef-92f8c4818bc0
-rw-r--r--. 1 flink ice    308270 Dec  4 10:23 23771709-03ef-4417-acd2-1c27c8b0785e
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 59643ae2-57b8-4d9e-9f2e-977545db9238
-rw-r--r--. 1 flink ice    102444 Dec  4 10:23 f9a70bb7-d4f3-4d94-af31-8e94276001b1
-rw-r--r--. 1 flink ice    308346 Dec  4 10:23 bafc1a20-e5a4-4b06-93fe-db00f3f3913a
-rw-r--r--. 1 flink ice     96035 Dec  4 10:23 a8b3aa75-fa44-4fc2-ab49-437d49410acd
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 961f1472-b61c-4c04-b544-4549c3ce4038
-rw-r--r--. 1 flink ice    308387 Dec  4 10:23 7245dcfe-62e8-42a3-94a9-0698bdf9fb4d
-rw-r--r--. 1 flink ice     99209 Dec  4 10:23 5ad87ff2-604c-40d9-9262-76ac6369453d
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 ccaae280-837a-4bc9-95cd-3612c4cd435c
-rw-r--r--. 1 flink ice    308451 Dec  4 10:23 69b650dd-7969-4891-b973-9bd168e2f40e
-rw-r--r--. 1 flink ice    105638 Dec  4 10:23 16f3092d-76eb-4fd7-87b4-2eb28ca4696c
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 eccb19a0-4570-4e49-be97-707248690fe8
-rw-r--r--. 1 flink ice    308513 Dec  4 10:23 47259326-cb62-4abb-ad0b-5e2deda97685
-rw-r--r--. 1 flink ice    109918 Dec  4 10:23 aebc4390-3467-4596-a5fa-0d0a6dc74f54
-rw-r--r--. 1 flink ice 259444946 Dec  4 10:22 3e97ea93-4bc9-4404-aeca-eed31e96a14b
-rw-r--r--. 1 flink ice 247501755 Dec  4 10:22 b23d2c3a-94eb-45f0-bd29-457d8796624d
-rw-r--r--. 1 flink ice 247754788 Dec  4 10:22 4eb66b02-6758-4cff-9de2-fb7399b2ac0b
-rw-r--r--. 1 flink ice 247281033 Dec  4 10:22 2aeeb9ad-2714-481c-a7a4-04148f72671d
-rw-r--r--. 1 flink ice 247345955 Dec  4 10:22 5ccf700a-bd83-4e02-93a5-db46ca71e47a
-rw-r--r--. 1 flink ice 259312070 Dec  4 10:22 3bebe32d-0ad3-4f4e-a3aa-21719fa62c87
-rw-r--r--. 1 flink ice     97551 Dec  4 10:22 5a4f8a3a-0f26-46e7-883e-d6fafd733183
-rw-r--r--. 1 flink ice    104198 Dec  4 10:22 cdbb4913-7dd0-4614-8b81-276a4cdf62cc
-rw-r--r--. 1 flink ice    101466 Dec  4 10:22 ce7f0fea-8cd3-4827-9ef1-ceba569c2989
-rw-r--r--. 1 flink ice    108561 Dec  4 10:22 5bd3f681-c131-4c41-9fdc-6a39b9954aa7
-rw-r--r--. 1 flink ice     98649 Dec  4 10:22 d5d8eb16-3bd8-4695-91a0-9d9089ca9510
-rw-r--r--. 1 flink ice    102071 Dec  4 10:22 f8e34ef1-60d6-4c0a-954b-64c8a0320834
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 8fda9911-f63e-45a6-b95a-5c93fe99d0fd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 82545c1b-69d6-499b-a9fd-62e227b820c6
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 f9fa3bba-c92d-4dda-b16e-0ba417edf5d2
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 844fa51d-bb74-4bec-ab15-e52d37703d24
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 2115654a-4544-41cc-bbee-a36d0d80d8eb
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 acfc1566-5f14-47d7-ae54-7aa1dfb3859c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 b0144120-cce0-4b4d-9f8c-1564b9abedd9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 8ab4ddab-3665-4307-a581-ab413e1e2080
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 0f8c4b1a-df5d-47f7-b960-e671cfc3c666
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 40baf147-400e-455f-aea3-074355a77031
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 47d2deca-1703-4dd3-9fea-e027087d553e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 aa336ce0-3689-4b7d-a472-b0a3ed2f5eb9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 ee15f1e0-d23c-4add-86b4-e4ab51bb2a20
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 f440b5cf-8f62-4532-a886-a2cedc9a043e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 de423c46-4288-464b-97cb-6f7764b88dfd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 273a15cb-8c9f-4412-b5d2-68397ba461c9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 bb38b011-070d-4c21-b04a-4e923f85de86
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 969abc07-d313-4d79-8119-6e1f3886be48
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 eb0b2591-653c-47bd-a6b2-9f6634ff4f0a
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 20b7e49a-ace5-4ef7-987f-0d328f47c56f
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 a25c2bd9-7fe9-4558-b9dd-30b525a0b435
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 dcd0852f-58dc-467e-93db-5700cd4f606e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 400e5038-2913-4aea-932d-92f508bd38f7
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 10ce727b-9389-4911-b0d4-1b342dd3232c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 daec0dcb-384a-4d86-a423-7e2b0482b70e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 c787d58a-4bd5-4d9a-a4d8-47d9618552ff
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b2c1383a-8452-4ec6-9064-a5e8f56e6f21
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 65c0c908-b604-4ac9-a72f-4bb87676df11
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 42d59072-95ec-42d5-81ff-b9447cb39fd0
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b40c2577-e604-482d-86a1-ddb785e3b799
 
The TtlDB stream backend is mostly a copy of the current flink-statebackend-rocksdb, which uses the TtlDB class instead of the RocksDB class, with the following configuration:
 
 
    protected void buildDefaultStateBackend(StreamExecutionEnvironment executionEnvironment) throws IOException {
        FsStateBackend fsStateBackend = new FsStateBackend(configuration.getCheckpointDirectory(), true);
        RocksDBStateBackend rocksDBStateBackend = new de.helaba.rtts.ice.rocksdb.ttl.RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE, configuration.getStateTimeToLive());
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        rocksDBStateBackend.setOptions(new StpRocksDbOptions());
        executionEnvironment.setStateBackend(rocksDBStateBackend);
    }
 
    protected void configureCheckpointing(StreamExecutionEnvironment executionEnvironment) {
        CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(configuration.getCheckpointInterval());
        checkpointConfig.setMinPauseBetweenCheckpoints(configuration.getCheckpointMinPause());
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getConfig().setUseSnapshotCompression(true);
    }
 
 
 
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 15:38
To: Winterstein, Bernd
Cc: Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly
 
Could you share the logs to check possible failures to subsume or remove previous checkpoints?
What are the sizes of the files? That can help in understanding how compaction goes.
Could you also provide more details on how you set up TtlDb with Flink?
 
Best,
Andrey

 

On 29 Nov 2018, at 11:34, Andrey Zagrebin <[hidden email]> wrote:
 
Compaction merges SST files in the background using native threads. While merging, it filters out removed and expired data. In general, the idea is that there are enough resources for compaction to keep up with the DB update rate and reduce storage. It can be quite IO-intensive. Compaction has a lot of tuning knobs and statistics to monitor the process [1], which are usually out of the scope of Flink and depend on the state access pattern of the application. You can create and set a RocksDBStateBackend for your application in Flink and configure it with custom RocksDB/column-specific options.
 

 

On 29 Nov 2018, at 11:20, <[hidden email]> <[hidden email]> wrote:
 

We use TtlDB because the state contents should expire automatically after 24 hours. Therefore we only changed the state backend to use TtlDb instead of RocksDB with a fixed retention time.

We have slow I/O because we only have SAN volumes available. Can you clarify further what the problem with slow compaction would be?

Regards,

Bernd


-----Original Message-----
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 11:01
To: Winterstein, Bernd
Cc: Kostas Kloudas; user; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly

If you use incremental checkpoints, the state backend stores raw RocksDB SST files which represent all state data. Each checkpoint adds the SST files with new updates that are not present in the previous checkpoint, basically their difference.

One of the following could be happening:
- old keys are not explicitly deleted or expire (depending on how TtlDb is used)
- compaction is too slow to drop older SST files for the latest checkpoint so that they can be deleted with the previous checkpoints


On 29 Nov 2018, at 10:48, <[hidden email]> <[hidden email]> wrote:

Hi
We use Flink 1.6.2. As for the checkpoint directory, there is only one chk-xxx directory. Therefore I would expect that only one checkpoint remains.
The value of 'state.checkpoints.num-retained’ is not set explicitly.

The problem is not the number of checkpoints but the number of files in the "shared" directory next to the chk-xxx directory.


-----Original Message-----
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 10:39
To: Kostas Kloudas
Cc: Winterstein, Bernd; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly

Hi Bernd,

Did you change 'state.checkpoints.num-retained’ in flink-conf.yaml? By default, only one checkpoint should be retained.

Which version of Flink do you use?
Can you check the Job Master logs for a warning like this:
`Fail to subsume the old checkpoint`?

Best,
Andrey


On 29 Nov 2018, at 10:18, Kostas Kloudas <[hidden email]> wrote:

Hi Bernd,

I think the Till, Stefan or Stephan (cc'ed) are the best to answer your question.

Cheers,
Kostas

 
<StpRocksDbOptions.java><RocksDBKeyedStateBackend.java><shared_file.bin.txt>

Reply | Threaded
Open this post in threaded view
|

Re: number of files in checkpoint directory grows endlessly

Andrey Zagrebin
Could you also have a look at the other task executors and their RocksDB LOG files?
How many files are hard linked there after the last checkpoint?
Does the total count match the number of files in the shared directory?
It would confirm that the problem is with the timer state compaction and that there are no file handle leaks.

Could you also share the code which uses Flink timers, if any, to understand this behaviour of 1-2 timers per checkpoint?

On 6 Dec 2018, at 18:47, Andrey Zagrebin <[hidden email]> wrote:

Hi Bernd,

Does this directory contain approximately 300 files on the task executor?

The 'transactionState' seems to be ok and compacted. It takes most of the data size, up to 100 MB.

Do you use any Flink timers? They are also kept in RocksDB and take very little space, just about 300 KB.

It might need more investigation, but my idea is the following.
The column family of Flink user timers has a lot of small files with just 1-2 new records, checkpointed all the time. Most of each small file's contents is just meta info. It seems to include both creation and deletion entries. As timers are always new entries, the SST files either do not overlap or are too small. As a result they are not merged upon compaction but just forwarded to the last level [1]. The dynamic level sizing set by SPINNING_DISK_OPTIMIZED_HIGH_MEM makes a level's target size whatever it currently is, basically disabling compaction for that column. Such files can stay around for a very long time. I cannot check all 300 files, but it seems that for 19 of 26 files removal has not happened the way it does for 'transactionState'. RocksDB suggests using a different TTL concept to force compaction [2]. Compaction can also be triggered manually from time to time, but this is not part of the standard Flink distribution. Note that TtlDb also cleans up expired entries only during compaction.

Best,
Andrey

[2] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#ttl

On 6 Dec 2018, at 12:53, <[hidden email]> <[hidden email]> wrote:

It seems that some file deletion is disabled by default. There are some log entries in the file.
 
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 6 December 2018 12:07
To: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,
 
Thanks for sharing the code.
 
I understand your TTL requirement. It definitely makes sense for your application. 
My recommendation is still to try running the job with the original backend, without the TtlDb modification, to narrow down the problem and understand where these small files come from. They look like some meta information which seems to accumulate over time.
 
Could you also investigate the temporary local directories on the task executors?
By default, they are usually in System.getProperty("java.io.tmpdir") (e.g. /tmp in Linux) or there is also a config option for them [1].
Do you see an explosion of similar files there? They can be named differently. The interesting thing would be whether the smaller files end with ‘.sst’ or not.
 
Best,
Andrey
 


On 6 Dec 2018, at 09:16, <[hidden email]> <[hidden email]> wrote:
 
Hi Andrey
We need the TTL feature, because we store about 10 million state entries per 24 hours. Each entry has to expire after 24 hours. We couldn’t do the expiry with timers due to bad checkpoint performance, so I switched to TtlDB.
 
It seems that per-minute files are only available for the last incremental period. After that time, the files are five minutes apart.
 
As for the chk-* directories, there is only one directory left.
 
I attached the code for the options and the KeyedStateBackend and also one of the shared files.
 
Many thanks for your help.
 
 
Bernd
 
cat /appdata/rtts/ice/shared/jobmanager/checkpoints/stp/a73a132912c8efaaab4a8e6331bdcf47/shared/6d64a7cc-8cdf-4f51-aa2f-bb7c8e7d4ce3
 
¦>¦¦}d¦¦/20181205-HELADEF1TSK-TX-9ffXfg1DZCHEe2JGqpC6dDJ
        ¦¦$rocksdb.block.based.table.index.type*rocksdb.block.based.table.prefix.filtering0-rocksdb.block.based.table.whole.key.filtering1rocksdb.column.family.id#rocksdb.column.family.name_timer_state/processing_user-timersrocksdb.comparatorleveldb.BytewiseComparatorrocksdb.compressionSnappyrocksdb.creation.time¦¦¦¦rocksdb.data.sizeQrocksdb.deleted.keysrocksdb.filter.policyrocksdb.BuiltinBloomFilterrocksdb.filter.sizerocksdb.fixed.key.lengthrocksdb.format.versionrocksdb.index.sizeocksdb.merge.operands
                                                                                                                                                                                                                                                       rocksdb.merge.operatorMerge By TTLrocksdb.num.data.blocksrocksdb.num.entriesrocksdb.prefix.extractor.namenullptrocksdb.property.collectors[]rocksdb.raw.key.sizeArocksdb.raw.value.size+Y¦¦¦1Nc{¦¦¦¦*Oj¦¦¦¦¦V!filter.rocksdb.BuiltinBloomFilterQrocksdb.propertiesh¦&¦¦2?    }¦¦¦¦¦¦¦L¦¦¦JW¦¦¦$uG¦
 
 
 
From: Andrey Zagrebin [[hidden email]]
Sent: Wednesday, 5 December 2018 15:25
To: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,
 
Thanks for sharing the code.
 
Could you try the same job without TtlDb, with the original RocksDB state backend?
Even if data does not expire every day, the overall size will grow, but the number of files should not, because of compaction.
 
Do you set any more specific DB/column options in this options factory StpRocksDbOptions?
Could you share its source code and the modified RocksDBKeyedStateBackend, especially createColumnFamilyWithTtl?
 
Do you always have only the last 1-2 ‘chk-*’ directories at a time, or are there more checkpoint directories that were not cleaned up?
 
You have a 1-minute checkpoint interval, but the list does not have files for some consecutive minutes. Are files missing for some minutes/checkpoints, like 10:10 or 10:05, or do checkpoints just take long, including the CheckpointMinPause?
 
Could you check whether those 1121-byte files are identical? Could you look inside or post one of them here?
 
Best,
Andrey

 

On 4 Dec 2018, at 14:10, <[hidden email]> <[hidden email]> wrote:
 

All calls to createColumnFamily were replaced with createColumnFamilyWithTtl.

 
 
 
     private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
                StateDescriptor<?, S> stateDesc,
                TypeSerializer<N> namespaceSerializer,
                @Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {
 
          Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
                kvStateInformation.get(stateDesc.getName());
 
          RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
          if (stateInfo != null) {
 
                StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());
 
                Preconditions.checkState(
                     restoredMetaInfoSnapshot != null,
                     "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
                          " but its corresponding restored snapshot cannot be found.");
 
                newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
                     restoredMetaInfoSnapshot,
                     namespaceSerializer,
                     stateDesc,
                     snapshotTransformer);
 
                stateInfo.f1 = newMetaInfo;
          } else {
                String stateName = stateDesc.getName();
 
                newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
                     stateDesc.getType(),
                     stateName,
                     namespaceSerializer,
                     stateDesc.getSerializer(),
                     snapshotTransformer);
 
                ColumnFamilyHandle columnFamily = createColumnFamilyWithTtl(stateName);
 
                stateInfo = Tuple2.of(columnFamily, newMetaInfo);
                kvStateInformation.put(stateDesc.getName(), stateInfo);
          }
 
          return Tuple2.of(stateInfo.f0, newMetaInfo);
     }
 
From: Yun Tang [[hidden email]]
Sent: Tuesday, 4 December 2018 13:55
To: Winterstein, Bernd; [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,
 
RocksDBStateBackend would not use the default column family, but puts state entries into a newly-created column family by default. Would you please check whether you used the db.createColumnFamilyWithTtl method instead of db.createColumnFamily within RocksDBKeyedStateBackend#tryRegisterKvStateInformation?
 
In my opinion, the default method db.createColumnFamily would set the ttl to 0, which means infinity, for this column family. You could view RocksDB's implementation here.
 
Best
Yun Tang

From: [hidden email] <[hidden email]>
Sent: Tuesday, December 4, 2018 17:40
To: [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: AW: number of files in checkpoint directory grows endlessly
 
Sorry for the late answer. I haven’t been in the office.
 
The logs show no problems.
The files that remain in the shared subfolder are almost all 1121 bytes, except the files from the latest checkpoint (30 files across all operators).
For each historic checkpoint, six files remain (parallelism is 6).
 
checkpoints/stp/2b160fc9e5eba47d1906d04f36b399bf/shared/
 
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 046b16e5-0edc-4ca5-9757-d89aa86f5d5c
-rw-r--r--. 1 flink ice    308383 Dec  4 10:23 dab0f801-8907-4b10-ae8b-5f5f71c28524
-rw-r--r--. 1 flink ice    101035 Dec  4 10:23 e4915253-7025-4e36-8671-58a371f202ff
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 613a9fd1-4545-4785-82ef-92f8c4818bc0
-rw-r--r--. 1 flink ice    308270 Dec  4 10:23 23771709-03ef-4417-acd2-1c27c8b0785e
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 59643ae2-57b8-4d9e-9f2e-977545db9238
-rw-r--r--. 1 flink ice    102444 Dec  4 10:23 f9a70bb7-d4f3-4d94-af31-8e94276001b1
-rw-r--r--. 1 flink ice    308346 Dec  4 10:23 bafc1a20-e5a4-4b06-93fe-db00f3f3913a
-rw-r--r--. 1 flink ice     96035 Dec  4 10:23 a8b3aa75-fa44-4fc2-ab49-437d49410acd
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 961f1472-b61c-4c04-b544-4549c3ce4038
-rw-r--r--. 1 flink ice    308387 Dec  4 10:23 7245dcfe-62e8-42a3-94a9-0698bdf9fb4d
-rw-r--r--. 1 flink ice     99209 Dec  4 10:23 5ad87ff2-604c-40d9-9262-76ac6369453d
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 ccaae280-837a-4bc9-95cd-3612c4cd435c
-rw-r--r--. 1 flink ice    308451 Dec  4 10:23 69b650dd-7969-4891-b973-9bd168e2f40e
-rw-r--r--. 1 flink ice    105638 Dec  4 10:23 16f3092d-76eb-4fd7-87b4-2eb28ca4696c
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 eccb19a0-4570-4e49-be97-707248690fe8
-rw-r--r--. 1 flink ice    308513 Dec  4 10:23 47259326-cb62-4abb-ad0b-5e2deda97685
-rw-r--r--. 1 flink ice    109918 Dec  4 10:23 aebc4390-3467-4596-a5fa-0d0a6dc74f54
-rw-r--r--. 1 flink ice 259444946 Dec  4 10:22 3e97ea93-4bc9-4404-aeca-eed31e96a14b
-rw-r--r--. 1 flink ice 247501755 Dec  4 10:22 b23d2c3a-94eb-45f0-bd29-457d8796624d
-rw-r--r--. 1 flink ice 247754788 Dec  4 10:22 4eb66b02-6758-4cff-9de2-fb7399b2ac0b
-rw-r--r--. 1 flink ice 247281033 Dec  4 10:22 2aeeb9ad-2714-481c-a7a4-04148f72671d
-rw-r--r--. 1 flink ice 247345955 Dec  4 10:22 5ccf700a-bd83-4e02-93a5-db46ca71e47a
-rw-r--r--. 1 flink ice 259312070 Dec  4 10:22 3bebe32d-0ad3-4f4e-a3aa-21719fa62c87
-rw-r--r--. 1 flink ice     97551 Dec  4 10:22 5a4f8a3a-0f26-46e7-883e-d6fafd733183
-rw-r--r--. 1 flink ice    104198 Dec  4 10:22 cdbb4913-7dd0-4614-8b81-276a4cdf62cc
-rw-r--r--. 1 flink ice    101466 Dec  4 10:22 ce7f0fea-8cd3-4827-9ef1-ceba569c2989
-rw-r--r--. 1 flink ice    108561 Dec  4 10:22 5bd3f681-c131-4c41-9fdc-6a39b9954aa7
-rw-r--r--. 1 flink ice     98649 Dec  4 10:22 d5d8eb16-3bd8-4695-91a0-9d9089ca9510
-rw-r--r--. 1 flink ice    102071 Dec  4 10:22 f8e34ef1-60d6-4c0a-954b-64c8a0320834
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 8fda9911-f63e-45a6-b95a-5c93fe99d0fd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 82545c1b-69d6-499b-a9fd-62e227b820c6
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 f9fa3bba-c92d-4dda-b16e-0ba417edf5d2
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 844fa51d-bb74-4bec-ab15-e52d37703d24
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 2115654a-4544-41cc-bbee-a36d0d80d8eb
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 acfc1566-5f14-47d7-ae54-7aa1dfb3859c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 b0144120-cce0-4b4d-9f8c-1564b9abedd9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 8ab4ddab-3665-4307-a581-ab413e1e2080
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 0f8c4b1a-df5d-47f7-b960-e671cfc3c666
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 40baf147-400e-455f-aea3-074355a77031
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 47d2deca-1703-4dd3-9fea-e027087d553e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 aa336ce0-3689-4b7d-a472-b0a3ed2f5eb9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 ee15f1e0-d23c-4add-86b4-e4ab51bb2a20
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 f440b5cf-8f62-4532-a886-a2cedc9a043e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 de423c46-4288-464b-97cb-6f7764b88dfd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 273a15cb-8c9f-4412-b5d2-68397ba461c9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 bb38b011-070d-4c21-b04a-4e923f85de86
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 969abc07-d313-4d79-8119-6e1f3886be48
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 eb0b2591-653c-47bd-a6b2-9f6634ff4f0a
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 20b7e49a-ace5-4ef7-987f-0d328f47c56f
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 a25c2bd9-7fe9-4558-b9dd-30b525a0b435
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 dcd0852f-58dc-467e-93db-5700cd4f606e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 400e5038-2913-4aea-932d-92f508bd38f7
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 10ce727b-9389-4911-b0d4-1b342dd3232c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 daec0dcb-384a-4d86-a423-7e2b0482b70e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 c787d58a-4bd5-4d9a-a4d8-47d9618552ff
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b2c1383a-8452-4ec6-9064-a5e8f56e6f21
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 65c0c908-b604-4ac9-a72f-4bb87676df11
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 42d59072-95ec-42d5-81ff-b9447cb39fd0
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b40c2577-e604-482d-86a1-ddb785e3b799
 
The TtlDB stream backend is mostly a copy of the current flink-statebackend-rocksdb, which uses the TtlDB class instead of the RocksDB class, with the following configuration:
 
 
    protected void buildDefaultStateBackend(StreamExecutionEnvironment executionEnvironment) throws IOException {
        FsStateBackend fsStateBackend = new FsStateBackend(configuration.getCheckpointDirectory(), true);
        RocksDBStateBackend rocksDBStateBackend = new de.helaba.rtts.ice.rocksdb.ttl.RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE, configuration.getStateTimeToLive());
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        rocksDBStateBackend.setOptions(new StpRocksDbOptions());
        executionEnvironment.setStateBackend(rocksDBStateBackend);
    }
 
    protected void configureCheckpointing(StreamExecutionEnvironment executionEnvironment) {
        CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(configuration.getCheckpointInterval());
        checkpointConfig.setMinPauseBetweenCheckpoints(configuration.getCheckpointMinPause());
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getConfig().setUseSnapshotCompression(true);
    }
 
 
 
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 15:38
To: Winterstein, Bernd
Cc: Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly
 
Could you share the logs to check possible failures to subsume or remove previous checkpoints?
What are the sizes of the files? That can help in understanding how compaction goes.
Could you also provide more details on how you set up TtlDb with Flink?
 
Best,
Andrey

 

On 29 Nov 2018, at 11:34, Andrey Zagrebin <[hidden email]> wrote:
 
Compaction merges SST files in the background using native threads. While merging, it filters out removed and expired data. In general, the idea is that there are enough resources for compaction to keep up with the DB update rate and reduce storage. It can be quite IO-intensive. Compaction has a lot of tuning knobs and statistics to monitor the process [1], which are usually out of the scope of Flink and depend on the state access pattern of the application. You can create and set a RocksDBStateBackend for your application in Flink and configure it with custom RocksDB/column-specific options.
 

 

On 29 Nov 2018, at 11:20, <[hidden email]> <[hidden email]> wrote:
 

We use TtlDB because the state contents should expire automatically after 24 hours. Therefore we only changed the state backend to use TtlDb instead of RocksDB with a fixed retention time.

We have slow I/O because we only have SAN volumes available. Can you clarify further what the problem with slow compaction would be?

Regards,

Bernd


-----Original Message-----
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 11:01
To: Winterstein, Bernd
Cc: Kostas Kloudas; user; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly

If you use incremental checkpoints, the state backend stores raw RocksDB SST files which represent all state data. Each checkpoint adds the SST files with new updates that are not present in the previous checkpoint, basically their difference.

One of the following could be happening:
- old keys are not explicitly deleted or expire (depending on how TtlDb is used)
- compaction is too slow to drop older SST files for the latest checkpoint so that they can be deleted with the previous checkpoints


On 29 Nov 2018, at 10:48, <[hidden email]> <[hidden email]> wrote:

Hi
We use Flink 1.6.2. As for the checkpoint directory, there is only one chk-xxx directory. Therefore I would expect that only one checkpoint remains.
The value of 'state.checkpoints.num-retained’ is not set explicitly.

The problem is not the number of checkpoints but the number of files in the "shared" directory next to the chk-xxx directory.


-----Original Message-----
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 10:39
To: Kostas Kloudas
Cc: Winterstein, Bernd; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly

Hi Bernd,

Did you change 'state.checkpoints.num-retained’ in flink-conf.yaml? By default, only one checkpoint should be retained.

Which version of Flink do you use?
Can you check the Job Master logs for a warning like this:
`Fail to subsume the old checkpoint`?

Best,
Andrey


On 29 Nov 2018, at 10:18, Kostas Kloudas <[hidden email]> wrote:

Hi Bernd,

I think the Till, Stefan or Stephan (cc'ed) are the best to answer your question.

Cheers,
Kostas

 
<StpRocksDbOptions.java><RocksDBKeyedStateBackend.java><shared_file.bin.txt>


Reply | Threaded
Open this post in threaded view
|

AW: number of files in checkpoint directory grows endlessly

Bernd.Winterstein
In reply to this post by Andrey Zagrebin

Hi Andrey,

 

I checked our code again. We are indeed using timers for dynamic routing updates. They get triggered every five minutes!

This must be the reason for the five-minute pattern in the remaining .sst files.
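
For context, this is roughly the pattern that produces such entries, sketched with the KeyedProcessFunction API (all names and types here are hypothetical, not the actual job code):

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Sketch: a recurring processing-time timer; every registration and firing
    // adds a tiny entry to the RocksDB timer column family. A real job would
    // guard against re-registering on every element.
    public class RoutingRefreshFunction extends KeyedProcessFunction<String, String, String> {
        private static final long FIVE_MINUTES = 5 * 60 * 1000L;

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + FIVE_MINUTES);
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Refresh the routing state here, then re-arm the timer.
            ctx.timerService().registerProcessingTimeTimer(timestamp + FIVE_MINUTES);
        }
    }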

 

Do I understand it correctly that the files remain because they are too small for compaction?

 

A dynamic ttl column-family option has been introduced to solve this problem. Files (and, in turn, data) older than TTL will be scheduled for compaction when there is no other background work. This will make the data go through the regular compaction process and get rid of old unwanted data. This also has the (good) side-effect of all the data in the non-bottommost level being newer than ttl, and all data in the bottommost level older than ttl. Note that it could lead to more writes as RocksDB would schedule more compactions.

 

How do I enable this “ttl column-family” option? I can't find any hints.
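
A minimal sketch of what enabling it could look like via the Java options API (an assumption: ColumnFamilyOptions#setTtl is only available in newer RocksDB releases and may not exist in the RocksDB version bundled with Flink 1.6):

    import org.rocksdb.ColumnFamilyOptions;

    // Sketch: the leveled-compaction TTL from the RocksDB wiki, set per column
    // family; SST files older than the TTL get scheduled for compaction.
    public final class CompactionTtlOptions {
        static ColumnFamilyOptions withCompactionTtl() {
            return new ColumnFamilyOptions().setTtl(86400L); // 24 hours, example value
        }
    }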

 

Regards,

 

Bernd

 

From: Andrey Zagrebin [mailto:[hidden email]]
Sent: Thursday, 6 December 2018 18:47
To: Winterstein, Bernd
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly

 

Hi Bernd,

 

Does this directory contain approximately 300 files on the task executor?

 

The 'transactionState' seems to be ok and compacted. It takes most of the data size, up to 100 MB.

 

Do you use any Flink timers? They are also kept in RocksDB and take very little space, just about 300 KB.

 

It might need more investigation, but my idea is the following.

The column family of Flink user timers has a lot of small files with just 1-2 new records, checkpointed all the time. Most of each small file's contents is just meta info. It seems to include both creation and deletion entries. As timers are always new entries, the SST files either do not overlap or are too small. As a result they are not merged upon compaction but just forwarded to the last level [1]. The dynamic level sizing set by SPINNING_DISK_OPTIMIZED_HIGH_MEM makes a level's target size whatever it currently is, basically disabling compaction for that column. Such files can stay around for a very long time. I cannot check all 300 files, but it seems that for 19 of 26 files removal has not happened the way it does for ‘transactionState'. RocksDB suggests using a different TTL concept to force compaction [2]. Compaction can also be triggered manually from time to time, but this is not part of the standard Flink distribution. Note that TtlDb also cleans up expired entries only during compaction.

 

Best,

Andrey

 

[2] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#ttl



On 6 Dec 2018, at 12:53, <[hidden email]> <[hidden email]> wrote:

 

It seems that some file deletion is disabled by default. There are some log entries in the file.

 

From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 6 December 2018 12:07
To: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly

 

Hi Bernd,

 

Thanks for sharing the code.

 

I understand your TTL requirement. It definitely makes sense for your application. 

My recommendation is still to try running the job with the original backend, without the TtlDb modification, to narrow down the problem and understand where these small files come from. They look like some meta information which seems to accumulate over time.

 

Could you also investigate the temporary local directories on the task executors?

By default, they are usually in System.getProperty("java.io.tmpdir") (e.g. /tmp in Linux) or there is also a config option for them [1].

Do you see an explosion of similar files there? They can be named differently. The interesting thing would be whether the smaller files end with ‘.sst’ or not.

 

Best,

Andrey

 




On 6 Dec 2018, at 09:16, <[hidden email]> <[hidden email]> wrote:

 

Hi Andrey

We need the TTL feature, because we store about 10 million state entries per 24 hours. Each entry has to expire after 24 hours. We couldn’t do the expiry with timers due to bad checkpoint performance, so I switched to TtlDB.

 

It seems that per-minute files are only available for the last incremental period. After that time, the files are five minutes apart.

 

As for the chk-* directories, there is only one directory left.

 

I attached the code for the options and the KeyedStateBackend and also one of the shared files.

 

Many thanks for your help.

 

 

Bernd

 

cat /appdata/rtts/ice/shared/jobmanager/checkpoints/stp/a73a132912c8efaaab4a8e6331bdcf47/shared/6d64a7cc-8cdf-4f51-aa2f-bb7c8e7d4ce3

 

¦>¦¦}d¦¦/20181205-HELADEF1TSK-TX-9ffXfg1DZCHEe2JGqpC6dDJ

        ¦¦$rocksdb.block.based.table.index.type*rocksdb.block.based.table.prefix.filtering0-rocksdb.block.based.table.whole.key.filtering1rocksdb.column.family.id#rocksdb.column.family.name_timer_state/processing_user-timersrocksdb.comparatorleveldb.BytewiseComparatorrocksdb.compressionSnappyrocksdb.creation.time¦¦¦¦rocksdb.data.sizeQrocksdb.deleted.keysrocksdb.filter.policyrocksdb.BuiltinBloomFilterrocksdb.filter.sizerocksdb.fixed.key.lengthrocksdb.format.versionrocksdb.index.sizeocksdb.merge.operands

                                                                                                                                                                                                                                                       rocksdb.merge.operatorMerge By TTLrocksdb.num.data.blocksrocksdb.num.entriesrocksdb.prefix.extractor.namenullptrocksdb.property.collectors[]rocksdb.raw.key.sizeArocksdb.raw.value.size+Y¦¦¦1Nc{¦¦¦¦*Oj¦¦¦¦¦V!filter.rocksdb.BuiltinBloomFilterQrocksdb.propertiesh¦&¦¦2?    }¦¦¦¦¦¦¦L¦¦¦JW¦¦¦$uG¦

 

 

 

From: Andrey Zagrebin [[hidden email]]
Sent: Wednesday, 5 December 2018 15:25
To: Winterstein, Bernd
Cc: Yun Tang; Kostas Kloudas; user; Stefan Richter; Till Rohrmann; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly

 

Hi Bernd,

 

Thanks for sharing the code.

 

Could you try the same job without TtlDb, with the original RocksDB state backend?

Even if data does not expire every day, the overall size will grow, but the number of files should not, because of compaction.

 

Do you set any more specific db/column options in this options factory, StpRocksDbOptions?

Could you share its source code and the modified RocksDBKeyedStateBackend, especially createColumnFamilyWithTtl?

 

Do you always have only the last 1-2 'chk-*’ directories at a time, or are there more checkpoint directories that were not cleaned up?

 

You have a 1 minute checkpoint interval, but the list of files does not cover some minutes consecutively. Are files missing for some minutes/checkpoints, like 10:10 or 10:05, or do checkpoints just take long, including the CheckpointMinPause?

 

Could you check whether those 1121-byte files are identical? Look inside or post one of them here?

 

Best,

Andrey

 

On 4 Dec 2018, at 14:10, <[hidden email]> <[hidden email]> wrote:

 

All calls to createColumnFamily were replaced with createColumnFamilyWithTtl

 

 

 

     private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
                StateDescriptor<?, S> stateDesc,
                TypeSerializer<N> namespaceSerializer,
                @Nullable StateSnapshotTransformer<S> snapshotTransformer) throws StateMigrationException {

          Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
                kvStateInformation.get(stateDesc.getName());

          RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
          if (stateInfo != null) {

                StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());

                Preconditions.checkState(
                     restoredMetaInfoSnapshot != null,
                     "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
                          " but its corresponding restored snapshot cannot be found.");

                newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
                     restoredMetaInfoSnapshot,
                     namespaceSerializer,
                     stateDesc,
                     snapshotTransformer);

                stateInfo.f1 = newMetaInfo;
          } else {
                String stateName = stateDesc.getName();

                newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
                     stateDesc.getType(),
                     stateName,
                     namespaceSerializer,
                     stateDesc.getSerializer(),
                     snapshotTransformer);

                // TtlDB modification: create the column family with a TTL
                // instead of the original db.createColumnFamily call
                ColumnFamilyHandle columnFamily = createColumnFamilyWithTtl(stateName);

                stateInfo = Tuple2.of(columnFamily, newMetaInfo);
                kvStateInformation.put(stateDesc.getName(), stateInfo);
          }

          return Tuple2.of(stateInfo.f0, newMetaInfo);
     }
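For readers following the thread, a minimal sketch of what such a helper could look like, assuming the backend holds an org.rocksdb.TtlDB handle in 'db', the configured ColumnFamilyOptions in 'columnOptions' and the TTL in 'ttlSeconds' (all three names are hypothetical, not the actual modified backend):

     // Sketch: create a column family whose entries expire after ttlSeconds.
     private ColumnFamilyHandle createColumnFamilyWithTtl(String stateName) {
          ColumnFamilyDescriptor descriptor = new ColumnFamilyDescriptor(
               stateName.getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions);
          try {
               // TtlDB#createColumnFamilyWithTtl attaches the TTL; expired entries
               // are only removed when the column family is compacted
               return db.createColumnFamilyWithTtl(descriptor, ttlSeconds);
          } catch (RocksDBException e) {
               throw new FlinkRuntimeException("Error creating ColumnFamilyHandle with TTL.", e);
          }
     }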

 

From: Yun Tang [[hidden email]]
Sent: Tuesday, 4 December 2018 13:55
To: Winterstein, Bernd; [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly

 

Hi Bernd,

 

RocksDBStateBackend would not use the default column family, but puts a state's entries into a newly-created column family by default. Would you please check whether the db.createColumnFamilyWithTtl method was used instead of db.createColumnFamily within RocksDBKeyedStateBackend#tryRegisterKvStateInformation?

 

In my opinion, the default db.createColumnFamily method would set the ttl to 0, which means infinity, for this column family. You could view RocksDB's implementation here.

 

Best

Yun Tang


From: [hidden email] <[hidden email]>
Sent: Tuesday, December 4, 2018 17:40
To: [hidden email]
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: RE: number of files in checkpoint directory grows endlessly

 

Sorry for the late answer. I haven’t been in the office.

 

The logs show no problems.

The files that remain in the shared subfolder are almost all 1121 bytes, except the files from the latest checkpoint (30 files across all operators).

For each historic checkpoint, six files remain (parallelism is 6).

 

checkpoints/stp/2b160fc9e5eba47d1906d04f36b399bf/shared/

 

-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 046b16e5-0edc-4ca5-9757-d89aa86f5d5c
-rw-r--r--. 1 flink ice    308383 Dec  4 10:23 dab0f801-8907-4b10-ae8b-5f5f71c28524
-rw-r--r--. 1 flink ice    101035 Dec  4 10:23 e4915253-7025-4e36-8671-58a371f202ff
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 613a9fd1-4545-4785-82ef-92f8c4818bc0
-rw-r--r--. 1 flink ice    308270 Dec  4 10:23 23771709-03ef-4417-acd2-1c27c8b0785e
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 59643ae2-57b8-4d9e-9f2e-977545db9238
-rw-r--r--. 1 flink ice    102444 Dec  4 10:23 f9a70bb7-d4f3-4d94-af31-8e94276001b1
-rw-r--r--. 1 flink ice    308346 Dec  4 10:23 bafc1a20-e5a4-4b06-93fe-db00f3f3913a
-rw-r--r--. 1 flink ice     96035 Dec  4 10:23 a8b3aa75-fa44-4fc2-ab49-437d49410acd
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 961f1472-b61c-4c04-b544-4549c3ce4038
-rw-r--r--. 1 flink ice    308387 Dec  4 10:23 7245dcfe-62e8-42a3-94a9-0698bdf9fb4d
-rw-r--r--. 1 flink ice     99209 Dec  4 10:23 5ad87ff2-604c-40d9-9262-76ac6369453d
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 ccaae280-837a-4bc9-95cd-3612c4cd435c
-rw-r--r--. 1 flink ice    308451 Dec  4 10:23 69b650dd-7969-4891-b973-9bd168e2f40e
-rw-r--r--. 1 flink ice    105638 Dec  4 10:23 16f3092d-76eb-4fd7-87b4-2eb28ca4696c
-rw-r--r--. 1 flink ice     11318 Dec  4 10:23 eccb19a0-4570-4e49-be97-707248690fe8
-rw-r--r--. 1 flink ice    308513 Dec  4 10:23 47259326-cb62-4abb-ad0b-5e2deda97685
-rw-r--r--. 1 flink ice    109918 Dec  4 10:23 aebc4390-3467-4596-a5fa-0d0a6dc74f54
-rw-r--r--. 1 flink ice 259444946 Dec  4 10:22 3e97ea93-4bc9-4404-aeca-eed31e96a14b
-rw-r--r--. 1 flink ice 247501755 Dec  4 10:22 b23d2c3a-94eb-45f0-bd29-457d8796624d
-rw-r--r--. 1 flink ice 247754788 Dec  4 10:22 4eb66b02-6758-4cff-9de2-fb7399b2ac0b
-rw-r--r--. 1 flink ice 247281033 Dec  4 10:22 2aeeb9ad-2714-481c-a7a4-04148f72671d
-rw-r--r--. 1 flink ice 247345955 Dec  4 10:22 5ccf700a-bd83-4e02-93a5-db46ca71e47a
-rw-r--r--. 1 flink ice 259312070 Dec  4 10:22 3bebe32d-0ad3-4f4e-a3aa-21719fa62c87
-rw-r--r--. 1 flink ice     97551 Dec  4 10:22 5a4f8a3a-0f26-46e7-883e-d6fafd733183
-rw-r--r--. 1 flink ice    104198 Dec  4 10:22 cdbb4913-7dd0-4614-8b81-276a4cdf62cc
-rw-r--r--. 1 flink ice    101466 Dec  4 10:22 ce7f0fea-8cd3-4827-9ef1-ceba569c2989
-rw-r--r--. 1 flink ice    108561 Dec  4 10:22 5bd3f681-c131-4c41-9fdc-6a39b9954aa7
-rw-r--r--. 1 flink ice     98649 Dec  4 10:22 d5d8eb16-3bd8-4695-91a0-9d9089ca9510
-rw-r--r--. 1 flink ice    102071 Dec  4 10:22 f8e34ef1-60d6-4c0a-954b-64c8a0320834
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 8fda9911-f63e-45a6-b95a-5c93fe99d0fd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 82545c1b-69d6-499b-a9fd-62e227b820c6
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 f9fa3bba-c92d-4dda-b16e-0ba417edf5d2
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 844fa51d-bb74-4bec-ab15-e52d37703d24
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 2115654a-4544-41cc-bbee-a36d0d80d8eb
-rw-r--r--. 1 flink ice      1121 Dec  4 10:21 acfc1566-5f14-47d7-ae54-7aa1dfb3859c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 b0144120-cce0-4b4d-9f8c-1564b9abedd9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 8ab4ddab-3665-4307-a581-ab413e1e2080
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 0f8c4b1a-df5d-47f7-b960-e671cfc3c666
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 40baf147-400e-455f-aea3-074355a77031
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 47d2deca-1703-4dd3-9fea-e027087d553e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:16 aa336ce0-3689-4b7d-a472-b0a3ed2f5eb9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 ee15f1e0-d23c-4add-86b4-e4ab51bb2a20
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 f440b5cf-8f62-4532-a886-a2cedc9a043e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 de423c46-4288-464b-97cb-6f7764b88dfd
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 273a15cb-8c9f-4412-b5d2-68397ba461c9
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 bb38b011-070d-4c21-b04a-4e923f85de86
-rw-r--r--. 1 flink ice      1121 Dec  4 10:11 969abc07-d313-4d79-8119-6e1f3886be48
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 eb0b2591-653c-47bd-a6b2-9f6634ff4f0a
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 20b7e49a-ace5-4ef7-987f-0d328f47c56f
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 a25c2bd9-7fe9-4558-b9dd-30b525a0b435
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 dcd0852f-58dc-467e-93db-5700cd4f606e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 400e5038-2913-4aea-932d-92f508bd38f7
-rw-r--r--. 1 flink ice      1121 Dec  4 10:06 10ce727b-9389-4911-b0d4-1b342dd3232c
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 daec0dcb-384a-4d86-a423-7e2b0482b70e
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 c787d58a-4bd5-4d9a-a4d8-47d9618552ff
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b2c1383a-8452-4ec6-9064-a5e8f56e6f21
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 65c0c908-b604-4ac9-a72f-4bb87676df11
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 42d59072-95ec-42d5-81ff-b9447cb39fd0
-rw-r--r--. 1 flink ice      1121 Dec  4 10:01 b40c2577-e604-482d-86a1-ddb785e3b799

 

The TtlDB state backend is mostly a copy of the current flink-statebackend-rocksdb which uses the TtlDB class instead of the RocksDB class, with the following configuration:

 

 

    protected void buildDefaultStateBackend(StreamExecutionEnvironment executionEnvironment) throws IOException {
        FsStateBackend fsStateBackend = new FsStateBackend(configuration.getCheckpointDirectory(), true);
        // custom TtlDB-based backend; arguments mirror RocksDBStateBackend plus the state TTL
        RocksDBStateBackend rocksDBStateBackend = new de.helaba.rtts.ice.rocksdb.ttl.RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE, configuration.getStateTimeToLive());
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        rocksDBStateBackend.setOptions(new StpRocksDbOptions());
        executionEnvironment.setStateBackend(rocksDBStateBackend);
    }

    protected void configureCheckpointing(StreamExecutionEnvironment executionEnvironment) {
        CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(configuration.getCheckpointInterval());
        checkpointConfig.setMinPauseBetweenCheckpoints(configuration.getCheckpointMinPause());
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getConfig().setUseSnapshotCompression(true);
    }

 

 

 

From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 15:38
To: Winterstein, Bernd
Cc: Kostas Kloudas; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly

 

Could you share the logs to check possible failures to subsume or remove previous checkpoints?

What are the sizes of the files? That can help to understand how compaction goes.

Could you also provide more details on how you set up TtlDb with Flink?

 

Best,

Andrey

 

On 29 Nov 2018, at 11:34, Andrey Zagrebin <[hidden email]> wrote:

 

Compaction merges SST files in the background using native threads. While merging, it filters out removed and expired data. In general, the idea is that there are enough resources for compaction to keep up with the DB update rate and reduce storage. It can be quite IO-intensive. Compaction has a lot of tuning knobs and statistics to monitor the process [1], which are usually out of the scope of Flink and depend on the state access pattern of the application. You can create and set a RocksDBStateBackend for your application in Flink and configure it with custom RocksDb/column-specific options.
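For illustration, a rough sketch of such an options factory with a couple of compaction-related knobs; the concrete values are made-up examples, not recommendations:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class CustomRocksDbOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // more background threads so compaction can keep up on slow disks
        return currentOptions.setIncreaseParallelism(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // example per-column-family knobs; tune against RocksDB's compaction statistics
        return currentOptions
            .setTargetFileSizeBase(64 * 1024 * 1024)
            .setMaxBytesForLevelBase(256 * 1024 * 1024);
    }
}

Such a factory is plugged in via rocksDBStateBackend.setOptions(new CustomRocksDbOptions()).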

 

 

On 29 Nov 2018, at 11:20, <[hidden email]> <[hidden email]> wrote:

 

We use TtlDB because the state contents should expire automatically after 24 hours. Therefore we only changed the state backend to use TtlDb instead of RocksDB with a fixed retention time.

We have slow I/O because we only have SAN volumes available. Can you further clarify the problem with slow compaction?

Regards,

Bernd


-----Ursprüngliche Nachricht-----
Von: Andrey Zagrebin [[hidden email]]
Gesendet: Donnerstag, 29. November 2018 11:01
An: Winterstein, Bernd
Cc: Kostas Kloudas; user; [hidden email]; [hidden email]; [hidden email]
Betreff: Re: number of files in checkpoint directory grows endlessly

If you use incremental checkpoints, the state backend stores raw RocksDB SST files which represent all state data. Each checkpoint adds the SST files with new updates that are not present in the previous checkpoint, basically their difference (a sketch of how incremental checkpoints are enabled follows after the list below).

One of the following could be happening:
- old keys are not explicitly deleted or expired (depending on how TtlDb is used)
- compaction is too slow to drop older SST files for the latest checkpoint, so that they could be deleted together with the previous checkpoints
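For reference, a minimal sketch of switching incremental checkpoints on, with a placeholder checkpoint URI and 'env' standing for the StreamExecutionEnvironment:

// the second constructor argument 'true' enables incremental checkpoints:
// each checkpoint then uploads only the SST files not contained in the previous one
// (the constructor declares throws IOException)
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
env.setStateBackend(backend);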



On 29 Nov 2018, at 10:48, <[hidden email]> <[hidden email]> wrote:

Hi
We use Flink 1.6.2. As for the checkpoint directory, there is only one chk-xxx directory. Therefore I would expect that only one checkpoint remains.
The value of 'state.checkpoints.num-retained’ is not set explicitly.

The problem is not the number of checkpoints but the number of files in the "shared" directory next to the chk-xxx directory.


-----Original Message-----
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 29 November 2018 10:39
To: Kostas Kloudas
Cc: Winterstein, Bernd; user; Stefan Richter; Till Rohrmann; Stephan Ewen
Subject: Re: number of files in checkpoint directory grows endlessly

Hi Bernd,

Did you change 'state.checkpoints.num-retained’ in flink-conf.yaml? By default, only one checkpoint should be retained.

Which version of Flink do you use?
Can you check the Job Master logs for a warning like this:
`Fail to subsume the old checkpoint`?

Best,
Andrey



On 29 Nov 2018, at 10:18, Kostas Kloudas <[hidden email]> wrote:

Hi Bernd,

I think the Till, Stefan or Stephan (cc'ed) are the best to answer your question.

Cheers,
Kostas



 

<StpRocksDbOptions.java><RocksDBKeyedStateBackend.java><shared_file.bin.txt>

 


Re: number of files in checkpoint directory grows endlessly

Andrey Zagrebin
Hi Bernd,

I do not see this option exposed in the RocksDB Java API over JNI at the moment [1][2], only in native code [3].
I would try calling db.compactRange periodically in RocksDBKeyedStateBackend.
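A minimal sketch of what such a periodic call could look like, assuming access to the backend's RocksDB handle 'db', its kvStateInformation map and a 'LOG' logger; the scheduling is just an illustration, not a tested patch:

// Hypothetical addition inside RocksDBKeyedStateBackend:
ScheduledExecutorService compactor = Executors.newSingleThreadScheduledExecutor();
compactor.scheduleAtFixedRate(() -> {
    try {
        // compact every registered column family; this merges small SST files
        // and gives TtlDB a chance to drop expired entries
        for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> info : kvStateInformation.values()) {
            db.compactRange(info.f0);
        }
    } catch (RocksDBException e) {
        LOG.warn("Manual RocksDB compaction failed.", e);
    }
}, 1, 1, TimeUnit.HOURS);

(imports: java.util.concurrent.Executors, ScheduledExecutorService and TimeUnit; org.rocksdb.ColumnFamilyHandle and RocksDBException)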

Best,
Andrey

[3] https://github.com/facebook/rocksdb/blob/master/include/rocksdb/advanced_options.h#L64

On 10 Dec 2018, at 09:28, <[hidden email]> <[hidden email]> wrote:

Hi Andrey,
 
I checked our code again. We are indeed using timers, for dynamic routing updates. They get triggered every five minutes!
This must be the reason for the five-minute pattern in the remaining .sst files.
 
Do I understand it correctly that the files remain because they are too small for compaction?
 
A dynamic ttl column-family option has been introduced to solve this problem. Files (and, in turn, data) older than TTL will be scheduled for compaction when there is no other background work. This will make the data go through the regular compaction process and get rid of old unwanted data. This also has the (good) side-effect of all the data in the non-bottommost level being newer than ttl, and all data in the bottommost level older than ttl. Note that it could lead to more writes as RocksDB would schedule more compactions.
 
How do I enable this “ttl column-family” option? I can’t find any hints.
 
Regards,
 
Bernd
 
From: Andrey Zagrebin [[hidden email]]
Sent: Thursday, 6 December 2018 18:47
To: Winterstein, Bernd
Cc: [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]; [hidden email]
Subject: Re: number of files in checkpoint directory grows endlessly
 
Hi Bernd,
 
Does this directory contain approximately 300 files on the task executor?
 
The 'transactionState' column family seems to be OK and compacted. It takes up most of the data size, up to 100 MB.
 
Do you use any Flink timers? They are also kept in RocksDb and take up very little space, just about 300 KB.
 
It might need more investigation, but my idea is the following.
The column family for Flink user timers has a lot of small files with just 1-2 new records, checkpointed all the time. Most of the small files' content is just meta info. It seems to include both creation and deletion entries. As timers are always new entries, the SST files either do not overlap or are too small. As a result, they are not merged upon compaction but just forwarded to the last level [1]. Setting dynamic compaction in SPINNING_DISK_OPTIMIZED_HIGH_MEM makes each level's target size whatever it currently is, basically disabling compaction for these files, so they can stay around very long. I cannot check all 300 files, but it seems that for 19 of 26 files the removal has not happened as it does for 'transactionState'. RocksDB suggests using some other TTL concept to force compaction [2]. Compaction can also be triggered manually from time to time, but this is not part of the standard Flink distribution. TtlDb also cleans up expired entries only during compaction.
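If you want to experiment with that, a rough sketch of overriding the predefined setting in the options factory; just an idea to test, not a verified fix:

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
    // SPINNING_DISK_OPTIMIZED_HIGH_MEM sets this to true; turning it off restores
    // fixed per-level target sizes, so small timer SSTs become compaction candidates again
    return currentOptions.setLevelCompactionDynamicLevelBytes(false);
}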
 
Best,
Andrey
 

