I'm not able to make a stream-stream Time windows JOIN in Flink SQL

6 messages

I'm not able to make a stream-stream Time windows JOIN in Flink SQL

Theo Diefenthal
Hi there,

Currently, I'm trying to write a SQL query which shall execute a time-windowed/bounded JOIN on two data streams.

Suppose I have stream1 with attributes id, ts, user and stream2 with attributes id, ts, userName. I want to compute the natural JOIN of both streams, restricted to events of the same day.

In Oracle (with the ts column stored as a number instead of a Timestamp, for historical reasons), I do the following:

SELECT *
FROM STREAM1
JOIN STREAM2 ON STREAM1."user" = STREAM2."userName"
AND TRUNC(TO_DATE('19700101', 'YYYYMMDD') + ( 1 / 24 / 60 / 60 / 1000 ) * STREAM1."ts") = TRUNC(TO_DATE('19700101', 'YYYYMMDD') + ( 1 / 24 / 60 / 60 / 1000 ) * STREAM2."ts");
which yields 294 rows with my test data (14 elements from stream1 match 21 elements in stream2 on the one day of test data). Now I want to run the same query in Flink, so I registered both streams as tables and properly registered the event-time attribute (by specifying ts.rowtime as a table column).

My goal is to produce a time-windowed JOIN so that, once both streams advance their watermarks far enough, an element is written out into an append-only stream.

First try (to conform to the time-bounded-JOIN conditions):
SELECT s1.id, s2.id 
FROM STREAM1 AS s1
JOIN STREAM2 AS s2
ON s1.`user` = s2.userName
AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' HOUR
AND TUMBLE_START(s1.ts, INTERVAL '1' DAY) = TUMBLE_START(s2.ts, INTERVAL '1' DAY) -- Reduce to matches on the same day.
This yielded the exception "Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before." So I'm still in the realm of regular joins, not time-windowed JOINs, even though I added explicit BETWEEN predicates for both input streams!

Then I found [1], which is essentially my query without the last condition (reduce to matches on the same day). I tried that one as well, just to have a starting point, but the error is the same.
I then reduced the condition to just one time bound:
SELECT s1.id, s2.id 
FROM STREAM1 AS s1
JOIN STREAM2 AS s2
ON s1.`user` = s2.userName
AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
which runs as a query but doesn't produce any results, most likely because Flink still treats it as a regular join instead of a time-windowed JOIN and doesn't emit any results. (FYI, after executing the query, I convert the Table back to a stream via tEnv.toAppendStream, and I use Flink 1.8.0 for my tests.)

My questions are now:
1. How do I see whether Flink treats my table result as a regular JOIN result or a time-bounded JOIN?
2. What is the proper way to formulate my initial query, finding all matching events within the same tumbling window?

Best regards
Theo Diefenthal

Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

Zhenghua Gao

On Mon, Aug 12, 2019 at 10:49 PM Theo Diefenthal <[hidden email]> wrote:

Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

Zhenghua Gao
I wrote a demo example for the time-windowed join which you can pick up [1].


Best Regards,
Zhenghua Gao


On Tue, Aug 13, 2019 at 4:13 PM Zhenghua Gao <[hidden email]> wrote:


Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

Fabian Hueske-2
Hi Theo,

The main problem is that the semantics of your join (join all events that happened on the same day) are not well supported by Flink yet.

In terms of true streaming joins, Flink supports the time-windowed join (with the BETWEEN predicate) and the time-versioned table join (which does not apply here).
The first does not really fit because it puts the windows "around the event", i.e., if you have an event at 12:35 and a window of 10 mins earlier and 15 mins later, it will join with events between 12:25 and 12:50.
Another limitation of Flink is that you cannot modify event-time attributes (well, you can, but they lose their event-time property and become regular TIMESTAMP attributes).
This limitation exists because we must ensure that the attributes are still aligned with watermarks after they are modified (or adjust the watermarks accordingly).
Since analyzing expressions that modify timestamps to figure out whether they preserve watermark alignment is very difficult, we opted to always remove the event-time property when an event-time attribute is modified.

I see two options for your use case:

1) use the join that you described before with the -24 and +24 hour window and apply more fine-grained predicates to filter out the join results that you don't need.
2) add an additional time attribute to your input that is a rounded down version of the timestamp (rounded to 24h), declare the rounded timestamp as your event-time attribute, and join with an equality predicate on the rounded timestamp.
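
A sketch of option 2 might look as follows. The column name tsDay and the fact that both tables were re-registered with it as their rowtime attribute are illustrative assumptions, not from the thread; Flink's time-windowed join also accepts a plain equality between the two rowtime attributes:

-- Hypothetical: both tables registered with an extra column tsDay
-- (ts rounded down to the day), declared as each table's rowtime attribute.
SELECT s1.id, s2.id
FROM STREAM1 AS s1
JOIN STREAM2 AS s2
ON s1.`user` = s2.userName
AND s1.tsDay = s2.tsDay -- equality on the rowtime attributes keeps this a time-windowed join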

Best, Fabian

On Tue, 13 Aug 2019 at 13:41, Zhenghua Gao <[hidden email]> wrote:

Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

Theo Diefenthal
Hi Fabian, Hi Zhenghua

Thank you for your suggestions and for telling me that I was on the right track. It's also good to know how to find out whether something yields a time-bounded or a regular join.

@Fabian: Regarding your suggested first option: isn't that exactly what my first try was, with the TUMBLE_START...? That sadly didn't work due to "Rowtime attributes must not be in the input rows of a regular join". But I'll give option 2 a try by just adding another attribute.

One addition regarding my second try: I wrote that the reduced query didn't produce any data, but that was indeed my mistake. While testing all those combinations, I had fiddled around with my data so much that I manipulated the original data in a way that the query couldn't output a result any more. The second attempt now works, but it isn't really what I wanted to query (as the "same day" predicate is still missing).

Best regards
Theo


From: "Fabian Hueske" <[hidden email]>
To: "Zhenghua Gao" <[hidden email]>
CC: "Theo Diefenthal" <[hidden email]>, "user" <[hidden email]>
Sent: Friday, 16 August 2019, 10:05:45
Subject: Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL


Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

Fabian Hueske-2
Hi Theo,

Re your first approach:
TUMBLE_START is treated as a special function. It can only be used in the SELECT clause if there is a TUMBLE function call in the GROUP BY clause.
If you use FLOOR(s1.ts TO DAY) = FLOOR(s2.ts TO DAY) instead, it should work.
You can also drop one of the BETWEEN predicates because the condition is symmetric.

So basically, you need to add the FLOOR ... predicate to your second approach and that should do the trick.
In principle, your query then runs a Flink-supported time-windowed join, with a more restrictive predicate added on top.
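
Putting those fixes together with the earlier single-bound attempt, the resulting query would presumably look like this (an untested sketch combining the suggestions above):

SELECT s1.id, s2.id
FROM STREAM1 AS s1
JOIN STREAM2 AS s2
ON s1.`user` = s2.userName
AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
AND FLOOR(s1.ts TO DAY) = FLOOR(s2.ts TO DAY) -- restrict to matches on the same day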

Best,
Fabian

On Fri, 23 Aug 2019 at 14:23, Theo Diefenthal <[hidden email]> wrote: