I want to do the following: using the Flink DataStream API, create a Kafka consumer, read all messages from the topic up to the current moment, and then stop the consumer (stopping is the main problem).
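To make the goal concrete, here is a rough sketch of the stopping behaviour I am after, written in plain Python with no Flink or Kafka involved. The topic is simulated as a dict of partition number to list of records, and `read_until_snapshot` is a hypothetical name I made up for illustration, not a real API. The idea is to record each partition's end offset once, when reading starts, read up to those offsets, and then finish instead of waiting for more data.

```python
from typing import Dict, Iterator, List, Tuple


def read_until_snapshot(log: Dict[int, List[str]]) -> Iterator[Tuple[int, int, str]]:
    """Yield (partition, offset, value) for records present when reading starts.

    `log` is a stand-in for a topic: partition id -> list of records.
    This is a simulation of the desired behaviour only, not Flink or Kafka code.
    """
    # Snapshot the end offset of every partition once, on the first read.
    end_offsets = {partition: len(records) for partition, records in log.items()}

    for partition, end in end_offsets.items():
        # Read strictly below the snapshotted end offset; records appended
        # after the snapshot are ignored.
        for offset in range(end):
            yield partition, offset, log[partition][offset]
    # Falling off the end finishes the iterator, so the reader stops
    # instead of idling for new messages.
```

In this sketch a record appended after reading has started is never returned, which is the "up to the current moment" part; the open question is how to get the same bounded behaviour from a Kafka source in a Flink job.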
However, I have no idea how to do that. Working with a DataStream, I can pass the consumer in as a source, but how do I stop it once all messages have been consumed (i.e., the consumer should not sit idle)? One idea was to write a timeout function and pass it to streamenv.process. However, that did not work because my consumer was never closed (this is described in more detail on SO).
Is there any other solution, or should I keep trying with TimeoutFunction? Could you please help? Thanks in advance!