Greedy operator in match_recognize doesn't work with defined time interval as I expect it to work

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Greedy operator in match_recognize doesn't work with defined time interval as I expect it to work

Theo Diefenthal
Hi there,

I built myself a small example in order to test MATCH_RECOGNIZE in flink.

I have a pattern of format "A B* C", where B is a `1=1` event matching anything. Additionally, I have a time interval of '2' DAY. Now I create testdata matching 
event A, send some random events (B), followed by a C matching event and add another event > 2 days later.

In my expectation, the pipeline should output a match as the pipeline advanced more than 2 days so that even the greedy variable B, which would have consumed event C, now "steps back" in favor for C and produces a complete match. In my test, I sadly didn't get any match.

Some additional information:
-I tested this with Flink 1.8.1
-Making B reluctant (B*?) produces a result, but that's not what I want for the usecase.
-I am not entirely sure whether this is a bug or an expected behavior, that's why I ask here. If that's expected could you explain me why? It seems to contradict the doc in section "time constraint" where another more greedy match is not considered due to "out of time interval"

And another question: In my usecase, I have a lot of events per user but want to apply the pattern only to a small subset of events. That's why in the example pattern, I have the `MISC*` in between two events. In my program, as is right now, I don't use Flink SQL but Flink CEP. I observed that I had a lot of events to be stored in state which drastically reduced performane (Basically all for the pattern time period). So I did a "predicate pushdown" in my pipeline and first filtered out the stream to only interesting events, then applied the pattern which works fine. I was hoping that with MATCH_RECOGNIZE, something similar smart could be optimized. Do you know if some optimizations are planned which go into this direction? I plan to let the user specify queries by himself and that's why I am testing CEP in SQL instead of programming "hardcoded" in Java as I currently do. It will be tough to reason to the advanced user "Your CEP query causes a lot of performance issues due to state, so let me rewrite it for you to e.g. something making a nested query with a pre select and then apply the pattern".

Bug for the bug, here finally is a MVP:

public class FlinkSQLtest {

private static final String QUERY_MATCH_REOGNIZE =
"SELECT * FROM SAMPLE MATCH_RECOGNIZE (" +
" PARTITION BY user " +
" ORDER BY ts " +
" MEASURES FIRSTEVENT.id AS FIRSTID, SECONDEVENT.id AS SECONDID " +
" ONE ROW PER MATCH " +
" AFTER MATCH SKIP PAST LAST ROW " +
" PATTERN (FIRSTEVENT MISC*? SECONDEVENT) " +
" WITHIN INTERVAL '2' DAY " +
" DEFINE " +
" FIRSTEVENT AS payload = 'google.com', " +
" SECONDEVENT AS payload = 'evil.com'" +
")";

private static List<Tuple5<Integer, Long, Integer, String, Long>> createTestdata() {
List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
data.add(new Tuple5<>(6, 1531981987000L, 15, "google.com", 2L)); // 2018-07-19T06:33:07Z
data.add(new Tuple5<>(6, 1531981988000L, 16, "blabla.com", 2L)); // 2018-07-19T06:33:08Z
data.add(new Tuple5<>(6, 1532031988000L, 17, "blabla.com", 2L)); // 2018-07-19T20:26:28Z
data.add(new Tuple5<>(6, 1532069836000L, 20, "evil.com", 2L)); // 2018-07-20T06:57:16Z
data.add(new Tuple5<>(6, 1532415436000L, 21, "heartbeat.com", 1L)); // 2018-07-24T06:57:16Z advance a few days.
return data;
}

@Test
public void testRowPatternMatching() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

List<Tuple5<Integer, Long, Integer, String, Long>> data = createTestdata();

DataStream<Tuple5<Integer, Long, Integer, String, Long>> stream = env.fromCollection(data);
stream = stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple5<Integer, Long, Integer, String, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple5<Integer, Long, Integer, String, Long> element) {
return element.f1;
}
});
tEnv.registerDataStream("SAMPLE", stream, "user, ts.rowtime, id, payload, group");

Table tbl = tEnv.sqlQuery(QUERY_MATCH_REOGNIZE);
DataStream<Row> backToStream = tEnv.toAppendStream(tbl, Row.class);
List<Row> list = IteratorUtils.toList(DataStreamUtils.collect(backToStream));
list.stream().forEach(System.err::println);
assertThat(list, hasSize(1));

System.out.println(tEnv.explain(tbl));
}
}

Best regards
Theo Diefenthal