Counter Implementation in Flink

Anil
I'm using Flink 1.4.2 and deploying the job on a YARN cluster.

I have a streaming job that flattens the data and outputs it: it takes one
input record and produces n output records. I'm using a Table Function for
this, and the flattening logic is implemented in a UDF. The UDF has a
counter that counts the number of records produced:
           this.context.getMetricGroup().counter("output_records_counter")
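
The pattern described here can be sketched roughly as follows. This is plain
Java so that it runs without Flink on the classpath; `FlattenSketch`,
`LocalCounter` and `flatten` are hypothetical stand-ins, not Flink classes and
not the actual UDF. In the real job the counter would be the one returned by
the getMetricGroup().counter(...) call above, and records would go to the
function's collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class FlattenSketch {
    // Hypothetical stand-in for the metric counter; not a Flink class.
    static final class LocalCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    // Emits one output record per comma-separated field and counts each emit.
    static void flatten(String input, Consumer<String> out, LocalCounter counter) {
        for (String field : input.split(",")) {
            out.accept(field.trim());
            counter.inc(); // incremented exactly once per emitted record
        }
    }

    public static void main(String[] args) {
        LocalCounter counter = new LocalCounter();
        List<String> emitted = new ArrayList<>();
        flatten("a, b, c", emitted::add, counter);
        flatten("d", emitted::add, counter);
        System.out.println(emitted + " count=" + counter.getCount());
    }
}
```

Incrementing right next to each emit is what should keep such a counter in
step with the number of records the function actually produces.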

I know Flink provides the numRecordsOut metric, which should give me
essentially the same number. When the job is first started, the output
record counts reported by `output_records_counter` and `numRecordsOut` are
exactly the same.

When a task manager is lost and the job is restarted, there is a huge
difference between the two counts. As seen in the graph, the two counts
overlap when the job is first started, but after a task manager is lost and
re-deployed they diverge. I'm not sure why the numbers differ so much.

Can someone please shed some light on how this counter is implemented, or
direct me to the source code or any reference material?

For numRecordsOut, each task manager emits its own count. Is the same not
true for output_records_counter?
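
To make the question concrete: if each parallel instance keeps its own value
for a metric, the number shown on a graph is typically a sum over instances.
The sketch below is plain Java with made-up numbers and hypothetical names
(`PerSubtaskSum`, `total`). It only shows that the two totals agree when both
metrics are summed over the same set of subtasks, and says nothing about what
the real job reports.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PerSubtaskSum {
    // Adds up the per-subtask values of one metric.
    static long total(Map<Integer, Long> perSubtask) {
        long sum = 0;
        for (long value : perSubtask.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Made-up values keyed by subtask index, for illustration only.
        Map<Integer, Long> numRecordsOut = new LinkedHashMap<>();
        Map<Integer, Long> outputRecordsCounter = new LinkedHashMap<>();
        for (int subtask = 0; subtask < 3; subtask++) {
            long emitted = 100L * (subtask + 1);
            numRecordsOut.put(subtask, emitted);
            outputRecordsCounter.put(subtask, emitted);
        }
        System.out.println(total(numRecordsOut) + " vs " + total(outputRecordsCounter));
    }
}
```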

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Counter Implementation in Flink

Chesnay Schepler
The default implementation is SimpleCounter; however, I believe the
implementation is less relevant here than how the counter is used.

Please explain the graph (which metric is represented by which color)
and include all of your code that interacts with your counter.
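
For reference, a rough stand-in for what a simple counter does, written in
plain Java so that it runs without Flink. `SketchCounter` is a hypothetical
class, not Flink's source; only the method names mirror Flink's Counter
interface (inc, dec, getCount). The sketch assumes that a redeployed task
registers a fresh counter object, in which case the value is plain in-memory
state and starts again from zero.

```java
class SketchCounter {
    private long count; // plain field; lives only as long as this object

    void inc() { count++; }
    void inc(long n) { count += n; }
    void dec() { count--; }
    void dec(long n) { count -= n; }
    long getCount() { return count; }

    public static void main(String[] args) {
        SketchCounter beforeRestart = new SketchCounter();
        beforeRestart.inc(5);
        beforeRestart.inc();

        // Assumption: after a redeploy the task creates a new counter object.
        SketchCounter afterRestart = new SketchCounter();
        afterRestart.inc(2);

        System.out.println(beforeRestart.getCount() + " " + afterRestart.getCount());
    }
}
```

This is why the usage matters more than the class itself: where the counter
is registered and where inc() is called decide what the reported number means.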

On 24.11.2018 16:36, Anil wrote:

> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1468/Screen_Shot_2018-11-24_at_9.png>