Kafka producer throws exceptions when publishing a keyed stream
I am running into issues writing a keyed stream from sink subtasks to an output Kafka topic.
The job is of the form: source -> filter -> keyby(id) -> flatmap -> sink
The exceptions come from the Kafka producer and cause checkpointing to time out:
FlinkKafkaException: Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time
FlinkKafkaException: Failed to send data to Kafka: Expiring 16 record(s) for mytopic-11:120000 ms has passed since batch creation
The job gets into a crash loop with the above exceptions and occasionally recovers briefly before crash-looping again. I believe the problem is that I'm using the keys to determine the output partitions, which causes each of the P sink subtasks to fan out writes to N output partitions. Ideally, each subtask would only write to a single partition.
The job has the following constraints/properties:
#1: once a key has been written to an output kafka topic partition, it needs to always be written to the same kafka partition in the future
#2: the sink subtask parallelism will initially equal the number of output partitions
#3: I should be able to increase the parallelism in the future without violating #1
#4: the output kafka topic will never add new partitions
If parallelism == partitions, then I believe the FlinkFixedPartitioner would be a fine solution. However, I don't think it would preserve the original key->partition mapping if I later increased the parallelism, since it chooses the output partition from the sink subtask index (subtask index modulo the number of partitions) rather than from the key.
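To illustrate the concern, here is a simplified sketch in plain Java with no Flink dependencies. It assumes, as I understand it, that a key group is assigned to subtask `keyGroup * parallelism / maxParallelism` and that the fixed partitioner picks `subtaskIndex % numPartitions`. The class name, method names, and the values (max parallelism 128, 4 partitions, key group 100) are hypothetical and only meant to model the behavior, not to reproduce Flink's actual code.

```java
// Simplified, dependency-free model of the behavior described above.
// All names and values are hypothetical; this is not Flink's own code.
class FixedPartitionerSketch {

    // Assumed key-group-to-subtask assignment:
    // keyGroup * parallelism / maxParallelism (integer division).
    static int subtaskForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Fixed-partitioner style choice: subtask index modulo partition count.
    static int fixedPartition(int subtaskIndex, int numPartitions) {
        return subtaskIndex % numPartitions;
    }

    // Partition that records of a given key group end up in.
    static int partitionFor(int keyGroup, int parallelism, int maxParallelism, int numPartitions) {
        int subtask = subtaskForKeyGroup(keyGroup, parallelism, maxParallelism);
        return fixedPartition(subtask, numPartitions);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed max parallelism
        int numPartitions = 4;    // output topic partitions (never changes, per #4)
        int keyGroup = 100;       // hypothetical key group of some key

        // parallelism == partitions: subtask 3 writes to partition 3
        System.out.println(partitionFor(keyGroup, 4, maxParallelism, numPartitions)); // 3
        // parallelism doubled: the same key group moves to subtask 6, partition 2
        System.out.println(partitionFor(keyGroup, 8, maxParallelism, numPartitions)); // 2
    }
}
```

Under these assumptions, the same key lands in partition 3 at parallelism 4 and in partition 2 at parallelism 8, which would violate constraint #1.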
Is there a technique I could use here to satisfy these constraints? Possibly a tweak to the Kafka producer's settings, another method for partitioning the keyed stream, or something else?