How to split tuple2 returned by UDAF into two columns in a result table

eastcirclek
Hi,

I want to split the Tuple2 returned by AggregateFunction.getValue into two separate columns of the result table.

Let's consider the following example where myudaf returns Tuple2<Boolean, Boolean>:

  Table table2 = table1
      .window(Slide.over("3.rows").every("1.rows").on("time").as("w"))
      .groupBy("w, name")
      .select("name, myudaf(col1, col2, col3) as (col4, col5)");

Then table2.printSchema() prints the following (with Flink 1.7.2):

  root
    |-- name: String
    |-- col4: Java Tuple2<Boolean, Boolean>

whereas I expected:

  root
    |-- name: String
    |-- col4: Boolean
    |-- col5: Boolean

When I define a scalar function that returns Tuple2 and use it like "myscalar(col1, col2, col3) as (col4, col5)", it works as expected.

Is there a way to split the tuple into two different columns in Flink 1.7.2?
If not, do I have to define an additional UDF to flatten the tuple, or is there an existing one I can use?

- Dongwon
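
To make the setup above concrete, here is a dependency-free Java sketch of an aggregate that returns a pair. It is not Flink code: it only borrows the method names createAccumulator, accumulate and getValue, and Pair stands in for Tuple2<Boolean, Boolean>. The aggregation logic (f0 = some input was true, f1 = every input was true) and the boolean input columns are assumptions, since the thread does not show what myudaf computes.

```java
// Dependency-free sketch, NOT Flink code: it mirrors only the
// createAccumulator / accumulate / getValue shape of an aggregate function.
final class PairAggregateSketch {

    /** Stand-in for Tuple2<Boolean, Boolean>. */
    static final class Pair {
        final boolean f0;
        final boolean f1;

        Pair(boolean f0, boolean f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** Hypothetical accumulator; the real myudaf logic is not given in the thread. */
    static final class Acc {
        boolean anyTrue = false;
        boolean allTrue = true;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    /** Folds one input row (three boolean columns, an assumption) into the accumulator. */
    static void accumulate(Acc acc, boolean col1, boolean col2, boolean col3) {
        acc.anyTrue = acc.anyTrue || col1 || col2 || col3;
        acc.allTrue = acc.allTrue && col1 && col2 && col3;
    }

    /** Returns a single composite value holding both results. */
    static Pair getValue(Acc acc) {
        return new Pair(acc.anyTrue, acc.allTrue);
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        accumulate(acc, true, false, false);
        accumulate(acc, true, true, true);
        Pair result = getValue(acc);
        System.out.println(result.f0 + ", " + result.f1); // prints "true, false"
    }
}
```

The point of the sketch is that getValue hands back one object, so both booleans travel together until something downstream takes the pair apart.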

Re: How to split tuple2 returned by UDAF into two columns in a result table

Kurt Young
Hi Dongwon, 

AFAIK, Flink doesn't support usage like "myscalar(col1, col2, col3) as (col4, col5)". Am I missing something?

If you want to split a Tuple2 into two different columns, you can use a UDTF (user-defined table function).

Best,
Kurt



Re: How to split tuple2 returned by UDAF into two columns in a result table

eastcirclek
Hi Kurt,
You're right; what I had used was a table function, like "mytablefunc(col1, col2, col3) as (col4, col5)", not a scalar function.
So I'll have to define a custom UDTF to flatten the tuple.
Thanks,

- Dongwon
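
As a rough illustration of why a table function fits here, the following dependency-free Java sketch shows the shape of such a flattening function. It is not Flink code: in Flink a table function's eval method passes rows to collect(...), and here a java.util.function.Consumer plays that role. Pair, eval and the Boolean[] row representation are hypothetical stand-ins.

```java
// Dependency-free sketch, NOT Flink code: a table function emits rows
// through a collector instead of returning a single value.
final class FlattenTableFunctionSketch {

    /** Stand-in for Tuple2<Boolean, Boolean>. */
    static final class Pair {
        final boolean f0;
        final boolean f1;

        Pair(boolean f0, boolean f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** Hypothetical eval: emits one two-field row for each input pair. */
    static void eval(Pair value, java.util.function.Consumer<Boolean[]> collector) {
        collector.accept(new Boolean[] {value.f0, value.f1});
    }

    public static void main(String[] args) {
        java.util.List<Boolean[]> rows = new java.util.ArrayList<>();
        eval(new Pair(true, false), rows::add);
        for (Boolean[] row : rows) {
            System.out.println("col4=" + row[0] + ", col5=" + row[1]); // prints "col4=true, col5=false"
        }
    }
}
```

Each emitted row has two fields, which is what allows the output to be named as two columns.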


Re: How to split tuple2 returned by UDAF into two columns in a result table

eastcirclek
Another, related question:

Is there something like an aggregate table function?
In the above scenario, I have to apply an aggregate function and then a table function solely to flatten the tuples, which seems quite inefficient.




Re: How to split tuple2 returned by UDAF into two columns in a result table

Fabian Hueske-2
Hi Dongwon,

Couldn't you just return a tuple from the aggregation function and extract the fields from the nested tuple using a value access function [1]?

  Table table2 = table1
      .window(Slide.over("3.rows").every("1.rows").on("time").as("w"))
      .groupBy("w, name")
      .select("name, myudaf(col1, col2, col3) as x")
      .select("name, x.get(0) as col4, x.get(1) as col5");

Best, Fabian
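
For readers who want to picture what the two select calls above do, here is a dependency-free Java sketch. It is not Flink code: Pair, NestedRow, FlatRow and project are hypothetical stand-ins, with Pair playing the role of Tuple2<Boolean, Boolean> and get(int) mimicking the 0-based x.get(0) and x.get(1) calls.

```java
// Dependency-free sketch, NOT Flink code: the first select keeps the pair as
// one nested column x, the second select pulls its fields out by index.
final class NestedTupleProjection {

    /** Stand-in for Tuple2<Boolean, Boolean>; fields are addressed by 0-based index. */
    static final class Pair {
        final boolean f0;
        final boolean f1;

        Pair(boolean f0, boolean f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        boolean get(int index) {
            if (index == 0) return f0;
            if (index == 1) return f1;
            throw new IndexOutOfBoundsException("Pair has no field " + index);
        }
    }

    /** Row after the first select: name plus one nested column x. */
    static final class NestedRow {
        final String name;
        final Pair x;

        NestedRow(String name, Pair x) {
            this.name = name;
            this.x = x;
        }
    }

    /** Row after the second select: name plus two top-level boolean columns. */
    static final class FlatRow {
        final String name;
        final boolean col4;
        final boolean col5;

        FlatRow(String name, boolean col4, boolean col5) {
            this.name = name;
            this.col4 = col4;
            this.col5 = col5;
        }
    }

    /** Corresponds to "name, x.get(0) as col4, x.get(1) as col5". */
    static FlatRow project(NestedRow row) {
        return new FlatRow(row.name, row.x.get(0), row.x.get(1));
    }

    public static void main(String[] args) {
        FlatRow flat = project(new NestedRow("a", new Pair(true, false)));
        System.out.println(flat.name + ", " + flat.col4 + ", " + flat.col5); // prints "a, true, false"
    }
}
```

Unlike the table-function route, this is a plain per-row projection, so no extra user-defined function is needed just to take the pair apart.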

