Getting JobManager address and port within a running job

Biplob Biswas
Hi,

Is there a way to fetch the JobManager address and port from within a running Flink job? I was expecting the address and port to be constant, but they change every time I run a job. Somehow it is not honoring the jobmanager.rpc.address and jobmanager.rpc.port set in the flink-conf.yaml file.

If I can get the address within the job itself, I can set it up based on that value.

Thanks & regards
Biplob

Re: Getting JobManager address and port within a running job

Nico Kruber
Assuming, from your previous email, that you fire up a LocalFlinkMiniCluster:
this, afaik, does not process flink-conf.yaml but only the configuration given
to it.

If you start a "real" flink cluster, e.g. by bin/start-cluster.sh, it will show
the behaviour you desired.


Nico

On Thursday, 3 August 2017 13:20:37 CEST Biplob Biswas wrote:

> Hi,
>
> Is there a way to fetch the JobManager address and port from within a running
> Flink job? I was expecting the address and port to be constant, but they
> change every time I run a job. Somehow it is not honoring the
> jobmanager.rpc.address and jobmanager.rpc.port set in the flink-conf.yaml
> file.
>
> If I can get the address within the job itself, I can set it up based on
> that value.
>
> Thanks & regards
> Biplob



Re: Getting JobManager address and port within a running job

Biplob Biswas
In reply to this post by Biplob Biswas
Also, is it possible to get the JobID from a running Flink instance for a streaming job?

I know I can get it for a batch job with ExecutionEnvironment.getExecutionEnvironment().getId(), but apparently that creates a new execution environment and returns the job ID of that batch environment, not of the streaming environment.

Re: Getting JobManager address and port within a running job

Biplob Biswas
In reply to this post by Nico Kruber
Hi Nico,

This behaviour was on my cluster, not in local mode; I wanted to check whether it's an issue with my job or whether the JobManager behaves the same way everywhere.

When I run my job in yarn-cluster mode, it doesn't honour the IP and port I specified and instead assigns a node and port at random. For this reason, I created a YARN session and ran my Flink job within that session.

But I am still having issues with fetching the job ID within a running Flink instance.

Thanks
Biplob

Re: Getting JobManager address and port within a running job

Aljoscha Krettek
Hi,

How are you specifying the port? In YARN mode the config setting is different: yarn.application-master.port. Also, you cannot specify the IP of the JobManager in YARN mode, because Flink has no influence over how YARN schedules the JobManager, so it could run on any machine in the YARN cluster.

By the way, what's the motivation for knowing the JobManager coordinates within a Job?

Best,
Aljoscha

> On 3. Aug 2017, at 15:43, Biplob Biswas <[hidden email]> wrote:
>
> Hi Nico,
>
> This behaviour was on my cluster, not in local mode; I wanted to check
> whether it's an issue with my job or whether the JobManager behaves the
> same way everywhere.
>
> When I run my job in yarn-cluster mode, it doesn't honour the IP and port I
> specified and instead assigns a node and port at random. For this reason, I
> created a YARN session and ran my Flink job within that session.
>
> But I am still having issues with fetching the job ID within a running Flink
> instance.
>
> Thanks
> Biplob


Re: Getting JobManager address and port within a running job

Biplob Biswas
Hi Aljoscha,

I was expecting that I could set the JobManager address and port by putting them in the configuration and passing it to the execution environment, but learnt later that this was the wrong approach.

My motivation for accessing the JobManager coordinates was to set up a QueryableStateClient so that I could query the states I created in the same job.
My job needs 2 states and has to query both of them by specific keys at the same time, so I couldn't imagine achieving this within the same operator, as states are created and updated per key.

Is there any abstraction which exposes these values for the JobManager?

For now, to achieve what I want, I create the 2 states in one Flink job and pass the job ID and JobManager information as parameters to a second job, which queries the states.

Is there a better way to do this?

Thanks & regards,
Biplob

Re: Getting JobManager address and port within a running job

Aljoscha Krettek
Hi,

I don't think Queryable State is good for that use case because it can lead to inconsistencies. However, I think my answer here and the linked presentation might be helpful for your use case: https://lists.apache.org/thread.html/3907cd6433c9c066126ce150cb2fdcc298d366eb55bca890be716020@%3Cuser.flink.apache.org%3E

Best,
Aljoscha

> On 9. Aug 2017, at 14:33, Biplob Biswas <[hidden email]> wrote:
>
> Hi Aljoscha,
>
> I was expecting that I could set the JobManager address and port by putting
> them in the configuration and passing it to the execution environment, but
> learnt later that this was the wrong approach.
>
> My motivation for accessing the JobManager coordinates was to set up a
> QueryableStateClient so that I could query the states I created in the
> same job.
> My job needs 2 states and has to query both of them by specific keys at the
> same time, so I couldn't imagine achieving this within the same operator,
> as states are created and updated per key.
>
> Is there any abstraction which exposes these values for the JobManager?
>
> For now, to achieve what I want, I create the 2 states in one Flink job and
> pass the job ID and JobManager information as parameters to a second job,
> which queries the states.
>
> Is there a better way to do this?
>
> Thanks & regards,
> Biplob





Re: Getting JobManager address and port within a running job

Biplob Biswas
Hi Aljoscha,

Thanks for the link. I read through it, but I still can't imagine implementing something similar for my use case.

I explained my use case to Fabian in a previous post, but I will try again to be as clear as possible.
So, my use case, in short, is something like this:

1) From the input stream of events, create 2 different local state stores (one keyed by event_id and the other keyed by derived_key), where both have a 1-N relationship, e.g. 1 -> [A,B,C] and A -> [1,2,3].
2) Once both states exist, traverse the events a second time, generate the derived_keys, and use these keys to query state B for a list of event_ids, which in turn is used to query state A to fetch the events for these ids.
3) Do this only to a depth of 1, i.e. stop after doing the above process just once and don't traverse back.
4) Combine and merge all the events that were fetched, because they are related.
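
As a rough illustration of steps 1) to 4), here is a minimal plain-Java sketch in which two in-memory maps stand in for state A (keyed by event_id) and state B (keyed by derived_key). It does not use any Flink API, and the class and method names (RelatedEventLookup, add, related) are hypothetical, chosen only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch: two maps stand in for the two keyed states.
// This is not Flink code; all names are hypothetical.
class RelatedEventLookup {
    // "State A": event_id -> derived keys of that event
    private final Map<String, List<String>> keysByEvent = new LinkedHashMap<>();
    // "State B": derived_key -> event_ids that produced this key
    private final Map<String, List<String>> eventsByKey = new LinkedHashMap<>();

    // Step 1: index an event under its id and under each of its derived keys.
    void add(String eventId, List<String> derivedKeys) {
        keysByEvent.put(eventId, new ArrayList<>(derivedKeys));
        for (String key : derivedKeys) {
            eventsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(eventId);
        }
    }

    // Steps 2-4: derived keys -> event ids (state B) -> events (state A).
    // The lookup stops after one hop (depth 1) and merges the hits into one set.
    Set<String> related(List<String> derivedKeys) {
        Set<String> result = new LinkedHashSet<>();
        for (String key : derivedKeys) {
            List<String> ids = eventsByKey.getOrDefault(key, Collections.<String>emptyList());
            for (String eventId : ids) {
                if (keysByEvent.containsKey(eventId)) {
                    result.add(eventId);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        RelatedEventLookup lookup = new RelatedEventLookup();
        lookup.add("1", Arrays.asList("A", "B", "C"));
        lookup.add("2", Arrays.asList("A"));
        lookup.add("3", Arrays.asList("D"));
        System.out.println(lookup.related(Arrays.asList("A", "B"))); // prints [1, 2]
    }
}
```

In a real job the two maps would be keyed state in two different operators, which is exactly why the combined lookup is hard to express inside a single keyed operator.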

So, I can't imagine how I can use the scatter->state->gather pattern to achieve the desired result. Maybe I am wrong and have missed something, so any insights would be useful!

Re: Getting JobManager address and port within a running job

Aljoscha Krettek
Ok, I saw that one. Unfortunately it's still not clear to me how that would work. Could you maybe highlight an actual flow of events and describe what will (or should) happen? Describe the current state of the system, i.e. what state is there, then describe what happens when events with given keys come in, how they affect the state and what output should be produced.

Best,
Aljoscha

> On 9. Aug 2017, at 15:38, Biplob Biswas <[hidden email]> wrote:
>
> Hi Aljoscha,
>
> Thanks for the link. I read through it, but I still can't imagine
> implementing something similar for my use case.
>
> I explained my use case to Fabian in a previous post, but I will try again
> to be as clear as possible.
> So, my use case, in short, is something like this:
>
> 1) From the input stream of events, create 2 different local state stores
> (one keyed by event_id and the other keyed by derived_key), where both have
> a 1-N relationship, e.g. 1 -> [A,B,C] and A -> [1,2,3].
> 2) Once both states exist, traverse the events a second time, generate the
> derived_keys, and use these keys to query state B for a list of event_ids,
> which in turn is used to query state A to fetch the events for these ids.
> 3) Do this only to a depth of 1, i.e. stop after doing the above process
> just once and don't traverse back.
> 4) Combine and merge all the events that were fetched, because they are
> related.
>
> So, I can't imagine how I can use the scatter->state->gather pattern to
> achieve the desired result. Maybe I am wrong and have missed something, so
> any insights would be useful!


Re: Getting JobManager address and port within a running job

Biplob Biswas
Hi Aljoscha,


So basically, I am reading events from a Kafka topic. These events have corresponding eventIds and a list of modes.

Job1
1. When I first read from the Kafka topic, I key by the eventIds and use a ProcessFunction to create a state named "events". The list of modes is used, along with some other values from the original event, to generate keys. So in the end we store (list of modekeys, event, timestamp) inside our state.

For ex: For eventId 1 -> ([key1,key2,key3], event1, timestamp)

This function also traverses the list of generated keys and collects a series of values, which are then sent to the downstream operator.
For ex:
(key1, event1)
(key2, event2)
(key3, event3)

2. The values collected above are then used in this second step, where I key by the keys sent by the upstream operator. Then, in another ProcessFunction, I create a second state called "matchkeys". Here, we access the old value for the key, add the new value to the list (or create a new list if none exists), and update the state.

For ex.
Fetch key1 -> ([eventid1, eventid5], timestamp)  
Add value of current eventId21 for the key key1
Update the state for key1 -> ([eventid1, eventid5, eventid21], newTimestamp)
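
The read-append-write update described above could look roughly like the following plain-Java sketch. A HashMap stands in for the "matchkeys" keyed state, so this is not Flink API, and the names MatchKeysSketch, Entry and update are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the "matchkeys" keyed state; not Flink API.
class MatchKeysSketch {
    // Value stored per key: the event ids seen so far plus the last update time.
    static final class Entry {
        final List<String> eventIds;
        final long timestamp;

        Entry(List<String> eventIds, long timestamp) {
            this.eventIds = Collections.unmodifiableList(eventIds);
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Entry> state = new HashMap<>();

    // Fetch the old list for the key (or start a new one), append the event id,
    // and write the entry back with the new timestamp.
    Entry update(String key, String eventId, long newTimestamp) {
        Entry old = state.get(key);
        List<String> ids = new ArrayList<>();
        if (old != null) {
            ids.addAll(old.eventIds);
        }
        ids.add(eventId);
        Entry updated = new Entry(ids, newTimestamp);
        state.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        MatchKeysSketch matchKeys = new MatchKeysSketch();
        matchKeys.update("key1", "eventid1", 100L);
        matchKeys.update("key1", "eventid5", 200L);
        Entry e = matchKeys.update("key1", "eventid21", 300L);
        System.out.println(e.eventIds + " @ " + e.timestamp); // prints [eventid1, eventid5, eventid21] @ 300
    }
}
```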

From here we collect (event) and send to downstream operator

This is how the 2 states are created and managed.

Now, the event emitted here is written to a Kafka topic, from which our Job 2 starts reading.

Job2
It reads the events from the Kafka topic that Job 1 writes to. (We could have read directly from the original topic, but we need synchronization, because otherwise we wouldn't find the events in the states.)

1. After reading from the Kafka topic, the list of keys is generated from the modes again. But this time, while traversing the keys, the state "matchkeys" is accessed to get a list of eventIds, which in turn are used to access the "events" state for a list of keys. This recursion happens only to a depth of 1, after which all the fetched events are consolidated and merged (as they refer to the same txn), and this "super" event with values from all the related events is sent over a different topic.
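
The final consolidation into a "super" event might be sketched as below. The post does not say how events are represented or how conflicting values are resolved, so this example assumes each event is a map of field name to value and that the first value seen for a field wins; both are assumptions, and SuperEventMerge and merge are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical merge step: each event is modelled as a map of field -> value.
class SuperEventMerge {
    // Union of all fields; on a conflict the first event's value is kept (assumed policy).
    static Map<String, String> merge(List<Map<String, String>> relatedEvents) {
        Map<String, String> superEvent = new LinkedHashMap<>();
        for (Map<String, String> event : relatedEvents) {
            for (Map.Entry<String, String> field : event.entrySet()) {
                superEvent.putIfAbsent(field.getKey(), field.getValue());
            }
        }
        return superEvent;
    }

    public static void main(String[] args) {
        Map<String, String> e1 = new LinkedHashMap<>();
        e1.put("txn", "t-1");
        e1.put("amount", "10");
        Map<String, String> e2 = new LinkedHashMap<>();
        e2.put("txn", "t-1");
        e2.put("status", "settled");
        System.out.println(merge(Arrays.asList(e1, e2))); // prints {txn=t-1, amount=10, status=settled}
    }
}
```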


I am not really sure how clear I was in explaining the situation, but let me know if you need any further information.

Also, I am curious to know why using QueryableStateClient within the same job that creates the state leads to inconsistencies. Isn't the state client only capable of reading data without modifying it?

Thanks for all help, btw :)

Best Regards,
Biplob