I'm not sure my question was clear enough, so let me explain our scenario:
We are working in “event time” mode.
We want to handle late data that arrives up to X days late (for example, 7 days).
For each incoming event:
The event is aggregated using a window function.
When the window is fired, the accumulated data is forwarded to a sink function and all data is purged from the window.
If late data arrives for the same window, the same logic as above is applied: when the window fires, the data is accumulated from scratch (only the late events), sent to the sink, and purged from the window.
We are not using the default trigger.
We expect the flow above to produce fragmented data, i.e. several outputs with the same <key, window> that aggregate different sets of events.
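To make the fragmentation concrete, here is a toy model of the fire-and-purge flow described above (plain Python, not Flink code; the class and method names are illustrative assumptions). Each <key, window> pane buffers events, emits an aggregate when fired, and is purged; a late event re-opens the pane from scratch, so one <key, window> can yield several outputs:

```python
from collections import defaultdict

class FireAndPurgePane:
    """Toy model of a keyed window with a fire-and-purge trigger."""
    def __init__(self):
        self.panes = defaultdict(list)   # (key, window) -> buffered events
        self.outputs = []                # what the sink would receive

    def add(self, key, window, value):
        self.panes[(key, window)].append(value)

    def fire(self, key, window):
        # Emit the aggregate (here: a sum) and purge all buffered data.
        values = self.panes.pop((key, window), [])
        if values:
            self.outputs.append((key, window, sum(values)))

op = FireAndPurgePane()
op.add("k", "w1", 1)
op.add("k", "w1", 2)
op.fire("k", "w1")       # watermark passes window end -> first output
op.add("k", "w1", 10)    # late event within allowed lateness
op.fire("k", "w1")       # late firing -> second, fragmented output

print(op.outputs)        # [('k', 'w1', 3), ('k', 'w1', 10)]
```

Note that the second output aggregates only the late event, since the earlier events were purged on the first firing — this is exactly the fragmentation we expect and accept.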
We encounter the following problem:
Since we have a huge number of distinct <key, window> pairs, the per-window metadata (WindowOperator state, InternalTimer entries) is kept in memory until the end of the allowed lateness period. This causes our job to run out of memory.
Here is a back-of-the-envelope calculation of the memory required for the window metadata alone:
The metadata for each <key, window> pair is at least 64 bytes.
With 200,000,000 <key, window> pairs per day and an allowed lateness of 7 days:
200,000,000 * 64 bytes * 7 ≈ 83 GiB
For the scenario above, keeping this window metadata serves no purpose.
Is there a way to keep using the window API with allowed lateness set, but without keeping the window metadata until the end of the allowed lateness period?