Hi,
Experimenting with the StreamTableEnvironment I built something like this:
DataStream<Tuple3<Long, String, Long>> letterStream = ...
tableEnv.registerDataStream("TestStream", letterStream, "EventTime.rowtime, letter, counter");
Because the "EventTime" field was tagged with ".rowtime" it is now used as the rowtime and has the DATETIME type, so I can do this:
TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
So far so good.
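For context, the kind of windowed query this enables looks roughly like the following (a sketch; the table and column names follow the registration above, and the SUM aggregation is purely illustrative):

```sql
-- Hypothetical tumbling-window aggregation over the registered TestStream;
-- eventTime is the ".rowtime" attribute from the registration above.
SELECT
  letter,
  SUM(counter) AS totalCounter,
  TUMBLE_START(eventTime, INTERVAL '1' MINUTE) AS windowStart
FROM TestStream
GROUP BY letter, TUMBLE(eventTime, INTERVAL '1' MINUTE)
```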
Working towards a more realistic scenario I have a source that produces a stream of records that have been defined using Apache Avro.
So I have a Measurement.avdl that (among other things) contains something like this:
record Measurement {
  /** The time (epoch in milliseconds since 1970-01-01 UTC) when the event occurred */
  long timestamp;
  string letter;
  long pageviews;
}
Now because the registerDataStream call can also derive the schema from the provided data I can do this:
DataStream<Measurement> inputStream = ...
tableEnv.registerDataStream("DataStream", inputStream);
This is very nice because any real schema is big (few hundred columns) and changes over time.
Now in the SQL the timestamp is a BIGINT and not a DATETIME, and as a consequence I get this error:
Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL MINUTE>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
So far I have not yet figured out how to make the system understand that the timestamp column should be treated as the rowtime.
How do I do that?
-- Best regards / Met vriendelijke groeten,
Niels Basjes
|
|
Hi Niels,
if you are coming from the DataStream API, all you need to do is write a timestamp extractor.
When you call:
tableEnv.registerDataStream("TestStream", letterStream, "EventTime.rowtime, letter, counter");
the ".rowtime" means that the framework will extract the rowtime from the stream record timestamp. You don't need to name all fields again but could simply construct a string from letterStream.getTypeInfo().getFieldNames(). I hope we can improve this further in the future as part of FLIP-37.
Regards,
Timo
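Timo's tip about constructing the field string from the stream's own field names could be sketched in plain Java like this (a minimal sketch; the hard-coded names stand in for what letterStream.getTypeInfo().getFieldNames() would return, and the FieldStringBuilder class is illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class FieldStringBuilder {
    // Build an "EventTime.rowtime, letter, counter" style field string
    // without naming every field by hand: tag only the chosen rowtime field.
    static String build(String[] fieldNames, String rowtimeField) {
        List<String> parts = new ArrayList<>();
        for (String name : fieldNames) {
            parts.add(name.equals(rowtimeField) ? name + ".rowtime" : name);
        }
        return String.join(", ", parts);
    }

    public static void main(String[] args) {
        // In real code the names would come from the stream's type information.
        String[] names = {"EventTime", "letter", "counter"};
        System.out.println(build(names, "EventTime"));
        // EventTime.rowtime, letter, counter
    }
}
```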
On 14.08.19 at 17:00, Niels Basjes wrote:
Hi,
It has taken me quite a bit of time to figure this out. This is the solution I have now (works on my machine).
Please tell me where I can improve this.
Turns out that the schema you provide for registerDataStream only needs the 'top level' fields of the Avro data structure. With only the top fields there, you can still access nested fields with something like "topfield.x.y.z" in the SQL statement.
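For example, with a hypothetical nested record where a registered top-level field topfield contains a nested path x.y.z, the query could address it directly (a sketch; these names are placeholders, not taken from the real schema):

```sql
-- Only "topfield" needs to appear in the registered schema;
-- the nested path is resolved inside the query itself.
SELECT topfield.x.y.z, COUNT(*)
FROM MeasurementStream
GROUP BY topfield.x.y.z
```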
What I found is that the easiest way to make this all work is to ensure the rowtime field in the structure is at the top level (which makes sense in general) and generate the fields string where I only need to know the name of the "rowtime" field.
So I have
DataStream<Measurement> inputStream = ...
then I register the stream with
TypeInformation<Measurement> typeInformation = TypeInformation.of(Measurement.class);
String[] fieldNames = TableEnvironment.getFieldNames(typeInformation);

List<String> rootSchema = new ArrayList<>();
for (String fieldName : fieldNames) {
    if (rowtimeFieldName.equals(fieldName)) {
        rootSchema.add(fieldName + ".rowtime");
    } else {
        rootSchema.add(fieldName);
    }
}

tableEnv.registerDataStream("MeasurementStream", inputStream, String.join(",", rootSchema));
Now, after the actual SQL has been executed, simply feeding the result into a DataStream with something like this fails badly:
TypeInformation<Row> tupleType = new RowTypeInfo(resultTable.getSchema().getFieldTypes());
DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);
Turns out that the schema of the output contains a field that was created by TUMBLE_START, which is of type TimeIndicatorTypeInfo.
So I have to do it this way (NASTY!):
final TypeInformation<?>[] fieldTypes = resultTable.getSchema().getFieldTypes();
int index;
for (index = 0; index < fieldTypes.length; index++) {
    if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
        fieldTypes[index] = SQL_TIMESTAMP;
    }
}
TypeInformation<Row> tupleType = new RowTypeInfo(fieldTypes);
DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);
Which gives me the desired DataStream.
-- Best regards / Met vriendelijke groeten,
Niels Basjes
Hi Niels,
I think (not 100% sure) you could also cast the event time attribute to TIMESTAMP before you emit the table.
This should remove the event time property (and thereby the TimeIndicatorTypeInfo) and you wouldn't need to fiddle with the output types.
Best, Fabian
On Wed, Aug 21, 2019 at 10:51, Niels Basjes <[hidden email]> wrote:
Hi.
Can you give me an example of the actual syntax of such a cast?
Hi,
that would be regular SQL cast syntax:
SELECT a, b, c, CAST(eventTime AS TIMESTAMP) FROM ...
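Applied to the earlier tumbling-window example, the cast could be folded into the query like this (a sketch following the names used earlier in the thread; whether this also cleanly strips the time indicator from the TUMBLE_START attribute is untested, echoing Fabian's "not 100% sure"):

```sql
-- Casting the window-start attribute to TIMESTAMP would drop the time
-- indicator property, so toAppendStream should not need type fixups.
SELECT
  letter,
  CAST(TUMBLE_START(eventTime, INTERVAL '1' MINUTE) AS TIMESTAMP) AS windowStart
FROM TestStream
GROUP BY letter, TUMBLE(eventTime, INTERVAL '1' MINUTE)
```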
On Tue, Sep 10, 2019 at 18:07, Niels Basjes <[hidden email]> wrote: