add() method of AggregateFunction not called even though new watermark is emitted

Vijay Balakrishnan
Hi,
Here is my issue with event-time processing: the add() method of MGroupingWindowAggregate is never called, even though new watermarks are being emitted.
1. Ingest data from Kinesis (works fine).
2. Deserialize in MonitoringMapKinesisSchema (works fine; I get JSON back).
3. I assign MonitoringTSWAssigner (code below) to the source with a bound of 10 (I have also tried 3000 and 30000). It emits a new watermark with each incoming record, but windowStream.aggregate doesn't seem to fire and I never see the add() method of MGroupingWindowAggregate being called. I can see the new watermark being emitted in TimestampsAndPunctuatedWatermarksOperator.processElement.
4. I have tried time windows of 1 m and 15 s.

Main code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//Setup Kinesis Consumer
Properties kinesisConsumerConfig = new Properties();
..
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST
FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
                "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);

DataStream<Map<String, Object>> kinesisStream;
RichSinkFunction<InfluxDBPoint> influxSink;

DataStreamSource<Map<String, Object>> monitoringDataStreamSource = env.addSource(kinesisConsumer);
kinesisStream = monitoringDataStreamSource
        .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
influxSink = pms.createInfluxMonitoringSink(....);
......
...timeWindow = Time.seconds(timeIntervalL);//tried with timeIntervalL=15s, 1m 

KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate(//<===== never reaches here ?????
        new MGroupingWindowAggregate(interval),
        new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
        .map(new MonitoringGroupingToInfluxDBPoint(rule));
enrichedMGStream.addSink(influxSink);
env.execute("Aggregation of Map data");

MonitoringTSWAssigner code:
import java.util.Map;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {
    private long bound = 5 * (long) 1000;//5 secs out of order bound in millisecs
    private long maxTimestamp = Long.MIN_VALUE;

    public MonitoringTSWAssigner() {
    }

    public MonitoringTSWAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
        long extractedTS = getExtractedTS(monitoring);
        if (extractedTS > maxTimestamp) {
            maxTimestamp = extractedTS;
        }
        return extractedTS;//return System.currentTimeMillis();
    }

    public long getExtractedTS(Map<String, Object> monitoring) {
        final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
        return Utils.getLongFromDateStr(eventTimestamp);
    }

    @Override
    public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
        // maxTimestamp has already been updated by extractTimestamp(), which runs first for each element
        long nextWatermark = maxTimestamp - bound;
        return new Watermark(nextWatermark);
    }
}
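The assigner's watermark arithmetic can be checked in isolation. The following is a minimal plain-Java model of it (the class name BoundedWatermarkModel is hypothetical and nothing here is Flink API): it tracks the maximum event timestamp and derives maxTimestamp - bound. One deliberate difference: it returns Long.MIN_VALUE until the first event arrives instead of computing Long.MIN_VALUE - bound, which would wrap around to a very large positive value. Since Flink's TimestampsAndPunctuatedWatermarksOperator calls extractTimestamp() before checkAndGetNextWatermark() for each element, that guard is only defensive.

```java
// Plain-Java model (no Flink dependency) of the arithmetic in MonitoringTSWAssigner:
// remember the largest event timestamp seen and emit (maxTimestamp - bound) as the watermark.
final class BoundedWatermarkModel {
    private final long boundMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedWatermarkModel(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    /** Counterpart of extractTimestamp(): track the maximum timestamp seen so far. */
    long onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    /** Counterpart of checkAndGetNextWatermark(); avoids underflowing Long.MIN_VALUE - bound. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - boundMillis;
    }

    public static void main(String[] args) {
        BoundedWatermarkModel model = new BoundedWatermarkModel(5_000);
        // The third event is out of order; the watermark does not move backwards.
        for (long ts : new long[] {10_000, 12_000, 11_000, 20_000}) {
            model.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + model.currentWatermark());
        }
    }
}
```

With a 5,000 ms bound this prints watermarks 5000, 7000, 7000 and 15000: the out-of-order event at 11,000 leaves the watermark where it was.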

MGroupingWindowAggregate:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.functions.AggregateFunction;

public class MGroupingWindowAggregate implements AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
    private final String interval;

    public MGroupingWindowAggregate(String interval) {
        this.interval = interval;
    }

    @Override
    public Map<String, Object> createAccumulator() {
        return new ConcurrentHashMap<>();
    }

    @Override
    public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) {
        .....
    }

    .....

}
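One detail that may help narrow this down: as far as I understand Flink's window operator, AggregateFunction.add() is applied as soon as an element is assigned to a window, while watermarks only decide when the result is emitted. The toy model below illustrates that split. All names are hypothetical; it handles a single key, tumbling event-time windows and no allowed lateness, and it is not Flink's actual WindowOperator. Under this model, add() never running would point at elements not reaching the window operator at all (or arriving after their window has closed), rather than at the watermarks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for Flink's AggregateFunction (the real interface also has merge()).
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();

    ACC add(IN value, ACC accumulator);

    OUT getResult(ACC accumulator);
}

// Toy single-key, event-time tumbling-window operator with no allowed lateness.
// add() runs as each element arrives; getResult() runs only once the watermark passes the window end.
final class ToyWindowOperator<IN, ACC, OUT> {
    private final long size;
    private final SimpleAggregate<IN, ACC, OUT> aggregate;
    private final TreeMap<Long, ACC> accumulators = new TreeMap<>(); // keyed by window start
    private long watermark = Long.MIN_VALUE;
    final List<OUT> emitted = new ArrayList<>();

    ToyWindowOperator(long size, SimpleAggregate<IN, ACC, OUT> aggregate) {
        this.size = size;
        this.aggregate = aggregate;
    }

    void processElement(IN value, long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        if (start + size - 1 <= watermark) {
            return; // the window's max timestamp is already behind the watermark: late, dropped
        }
        ACC acc = accumulators.containsKey(start) ? accumulators.get(start) : aggregate.createAccumulator();
        accumulators.put(start, aggregate.add(value, acc));
    }

    void processWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        // Fire every window whose max timestamp (end - 1) the watermark has reached.
        while (!accumulators.isEmpty() && accumulators.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, ACC> fired = accumulators.pollFirstEntry();
            emitted.add(aggregate.getResult(fired.getValue()));
        }
    }

    public static void main(String[] args) {
        ToyWindowOperator<String, Long, Long> counter = new ToyWindowOperator<>(15_000,
                new SimpleAggregate<String, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(String value, Long acc) { return acc + 1; }
                    @Override public Long getResult(Long acc) { return acc; }
                });
        counter.processElement("a", 1_000);   // add() called immediately
        counter.processElement("b", 5_000);   // add() called immediately
        counter.processWatermark(14_998);     // window [0, 15000) not complete yet
        System.out.println(counter.emitted);  // []
        counter.processWatermark(14_999);     // reaches the window's max timestamp: fires
        System.out.println(counter.emitted);  // [2]
    }
}
```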

TIA,

       
Re: add() method of AggregateFunction not called even though new watermark is emitted

Congxian Qiu
Hi

Do you mean that `windowStream.aggregate` does not work for any records, or only for some of them? If only for some, can you confirm that the assigned watermark increases monotonically? If for all records, can you confirm that the watermark has reached the end of the window?

In other words, how can you tell that `windowStream.aggregate` doesn't seem to fire?
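To check whether the watermark has reached the end of a window, one can compute the window that a given event timestamp falls into and compare its last timestamp with the watermark shown in the Web UI. A minimal sketch, assuming tumbling windows with zero offset (the helper names are hypothetical, not Flink API); for non-negative timestamps the start formula matches the one in Flink's TimeWindow.getWindowStartWithOffset, and the firing condition mirrors EventTimeTrigger, which fires once the watermark reaches the window's max timestamp (end - 1).

```java
// Tumbling event-time window arithmetic with offset 0 (illustrative helper, not Flink API).
final class TumblingWindowCheck {
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Largest timestamp that still belongs to the window, i.e. end - 1. */
    static long windowMaxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    /** EventTimeTrigger-style condition: the window fires once the watermark reaches its max timestamp. */
    static boolean watermarkReachedEnd(long timestamp, long size, long watermark) {
        return watermark >= windowMaxTimestamp(timestamp, size);
    }

    public static void main(String[] args) {
        long size = 15_000;
        long ts = 20_000; // falls into window [15000, 30000)
        System.out.println(windowStart(ts, size) + " .. " + windowMaxTimestamp(ts, size)); // 15000 .. 29999
        System.out.println(watermarkReachedEnd(ts, size, 29_998)); // false
        System.out.println(watermarkReachedEnd(ts, size, 29_999)); // true
    }
}
```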

Best,
Congxian


In reply to Vijay Balakrishnan <[hidden email]>, Sat, Oct 12, 2019 at 3:37 AM

Re: add() method of AggregateFunction not called even though new watermark is emitted

Dawid Wysakowicz-2
In reply to this post by Vijay Balakrishnan

Hi Vijay,

Could you check whether the watermark of the aggregate operator advances? You should be able to see that in the Flink Web UI. Could it be that the watermark does not advance for all of the upstream operators? The watermark of a particular operator is the minimum of the watermarks received from all of its upstream operators, so if some of them do not produce any, the resulting watermark will not advance.
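The minimum rule described above can be sketched as follows. This is a simplified model (the class name is hypothetical), not Flink's internal implementation, and it ignores Flink's handling of idle inputs. It shows how a single silent upstream input, for instance a parallel subtask that receives no records, keeps the combined watermark at Long.MIN_VALUE even though the other inputs advance.

```java
import java.util.Arrays;

// Simplified model of an operator with several input channels: its watermark is the minimum of the
// latest watermark seen on each channel. Idle-input handling is deliberately ignored.
final class MinInputWatermark {
    private final long[] perChannel;

    MinInputWatermark(int channels) {
        perChannel = new long[channels];
        Arrays.fill(perChannel, Long.MIN_VALUE); // no watermark received yet on any channel
    }

    /** Records a watermark on one channel (never moving it backwards) and returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
        return Arrays.stream(perChannel).min().getAsLong();
    }

    public static void main(String[] args) {
        MinInputWatermark op = new MinInputWatermark(2);
        // Channel 0 keeps advancing while channel 1 stays silent.
        System.out.println(op.onWatermark(0, 30_000) == Long.MIN_VALUE); // true: still stuck
        System.out.println(op.onWatermark(1, 20_000));                   // 20000
    }
}
```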

Best,

Dawid

Re: add() method of AggregateFunction not called even though new watermark is emitted

Vijay Balakrishnan
Hi,
Thanks for the replies, Congxian and Dawid.
The watermarks are advancing. I'm not sure how to check whether each newly generated watermark reaches the end of the window.

I checked currentInputWatermark in the Flink UI, and it increases monotonically.

I've narrowed the problem down to windowStream.aggregate never being invoked at runtime.
I also added checkpointing to see whether it was causing the issue; it didn't seem to help.
Most of the code is reached only while the ExecutionGraph is being built when the program starts.

I generate an incrementing sequence of timestamps (5000 ms apart) from a producer to Kinesis, and a new watermark is emitted as soon as the input records start arriving.
My window size is 15 s.
I can see that a WindowedStream is created with windowAssigner: TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger,
but the code never gets into EventTimeTrigger.onElement() or onEventTime() to fire the trigger.
It does get into TimestampsAndPunctuatedWatermarksOperator and emitWatermark().
I even tried ProcessingTime, but that didn't help either.
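With the numbers in this message, the point at which the first window should fire can be worked out. The sketch below replays an idealised version of the setup under hypothetical assumptions (a single source subtask, first event at t = 0, perfectly ordered events 5,000 ms apart, watermark = max timestamp - bound) for the three bounds mentioned earlier in the thread. Under these assumptions the first 15 s window closes only when the event at 20 s arrives (bound 10 or 3000) or the event at 45 s (bound 30000); with a punctuated assigner, a window also never closes if no later event arrives.

```java
// Toy replay under hypothetical parameters: events every `spacing` ms starting at firstTs,
// tumbling windows of `windowSize` ms, and watermark = max timestamp - bound.
final class FirstWindowReplay {
    /** Index of the event whose watermark first closes the window holding event 0, or -1 if none does. */
    static int firstFiringEvent(long firstTs, long spacing, long windowSize, long bound, int events) {
        long firstWindowMaxTs = firstTs - Math.floorMod(firstTs, windowSize) + windowSize - 1;
        long maxTs = Long.MIN_VALUE;
        for (int i = 0; i < events; i++) {
            long ts = firstTs + i * spacing;
            maxTs = Math.max(maxTs, ts);
            if (maxTs - bound >= firstWindowMaxTs) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        for (long bound : new long[] {10, 3_000, 30_000}) {
            System.out.println("bound " + bound + " -> first window fires at event #"
                    + firstFiringEvent(0, 5_000, 15_000, bound, 20));
        }
    }
}
```

This prints event #4 for bounds 10 and 3000 and event #9 for bound 30000.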


//code to create kinesis consumer successfully......
for (Rule rule : rules.getRules()) {
//gets in here fine
    final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter(mon -> {
                boolean result;
                String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
                InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
                String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
                result = eventName.equals(measurement);
                if (result) {
                    Map<String, String> inputTags = mon.get(TAGS) != null ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
                    Map<String, String> ruleTags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
                    result = matchTags(inputTags, ruleTags);
                }
                return result;//<== this is true
            }
    ).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
            out.collect(input);//<==== runs up till here fine
    }).returns(new TypeHint<Map<String, Object>>() {
    });
//doesn't do anything beyond this point at runtime
    DataStream<InfluxDBPoint> enrichedMGStream = pms.createAggregatedMonitoringGroupingWindowStream1
            (filteredKinesisStream, ruleFactory, rule, parallelProcess);
    enrichedMGStream.addSink(influxSink)
            .setParallelism(nbrSinks);
}

private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int parallelProcess) {
    DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
    RuleConfig ruleConfig = rule.getRuleConfig();
    String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
    RuleIF ruleImpl = ruleFactory.getRule(ruleType);
    Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
    Object intervalObj = ruleProps.get("rule_eval_window");
    String timeInterval = intervalObj != null ? (String) intervalObj : "";
    org.apache.flink.streaming.api.windowing.time.Time timeWindow = getTimeWindowFromInterval(timeInterval);

    Object windowTypeObj = ruleProps.get("window_type");
    String windowType = windowTypeObj != null ? (String) windowTypeObj : "";

    InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
    Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
    String groupByObj = tags.get(GROUP_BY);
    String groupBy = groupByObj != null ? groupByObj : "";
    kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
        Object groupByValueObj = inputMap.get(groupBy);
        return groupByValueObj != null;
    });
    Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
    String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
//till here, it went through fine during creation of ExecutionGraph
    KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
            kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); //<=== never gets into MapTupleKeySelector.getKey() - a similar class works in another project
    enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow, windowType, timeInterval, ruleImpl, rule, parallelProcess);
    return enrichedComponentInstanceStream1;
}

private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
                                                                org.apache.flink.streaming.api.windowing.time.Time timeWindow, String windowType,
                                                                String interval,
                                                                RuleIF ruleImpl, Rule rule, int parallelProcess) {
    long slide = 100;
    final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
            windowType.equalsIgnoreCase(SLIDING) ?
                    monitoringTupleKeyedStream
                            .timeWindow(timeWindow, org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide)) :
                    monitoringTupleKeyedStream
                            .timeWindow(timeWindow);
    return windowStream.aggregate(
            new MGroupingWindowAggregate(interval),//<=== never gets into add() here
            new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
            .map(new MonitoringGroupingToInfluxDBPoint(rule));

}
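Separately from the missing add() calls, the SLIDING branch above uses slide = 100 ms. As far as I understand SlidingEventTimeWindows, each element is assigned to size / slide windows when the size is a multiple of the slide, and the window operator adds the element to each of them, so with a 15 s window every element would trigger 150 add() calls per key. A small sketch to check that count, assuming zero offset; it re-implements the assignment loop for illustration and is not Flink code.

```java
// Counts how many sliding windows a single element is assigned to, following the loop structure of
// Flink's SlidingEventTimeWindows.assignWindows with offset 0 (illustrative re-implementation).
final class SlidingAssignmentCount {
    static int windowsPerElement(long timestamp, long size, long slide) {
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        int count = 0;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            count++; // one window [start, start + size) per iteration
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(windowsPerElement(20_000, 15_000, 100));   // 150
        System.out.println(windowsPerElement(20_000, 15_000, 5_000)); // 3
    }
}
```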

In reply to Dawid Wysakowicz <[hidden email]>, Mon, Oct 14, 2019 at 12:41 AM

Re: add() method of AggregateFunction not called even though new watermark is emitted

Theo Diefenthal
Hi Vijay,

Maybe a stupid question, but according to your comments, the code works fine up to the flatMap operation. That flatMap seems to be followed directly by a filter function in createAggregatedMonitoringGroupingWindowStream1. Could that filter be dropping all events? Or is the filter function itself never called, as your comments suggest?
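This question can be tested offline by feeding a sample record through the same condition. The sketch below copies the group-by filter's condition (inputMap.get(groupBy) != null) into plain Java. KEY_DELIMITER = "," and the field names are assumptions, since the thread shows neither. It illustrates cases in which such a filter would drop every record: a group-by value that lists several fields joined by the delimiter (the filter looks up the joined string as a single key, while the key selector later splits it), or a missing GROUP_BY tag, which makes groupBy an empty string. The same would happen if the group-by fields only exist inside the nested TAGS map. Whether any of this applies depends on the actual data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical check of the group-by pre-filter from createAggregatedMonitoringGroupingWindowStream1.
// ASSUMPTION: KEY_DELIMITER is ","; its real value is not shown in the thread.
final class GroupByFilterCheck {
    static final String KEY_DELIMITER = ",";

    /** Same condition as the filter in the thread: keep the record only if inputMap.get(groupBy) != null. */
    static boolean passesFilter(Map<String, Object> inputMap, String groupBy) {
        return inputMap.get(groupBy) != null;
    }

    /** Illustrative alternative: require every individual group-by field to be present. */
    static boolean hasAllGroupByFields(Map<String, Object> inputMap, String groupBy) {
        if (groupBy.isEmpty()) {
            return false;
        }
        for (String field : groupBy.split(KEY_DELIMITER)) {
            if (inputMap.get(field) == null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("host", "h1");   // hypothetical field names
        record.put("region", "r1");
        System.out.println(passesFilter(record, "host,region"));        // false: the joined string is not a key
        System.out.println(hasAllGroupByFields(record, "host,region")); // true
        System.out.println(passesFilter(record, ""));                   // false: empty group-by tag
    }
}
```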

Best regards
Theo


From: "Vijay Balakrishnan" <[hidden email]>
To: "Dawid Wysakowicz" <[hidden email]>
CC: "user" <[hidden email]>
Sent: Tuesday, 15 October 2019 02:01:05
Subject: Re: add() method of AggregateFunction not called even though new watermark is emitted

Hi,
Thx for the replies - Congxian & Dawdi.
Watermarks are advancing.Not sure how to check every new generated watermark is reaching end of the window ????

I did check the Flink UI for the currentInputWatermark and it is increasing monotonically.

Narrowed down the problem to not calling the windowStream.aggregate.
I also added a checkpoint to see if it was causing the issue.Didn't seem to help.
Most of the code is reached during the creation of the ExecutionGraph on the start of the program.

I generate an incrementing sequence of timestamps(delay of 5000ms between each rec) from a Producer to Kinesis and it emits a new watermark as it starts receiving the input records.
My window size is 15s.
I see a WindowedStream is created with windowAssigner: TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger
but the code never gets into the EventTimeTrigger.onElement() or onEventTime() to fire the trigger.
It gets into TimestampsAndPunctuatedWatermarkOperator and emitWatermark().
I even tried to use ProcessingTime but that also didn't help.


//code to create kinesis consumer successfully......
for (Rule rule : rules.getRules()) {
//gets in here fine
    final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter(mon -> {
                boolean result;
                String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
                InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
                String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
                result = eventName.equals(measurement);
                if (result) {
                    Map<String, String> inputTags = mon.get(TAGS) != null ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
                    Map<String, String> ruleTags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
                    result = matchTags(inputTags, ruleTags);
                }
                return result;//<== this is true
            }
    ).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
            out.collect(input);//<==== runs up till here fine
    }).returns(new TypeHint<Map<String, Object>>() {
    });
//doesn't do anything beyond this point at runtime
    DataStream<InfluxDBPoint> enrichedMGStream = pms.createAggregatedMonitoringGroupingWindowStream1
            (filteredKinesisStream, ruleFactory, rule, parallelProcess);
    enrichedMGStream.addSink(influxSink)
            .setParallelism(nbrSinks);
}

private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int parallelProcess) {
    DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
    RuleConfig ruleConfig = rule.getRuleConfig();
    String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
    RuleIF ruleImpl = ruleFactory.getRule(ruleType);
    Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
    Object intervalObj = ruleProps.get("rule_eval_window");
    String timeInterval = intervalObj != null ? (String) intervalObj : "";
    org.apache.flink.streaming.api.windowing.time.Time timeWindow = getTimeWindowFromInterval(timeInterval);

    Object windowTypeObj = ruleProps.get("window_type");
    String windowType = windowTypeObj != null ? (String) windowTypeObj : "";

    InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
    Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
    String groupByObj = tags.get(GROUP_BY);
    String groupBy = groupByObj != null ? groupByObj : "";
    kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
        Object groupByValueObj = inputMap.get(groupBy);
        return groupByValueObj != null;
    });
    Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
    String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
//till here, it went through fine during creation of ExceutionGraph
    KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
            kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));<=== never gets into the MapTupleKeySelector.getKey() - a similar class works in another project
    enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow, windowType, timeInterval, ruleImpl, rule, parallelProcess);
    return enrichedComponentInstanceStream1;
}

private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
                                                                org.apache.flink.streaming.api.windowing.time.Time timeWindow, String windowType,
                                                                String interval,
                                                                RuleIF ruleImpl, Rule rule, int parallelProcess) {
    long slide = 100;
    final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
            windowType.equalsIgnoreCase(SLIDING) ?
                    monitoringTupleKeyedStream
                            .timeWindow(timeWindow, org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide)) :
                    monitoringTupleKeyedStream
                            .timeWindow(timeWindow);
    return windowStream.aggregate(
            new MGroupingWindowAggregate(interval),//<=== never gets into add() here
            new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
            .map(new MonitoringGroupingToInfluxDBPoint(rule));

}

On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <[hidden email]> wrote:

Hi Vijay,

Could you check if the Watermark for the aggregate operator advances? You should be able to check that in the Flink WebUI. Could it be that the Watermark does not advance for all of the upstream operators? The watermark for a particular operator is a minimum of watermarks received from all of the upstream operators. Therefore if some of them does not produce any, the resulting watermark will not advance.

Best,

Dawid
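Dawid's point can be made concrete with a small plain-Java model (not Flink's implementation): an operator with several input channels remembers the latest watermark per channel and advances its own watermark only to the minimum across channels. A channel that never sends a watermark stays at Long.MIN_VALUE and holds the operator back. InputWatermarkTracker and its methods are made up for this sketch.

```java
import java.util.Arrays;

// Hypothetical model of how a multi-input operator combines watermarks.
final class InputWatermarkTracker {
    private final long[] channelWatermarks;
    private long combined = Long.MIN_VALUE;

    InputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // A channel that has not emitted anything yet counts as "no progress".
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the operator's resulting watermark.
    long onWatermark(int channel, long watermark) {
        // Per-channel watermarks only move forward.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // The operator's own watermark never goes backwards either.
        combined = Math.max(combined, min);
        return combined;
    }

    public static void main(String[] args) {
        InputWatermarkTracker tracker = new InputWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 30_000L)); // channel 1 silent: still Long.MIN_VALUE
        System.out.println(tracker.onWatermark(1, 20_000L)); // min(30000, 20000) = 20000
    }
}
```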

On 11/10/2019 21:37, Vijay Balakrishnan wrote:
Hi,
Here is my issue with event processing: the add() method of MGroupingWindowAggregate is not being called even though a new watermark is emitted.
1. Ingest data from Kinesis (works fine).
2. Deserialize in MonitoringMapKinesisSchema (works fine; I get JSON back).
3. I assign MonitoringTSWAssigner (code below) to the source with a bound of 10 (I have also tried 3000 and 30000). It emits a new watermark with each incoming record, but windowStream.aggregate doesn't seem to fire and I don't see the add() method of MGroupingWindowAggregate being called. I can see the new watermark being emitted in TimestampsAndPunctuatedWatermarksOperator.processElement.
4. I have tried time windows of 1m and 15s.

Main code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//Setup Kinesis Consumer
Properties kinesisConsumerConfig = new Properties();
..
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST
FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
                "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);

DataStream<Map<String, Object>> kinesisStream;
RichSinkFunction<InfluxDBPoint> influxSink;

DataStreamSource<Map<String, Object>> monitoringDataStreamSource = env.addSource(kinesisConsumer);
kinesisStream = monitoringDataStreamSource
        .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
influxSink = pms.createInfluxMonitoringSink(....);
......
...timeWindow = Time.seconds(timeIntervalL);//tried with timeIntervalL=15s, 1m 

KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate(//<===== never reaches here ?????
        new MGroupingWindowAggregate(interval),
        new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
        .map(new MonitoringGroupingToInfluxDBPoint(rule));
enrichedMGStream.addSink(influxSink);
env.execute("Aggregation of Map data");

MonitoringTSWAssigner code:
public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {
    private long bound = 5 * (long) 1000;//5 secs out of order bound in millisecs
    private long maxTimestamp = Long.MIN_VALUE;

    public MonitoringTSWAssigner() {
    }

    public MonitoringTSWAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
        long extractedTS = getExtractedTS(monitoring);
        if (extractedTS > maxTimestamp) {
            maxTimestamp = extractedTS;
        }
        return extractedTS;//return System.currentTimeMillis();
    }

    public long getExtractedTS(Map<String, Object> monitoring) {
        final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
        return Utils.getLongFromDateStr(eventTimestamp);
    }

    @Override
    public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
        // maxTimestamp was already updated by extractTimestamp() for this element
        long nextWatermark = maxTimestamp - bound;
        return new Watermark(nextWatermark);
    }
}
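The watermark rule in MonitoringTSWAssigner (largest timestamp seen minus bound) can be checked outside Flink. The plain-Java sketch below keeps that rule but starts maxTimestamp at Long.MIN_VALUE + bound instead of Long.MIN_VALUE, which is what Flink's own BoundedOutOfOrdernessTimestampExtractor does: with the original initial value, maxTimestamp - bound would overflow to a value near Long.MAX_VALUE if a watermark were ever computed before any timestamp had been recorded. In the posted code extractTimestamp() runs before checkAndGetNextWatermark() for each element, so this is a defensive change, not a confirmed cause. BoundedWatermarkModel is a hypothetical name.

```java
// Hypothetical plain-Java stand-in for the watermark logic of MonitoringTSWAssigner.
final class BoundedWatermarkModel {
    private final long bound;
    private long maxTimestamp;

    BoundedWatermarkModel(long bound) {
        this.bound = bound;
        // Start at MIN_VALUE + bound so that maxTimestamp - bound cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + bound;
    }

    // Counterpart of extractTimestamp(): remember the largest timestamp seen so far.
    long observe(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // Counterpart of checkAndGetNextWatermark(): trail the max timestamp by `bound`.
    long currentWatermark() {
        return maxTimestamp - bound;
    }

    public static void main(String[] args) {
        BoundedWatermarkModel m = new BoundedWatermarkModel(5_000L);
        System.out.println(m.currentWatermark()); // Long.MIN_VALUE, not a huge positive number
        m.observe(20_000L);
        m.observe(12_000L); // out-of-order element: the max stays at 20000
        System.out.println(m.currentWatermark()); // 15000
    }
}
```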

MGroupingWindowAggregate:
public class MGroupingWindowAggregate implements AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
    private final String interval;
    public MGroupingWindowAggregate(String interval) {
        this.interval = interval;
    }
    public Map<String, Object> createAccumulator() {
        return new ConcurrentHashMap<>();
    }

    public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) {
.....
}

.....

}
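For context on when add() runs: for each key and window, the window operator creates an accumulator when the first element for that window arrives, calls add() once for every element assigned to the window, and calls getResult() when the trigger fires. A watermark on its own never calls add(); elements have to reach the window operator first. The plain-Java sketch below models that sequence with a locally defined SimpleAggregate interface that mirrors the shape of Flink's AggregateFunction<IN, ACC, OUT> (merge() omitted). CountAggregate and WindowDriverModel are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in with the same method shape as Flink's AggregateFunction (merge omitted).
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
}

// Hypothetical aggregate that counts the records in a window.
final class CountAggregate implements SimpleAggregate<Map<String, Object>, Long, Long> {
    public Long createAccumulator() { return 0L; }
    public Long add(Map<String, Object> value, Long accumulator) { return accumulator + 1; }
    public Long getResult(Long accumulator) { return accumulator; }
}

// Models what happens for one key/window pair when its trigger fires.
final class WindowDriverModel {
    static <IN, ACC, OUT> OUT fire(SimpleAggregate<IN, ACC, OUT> agg, List<IN> elementsInWindow) {
        if (elementsInWindow.isEmpty()) {
            // No element ever arrived: no window state, no add() calls, nothing to emit.
            return null;
        }
        ACC acc = agg.createAccumulator(); // in Flink this happens when the first element arrives
        for (IN element : elementsInWindow) {
            acc = agg.add(element, acc);    // one add() call per element in the window
        }
        return agg.getResult(acc);         // called when the trigger fires
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("measurement", "cpu");
        System.out.println(fire(new CountAggregate(), Arrays.asList(record, record)));
        System.out.println(fire(new CountAggregate(), Collections.<Map<String, Object>>emptyList()));
    }
}
```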

TIA,

       

Re: add() method of AggregateFunction not called even though new watermark is emitted

Vijay Balakrishnan
Hi Theo,
It reaches the FilterFunction initially, during the creation of the ExecutionGraph, but not at runtime when records are streaming in. So it is not getting that far; it seems to be stuck at the
final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter code.
It doesn't seem to get past it: the watermarks keep incrementing, but the watermark never seems to reach the end of the window. Maybe I am doing something super simple and stupid.
TIA,
Vijay

On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <[hidden email]> wrote:
Hi Vijay,

Maybe a stupid question, but according to your comments, the code works fine up to a "flatMap" operation. It seems that this flatMap is directly followed by a filter function in the method createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out all events? Or is the filter function itself not even called? (Your comments suggest that.)

Best regards
Theo


From: "Vijay Balakrishnan" <[hidden email]>
To: "Dawid Wysakowicz" <[hidden email]>
CC: "user" <[hidden email]>
Sent: Tuesday, October 15, 2019 02:01:05
Subject: Re: add() method of AggregateFunction not called even though new watermark is emitted

Hi,
Thanks for the replies, Congxian and Dawid.
Watermarks are advancing. I'm not sure how to check whether each newly generated watermark reaches the end of the window.

I did check the Flink UI for the currentInputWatermark, and it is increasing monotonically.

I narrowed the problem down to windowStream.aggregate not being called.
I also added a checkpoint to see if that was causing the issue. It didn't seem to help.
Most of the code is reached during the creation of the ExecutionGraph at the start of the program.

I generate an incrementing sequence of timestamps (5000 ms apart) from a producer to Kinesis, and a new watermark is emitted as soon as the input records start arriving.
My window size is 15s.
I see that a WindowedStream is created with windowAssigner TumblingEventTimeWindows(15000) and trigger EventTimeTrigger,
but the code never gets into EventTimeTrigger.onElement() or onEventTime() to fire the trigger.
It does get into TimestampsAndPunctuatedWatermarksOperator and emitWatermark().
I even tried ProcessingTime, but that didn't help either.
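On the question of how to check whether a watermark reaches the end of the window: for a tumbling event-time window of 15000 ms (zero offset, non-negative timestamps), a record with timestamp t falls into [t - t % 15000, t - t % 15000 + 15000), and EventTimeTrigger fires once the watermark reaches that window's maxTimestamp(), which is end - 1. The plain-Java sketch below runs that arithmetic for records 5000 ms apart with the bound of 10 ms from the first message; the base timestamp is made up and the class is not part of Flink.

```java
// Plain-Java arithmetic for tumbling event-time windows (zero offset), modelled on
// TumblingEventTimeWindows + EventTimeTrigger; no Flink dependency.
final class TumblingWindowCheck {

    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp + size) % size;
    }

    // A window's maxTimestamp() is end - 1; the event-time trigger fires once
    // the watermark has reached it.
    static boolean fires(long timestamp, long size, long watermark) {
        long maxTimestamp = windowStart(timestamp, size) + size - 1;
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 15_000L;
        long bound = 10L;
        long base = 1_571_100_000_000L; // illustrative, aligned to a 15 s boundary
        long maxSeen = Long.MIN_VALUE;
        // Records arrive 5000 ms apart; watermark = max timestamp seen - bound.
        for (int i = 0; i < 5; i++) {
            long ts = base + i * 5_000L;
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - bound;
            System.out.printf("record %d: window starts %d, first window fires: %b%n",
                    i, windowStart(ts, size), fires(base, size, watermark));
        }
    }
}
```

With these numbers the first window fires only when the record stamped base + 20000 arrives: the record at base + 15000 yields a watermark of base + 14990, which is still 9 ms short of base + 14999. If no record ever reaches the window operator, no window exists and nothing fires, however far the watermark advances.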


//code to create kinesis consumer successfully......
for (Rule rule : rules.getRules()) {
//gets in here fine
    final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter(mon -> {
                boolean result;
                String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
                InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
                String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
                result = eventName.equals(measurement);
                if (result) {
                    Map<String, String> inputTags = mon.get(TAGS) != null ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
                    Map<String, String> ruleTags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
                    result = matchTags(inputTags, ruleTags);
                }
                return result;//<== this is true
            }
    ).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
            out.collect(input);//<==== runs up till here fine
    }).returns(new TypeHint<Map<String, Object>>() {
    });
//doesn't do anything beyond this point at runtime
    DataStream<InfluxDBPoint> enrichedMGStream = pms.createAggregatedMonitoringGroupingWindowStream1
            (filteredKinesisStream, ruleFactory, rule, parallelProcess);
    enrichedMGStream.addSink(influxSink)
            .setParallelism(nbrSinks);
}

private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int parallelProcess) {
    DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
    RuleConfig ruleConfig = rule.getRuleConfig();
    String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
    RuleIF ruleImpl = ruleFactory.getRule(ruleType);
    Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
    Object intervalObj = ruleProps.get("rule_eval_window");
    String timeInterval = intervalObj != null ? (String) intervalObj : "";
    org.apache.flink.streaming.api.windowing.time.Time timeWindow = getTimeWindowFromInterval(timeInterval);

    Object windowTypeObj = ruleProps.get("window_type");
    String windowType = windowTypeObj != null ? (String) windowTypeObj : "";

    InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
    Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
    String groupByObj = tags.get(GROUP_BY);
    String groupBy = groupByObj != null ? groupByObj : "";
    kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
        Object groupByValueObj = inputMap.get(groupBy);
        return groupByValueObj != null;
    });
    Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
    String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
    // up to here, everything runs fine during creation of the ExecutionGraph
    KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
            kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); // <=== never gets into MapTupleKeySelector.getKey(); a similar class works in another project
    enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow, windowType, timeInterval, ruleImpl, rule, parallelProcess);
    return enrichedComponentInstanceStream1;
}

private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
                                                                org.apache.flink.streaming.api.windowing.time.Time timeWindow, String windowType,
                                                                String interval,
                                                                RuleIF ruleImpl, Rule rule, int parallelProcess) {
    long slide = 100;
    final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
            windowType.equalsIgnoreCase(SLIDING) ?
                    monitoringTupleKeyedStream
                            .timeWindow(timeWindow, org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide)) :
                    monitoringTupleKeyedStream
                            .timeWindow(timeWindow);
    return windowStream.aggregate(
            new MGroupingWindowAggregate(interval),//<=== never gets into add() here
            new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
            .map(new MonitoringGroupingToInfluxDBPoint(rule));

}
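MapTupleKeySelector and MonitoringTuple are not shown in the thread. Whatever they look like, Flink routes each record of a keyed stream by the key's hashCode(), so the key type needs value-based equals() and hashCode(); the Flink documentation lists a POJO that relies on Object.hashCode(), and arrays, as types that cannot be keys. The plain-Java sketch below is a hypothetical key built from the group-by fields of a record plus the metric name; GroupKey and its methods are made up, and mapping a missing field to "" is only one possible choice.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical composite key: values of the group-by fields plus the metric name.
final class GroupKey {
    private final TreeMap<String, String> fields; // sorted, so field order never matters
    private final String metric;

    private GroupKey(TreeMap<String, String> fields, String metric) {
        this.fields = fields;
        this.metric = metric;
    }

    // Builds the key from one record; a missing field becomes "" instead of failing.
    static GroupKey of(Map<String, Object> record, Set<String> groupByFields, String metric) {
        TreeMap<String, String> values = new TreeMap<>();
        for (String field : groupByFields) {
            Object v = record.get(field);
            values.put(field, v == null ? "" : v.toString());
        }
        return new GroupKey(values, metric);
    }

    // Value-based equals/hashCode: equal keys get equal hash codes in every JVM.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GroupKey)) return false;
        GroupKey other = (GroupKey) o;
        return fields.equals(other.fields) && Objects.equals(metric, other.metric);
    }

    @Override
    public int hashCode() {
        return Objects.hash(fields, metric);
    }

    public static void main(String[] args) {
        Map<String, Object> a = new HashMap<>();
        a.put("host", "h1");
        a.put("service", "s1");
        Map<String, Object> b = new HashMap<>(a);
        b.put("value", 42); // a non-key field differs
        Set<String> groupBy = new HashSet<>(Arrays.asList("host", "service"));
        System.out.println(of(a, groupBy, "cpu").equals(of(b, groupBy, "cpu")));
    }
}
```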


       

Re: add() method of AggregateFunction not called even though new watermark is emitted

Vijay Balakrishnan
Hi Theo,
You were right. For some reason (I still haven't figured out why), the FilterFunction was causing the issue. I commented it out, and it started getting into the add() method of the aggregate function.

/*kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
Object groupByValueObj = inputMap.get(groupBy);
return groupByValueObj != null;
});*/
//String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
TIA,
Vijay
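The thread does not establish why this filter dropped the records. One possible explanation, which is an assumption and not confirmed here: in createAggregatedMonitoringGroupingWindowStream1 the same groupBy string is split on KEY_DELIMITER to build groupBySet, which suggests it can name several fields, yet the filter looks up the unsplit string with inputMap.get(groupBy). If groupBy were, say, "host,service", no record would have a field with that exact name, every record would be filtered out, and the window operator would never see an element, so neither getKey() nor add() would run even though watermarks keep advancing. The plain-Java sketch below contrasts the two checks; the "," delimiter and the field names are hypothetical, and Pattern.quote is used because String.split treats its argument as a regular expression.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class GroupByFilterCheck {
    // Hypothetical delimiter; the thread only refers to a KEY_DELIMITER constant.
    static final String KEY_DELIMITER = ",";

    // What the commented-out FilterFunction does: look up the whole groupBy string.
    static boolean originalFilter(Map<String, Object> record, String groupBy) {
        return record.get(groupBy) != null;
    }

    // Alternative: require every individual group-by field to be present.
    static boolean perFieldFilter(Map<String, Object> record, String groupBy) {
        return Arrays.stream(groupBy.split(Pattern.quote(KEY_DELIMITER)))
                .allMatch(field -> record.get(field) != null);
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("host", "h1");
        record.put("service", "s1");
        String groupBy = "host" + KEY_DELIMITER + "service";
        System.out.println(originalFilter(record, groupBy)); // false: no field named "host,service"
        System.out.println(perFieldFilter(record, groupBy)); // true
    }
}
```

A quick way to confirm or rule this out in the real job would be to log, or count with a metric, how many records pass the filter per rule before re-enabling it.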



On Tue, Oct 15, 2019 at 9:34 AM Vijay Balakrishnan <[hidden email]> wrote:
Hi Theo,
It reaches the FilterFunction initially, during the creation of the ExecutionGraph, but not at runtime when records are streaming in. So it is not getting that far; it seems to be stuck at the
final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter code.
It doesn't seem to get past it: the watermarks keep incrementing, but the watermark never seems to reach the end of the window. Maybe I am doing something super simple and stupid.
TIA,
Vijay
