How to assign a UID to a KeyedStream?

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view

How to assign a UID to a KeyedStream?

Ken Krugler
Hi all,

Just for grins, I disabled auto-generated UIDs for the taxi rides/fares state example in the online tutorial. 


I then added UIDs for all operators, sources & sinks. But I still get the following when calling env.getExecutionPlan() or env.execute():

java.lang.IllegalStateException: Auto generated UIDs have been disabled but no UID or hash has been assigned to operator Partition
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(

The simple workflow is:

        DataStream<TaxiRide> rides = env
                .addSource(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor))
                .uid("source: taxi rides")
                .name("taxi rides")
                .filter((TaxiRide ride) -> ride.isStart)
                .uid("filter: only start rides")
                .name("only start rides")
                .keyBy((TaxiRide ride) -> ride.rideId);

        DataStream<TaxiFare> fares = env
                .addSource(new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor))
                .uid("source: taxi fares")
                .name("taxi fares")
                .keyBy((TaxiFare fare) -> fare.rideId);

        DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
                .flatMap(new EnrichmentFunction())
                .uid("function: enrich rides with fares")
                .name("enrich rides with fares")
                .uid("sink: enriched taxi rides")
                .name("enriched taxi rides");

Internally the exception is thrown when the EnrichFunction (a RichCoFlatMapFunction) is being transformed by StreamGraphGenerator.transformTwoInputTransform().

This calls StreamGraphGenerator.transform() with the two inputs, but the Transformation for each input is a PartitionTransformation.

I don’t see a way to set the UID following the keyBy(), as a KeyedStream creates the PartitionTransformation without a UID.

Any insight into setting the UID properly here? Or should StreamGraphGenerator.transform() skip the no-uid check for PartitionTransformation, since that’s not an operator with state?


— Ken

Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr