Improve data type validation in Flink inputformats according to avro schemas

françois lacombe
Hi all,

Following up on this JIRA ticket opened last year:

I run Flink (currently 1.8) at work, at DCbrain, a French B2B software vendor for operators of physical fluid networks, and we would like to share some of our experience with the community, as a lot of valuable work is done here.
We have started writing some custom InputFormats with specific features that may be useful to anyone interested in data validation.

Before addressing a wide variety of formats, we wanted to build them according to Avro schemas, as explained in the JIRA ticket for CSV. We are now trying to implement part of our data validation strategy based on the schema provided to build the InputFormat. Avro schemas are well suited to this and pleasant to work with.
To me, type validation means checking records against a defined schema, redirecting bogus or unexpected ones to a dedicated output for administrative or data engineering inspection, and letting the rest of the job continue with the conforming records.

We now have an abstract class extending RichInputFormat<Row> that lets us define input formats capable of type validation, with these main advantages:
- Identify and log Row records containing data whose type differs from the one specified in the schema
- Preserve type safety in most of our jobs, starting in the nextRecord() method: we only pass on what conforms to the Avro schema
- Inspect streaming records on the fly without much processing overhead
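
To make the idea concrete, here is a minimal sketch of the pattern in plain Java. It is not our actual code and uses neither Flink nor Avro: FieldSchema is a hypothetical stand-in for an Avro record schema, ValidatingFormat only mimics the nextRecord() contract of an input format, and ListBackedFormat is a toy source used for the demonstration. Records are plain Object[] arrays instead of Row.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;

public class ValidatingFormatSketch {

    /** Hypothetical stand-in for an Avro record schema: ordered field name -> expected Java type. */
    static final class FieldSchema {
        final LinkedHashMap<String, Class<?>> fields = new LinkedHashMap<>();

        FieldSchema field(String name, Class<?> type) {
            fields.put(name, type);
            return this;
        }
    }

    /** Hypothetical base class mimicking the nextRecord() contract of an input format. */
    abstract static class ValidatingFormat {
        private final FieldSchema schema;
        /** Non-conforming records, kept aside for later inspection. */
        final List<Object[]> rejected = new ArrayList<>();

        ValidatingFormat(FieldSchema schema) {
            this.schema = schema;
        }

        /** Reads the next raw record from the source, or returns null at end of input. */
        protected abstract Object[] readRaw();

        /** Returns the next conforming record, or null once the source is exhausted. */
        public Object[] nextRecord() {
            Object[] raw;
            while ((raw = readRaw()) != null) {
                if (conforms(raw)) {
                    return raw;
                }
                rejected.add(raw); // set aside instead of failing the whole job
            }
            return null;
        }

        /** Checks arity and the runtime type of every field; null values are rejected in this sketch. */
        boolean conforms(Object[] row) {
            if (row.length != schema.fields.size()) {
                return false;
            }
            int i = 0;
            for (Class<?> expected : schema.fields.values()) {
                if (!expected.isInstance(row[i++])) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Toy source backed by an in-memory list, for demonstration only. */
    static final class ListBackedFormat extends ValidatingFormat {
        private final Iterator<Object[]> it;

        ListBackedFormat(FieldSchema schema, List<Object[]> rows) {
            super(schema);
            this.it = rows.iterator();
        }

        @Override
        protected Object[] readRaw() {
            return it.hasNext() ? it.next() : null;
        }
    }

    public static void main(String[] args) {
        FieldSchema schema = new FieldSchema().field("id", Long.class).field("name", String.class);
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {1L, "pump"});
        rows.add(new Object[] {"2", "valve"}); // "id" is a String here, so this row is rejected
        rows.add(new Object[] {3L, "pipe"});

        ListBackedFormat format = new ListBackedFormat(schema, rows);
        int accepted = 0;
        while (format.nextRecord() != null) {
            accepted++;
        }
        // prints "2 accepted, 1 rejected"
        System.out.println(accepted + " accepted, " + format.rejected.size() + " rejected");
    }
}
```

In this sketch the rejected records simply accumulate in a list; in a real job they would have to be logged or written somewhere else, which is exactly the part we are unsure how to do cleanly.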

Currently we can provide input formats capable of type validation for:
- CSV
- JSON
- GeoJSON
- ESRI Shape

JDBCInputFormat has been wrapped in dedicated logic involving Avro schemas as well.

This approach is only a first step and will surely need to be improved, or reworked where we got things wrong.
As far as I know, Flink currently doesn't offer the ability to redirect records to an alternative output, does it?

How does the Flink roadmap deal with such additional validation functionality?
Would committers and users find it desirable to introduce such functionality in a future release?

Looking forward to hearing from anyone interested. All the best,

François Lacombe


