Queryable State


Queryable State

Chet Masterson
I moved up from running queryable state on a standalone Flink instance to a several-node cluster. My queries aren't getting responses when I execute them on the cluster. A few questions:
 
1. The error I am getting:
WARN [ReliableDeliverySupervisor] Association with remote system [akka.tcp://[hidden email]:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[hidden email]:6123]] Caused by: [Connection refused: /x.x.x.x:6123]
2017/04/23 20:19:01.016 ERROR Actor not found for: ActorSelection[Anchor(akka.tcp://[hidden email]:6123/), Path(/user/jobmanager)]
 
I assume this is because Flink is not servicing requests on port 6123. I am using the default RPC ports defined in flink-conf.yaml. I confirmed nothing is listening on port 6123 by using netstat on the Flink nodes.
 
2. I made the following changes to flink-conf.yaml on all nodes, then restarted the cluster:
 
#jobmanager.rpc.port: 6123
query.server.port: 6123
query.server.enable: true
 
3. Now port 6123 is open, as viewed from netstat.
 
My question: what is the proper configuration for servicing external queries when running in a cluster? Can I use jobmanager.rpc.port: 6123, which works standalone, or do I have to add query.server.port and query.server.enable? Which port should I be using?
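As a cross-check on the netstat observations above, a small self-contained probe can confirm whether anything accepts TCP connections on a given host and port. This is a generic sketch, not Flink API; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if something accepts a TCP connection at host:port
    // within timeoutMillis; false on connection refused or timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Calling `isListening("flink01", 6123, 2000)` from the client machine distinguishes "nothing is bound to the port" from failures that happen after the connection is established.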
 
 

Re: Queryable State

Ufuk Celebi
You should be able to use queryable state without any changes to the default config. The `query.server.port` option defines the port of the queryable state server that serves the state on the task managers, and it is enabled by default.

The important thing is to configure the client to discover the JobManager; everything else should work out of the box. Can you please:

1) Use the default config and verify in the JobManager logs that the JobManager listens on port 6123 (the default JM port) and that all expected TaskManagers connect to it?

2) Share the code for how you configure the QueryableStateClient?

– Ufuk
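One way to write that separation down: a flink-conf.yaml sketch that keeps the JobManager RPC port and the queryable state server port distinct. The hostname and the commented 9069 value are illustrative assumptions; check the defaults for your Flink version.

```yaml
jobmanager.rpc.address: flink01   # hypothetical JobManager host
jobmanager.rpc.port: 6123         # default JM RPC port; the query client connects here

# Queryable state is enabled by default. If you set an explicit server
# port, do NOT reuse 6123; leaving it unset lets Flink pick one.
# query.server.port: 9069
```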



Re: Queryable State

Chet Masterson
Ufuk - thank you for your help. My flink-conf.yaml is now configured cluster-wide (with restart) as:
 
# my flink-conf.yaml, on all flink nodes:
jobmanager.rpc.address: x.x.x.x
jobmanager.rpc.port: 6123
query.server.port: 6123
query.server.enable: true
 
When I try to issue my query with the above settings, I get the error below (this query worked on a single standalone Flink node):
 
 
2017/04/24 07:08:26.940 ERROR [OneForOneStrategy] Error while decoding incoming Akka PDU of length: 3623
akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 3623
Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
        at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167)
        at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:621)
        at akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:372)
        at akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:367)
 
 
My client code is below. I verified the job id exists and is running, and that the stateName exists and is populated. Again, this code works against a single standalone Flink node:
 
// server = job manager ip, which I can route to, and responds on port 6123
// port = 6123
    private static QueryableStateClient newQueryableStateClient(String server, int port) {
        Configuration configFlink = new Configuration();
        configFlink.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, server);
        configFlink.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
 
        try {
            client = new QueryableStateClient(configFlink);
            return client;
        }
        catch (Exception e) {
            logger.error("Error configuring QueryableStateGateway: "+e);
            return null;
        }
    }
 
public HashMap<Long, java.util.ArrayList<String>> executeQuery(Tuple2<String, String> key, String flinkJobID, String stateName) {
        JobID jobId = JobID.fromHexString(flinkJobID);
        byte[] serializedKey = getSeralizedKey(key);
        Future<byte[]> future = client.getKvState(jobId, stateName, key.hashCode(), serializedKey);
        try {
            byte[] serializedResult = Await.result(future, new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
            HashMap<Long, java.util.ArrayList<String>> results = deserializeResponseGlobalCoverage(serializedResult);
            return results;
        }
        catch (Exception e) {
            logger.error("Queryable State Error: "+key+"-"+flinkJobID+"-"+stateName+" Error: "+e.getMessage());
            return null;
        }
    }
 
 
Thank you for the help!
 
 
 


Re: Queryable State

Chet Masterson
Ok...more information.
 
1. Built a fresh cluster from the ground up. Started testing queryable state at each step.
2. With any configuration of task managers and job managers where parallelism = 1, the queries execute as expected.
3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job manager) feeding off a Kafka topic partitioned three ways, queries always fail, returning org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with an error message of null.
4. I do know my state is as expected on the cluster. Liberal use of trace prints shows the state managed by the jobs is as I expect. However, I cannot query it externally.
5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed is configured by using the job manager UI.
6. My flink-conf.yaml:
 
jobmanager.rpc.address: flink01
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
 
taskmanager.heap.mb: 512
taskmanager.data.port: 6121
taskmanager.numberOfTaskSlots: 1
taskmanager.memory.preallocate: false
 
parallelism.default: 1
blob.server.port: 6130
 
jobmanager.web.port: 8081
query.server.enable: true
 
7. I do know my job is indeed running in parallel, from trace prints going to the task manager logs.
 
Do I need a backend configured when running in parallel for the queryable state? Do I need a shared temp directory on the task managers?
 
THANKS!
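One way to picture why parallelism > 1 changes things: each key hashes to a key group, each parallel operator instance owns a contiguous range of key groups, and the JobManager can only answer a query once it has learned which TaskManager holds the key group for the queried key. If one instance's notification never arrives, queries for its key groups come back as UnknownKvStateKeyGroupLocation. A rough sketch of the assignment; the floorMod hash is a simplified stand-in (Flink first spreads key.hashCode() with a murmur hash), while the operator-index formula mirrors Flink's contiguous range assignment:

```java
public class KeyGroupSketch {
    // Simplified stand-in for Flink's key-group hash: real Flink first
    // spreads key.hashCode() with a murmur hash before taking the modulo.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each parallel instance owns a contiguous range of key groups;
    // this mirrors the range assignment Flink uses.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With maxParallelism 128 and parallelism 3, key groups 0..42 land on instance 0, 43..85 on instance 1, and 86..127 on instance 2; a query for any key in a range whose owner never registered with the JobManager cannot be located.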
 
 
25.04.2017, 04:24, "Ufuk Celebi" <[hidden email]>:

It's strange that the rpc port is set to 30000 when you use a standalone cluster and configure 6123 as the port. I'm pretty sure that the config has not been updated.

But everything should work as you say when you point it to the correct jobmanager address and port. Could you please post the complete stacktrace you get instead of the message you log?


On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson <[hidden email]> wrote:

More information:

0. I did remove query.server.port and query.server.enable from all flink-conf.yaml files, and restarted the cluster.

1. The Akka error doesn't seem to have anything to do with the problem. If I point my query client at an IP address with no Flink server running at all, I get the same error. It seems to be a timeout (side effect?) for "no Flink service is listening on the port you told me to check".

2. I did notice (using the Flink Web UI) that even with the config file changes in 0, and no changes to the default jobmanager.rpc.port (6123) in flink-conf.yaml, on my cluster jobmanager.rpc.port is set to 30000.

3. If I send a query using the jobmanager.rpc.address and jobmanager.rpc.port as displayed in the Flink Web UI, the connection from the client to Flink is initiated and completed. When I try to execute the query (code below), it fails and gets trapped. The error message returned (e.getMessage() below) is simply 'null':

try {
    byte[] serializedResult = Await.result(future, new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
    // de-serialize, commented out for testing
    return null;
}
catch (Exception e) {
    logger.error("Queryable State Error: "+key+"-"+flinkJobID+"-"+stateName+" Error: "+e.getMessage());
    return null;
}

Should I be sending the query to the job manager on the job manager's rpc port when Flink is clustered?

ALSO - I do know the state name I am trying to query exists and is populated, and that the job id exists. I also know the task managers are communicating with the job managers (task managers data port: 6121) and processed the data that resulted in the state variable I am trying to query being populated. All this was logged.


24.04.2017, 10:34, "Ufuk Celebi" <[hidden email]>:

Hey Chet! You can remove

query.server.port: 6123
query.server.enable: true

That shouldn't cause the Exception we see here though. I'm actually not sure what is causing the PduCodecException. Could this be related to different Akka versions being used in Flink and your client code? [1] Is it possible for you to check this?

– Ufuk

[1] https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0
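Since an Akka version mismatch between client and cluster is suspected, one low-tech check is to read the Implementation-Version from the jar manifest of whatever artifact provides a given class on each side. This is a generic sketch; you would call it with e.g. akka.actor.ActorSystem.class (assuming that class is on both classpaths), and note that not every jar sets the manifest field.

```java
public class VersionProbe {
    // Best-effort: the Implementation-Version recorded in the manifest of
    // the jar that provides cls, or "unknown" if nothing is recorded.
    static String implementationVersion(Class<?> cls) {
        Package p = cls.getPackage();
        String v = (p == null) ? null : p.getImplementationVersion();
        return (v == null) ? "unknown" : v;
    }
}
```

Printing this for the same class in the client JVM and in a TaskManager JVM shows at a glance whether the two sides bundle different versions.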

Re: Queryable State

Chet Masterson
After setting the logging to DEBUG on the job manager, I learned four things:
 
(On the message formatting below, I have the Flink logs formatted into JSON so I can import them into Kibana)
 
1. The appropriate key value state is registered in both the parallelism = 1 and parallelism = 3 environments. In parallelism = 1, I saw one registration message in the log; in parallelism = 3, I saw two registration messages: {"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"", "msg":"Key value state registered for job <job id> under name <statename>"}
 
2. When I issued the query in both the parallelism = 1 and parallelism = 3 environments, I saw "Lookup key-value state for job <job id> with registration name <statename>". In parallelism = 1, I saw one log message; in parallelism = 3, I saw two identical messages.
 
3. I saw no other messages in the job manager log that seemed relevant.
 
4. When issuing the query in parallelism = 3, I continued to get the error: org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a message of null.
 
Thanks!
 
 
 
 
 
26.04.2017, 09:52, "Ufuk Celebi" <[hidden email]>:

Thanks! Your config looks good to me.

Could you please set the log level of org.apache.flink.runtime.jobmanager to DEBUG?

log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG

Then we can check whether the JobManager logs the registration of the state instance with the respective name in the case of parallelism > 1.

Expected output is something like this: "Key value state registered for job ${msg.getJobId} under name ${msg.getRegistrationName}."

– Ufuk


Re: Queryable State

Chet Masterson
 
Any insight here? I've got a situation where a key value state on a task manager is being registered with the job manager, but when I try to query it, the job manager responds it doesn't know the location of the key value state...
 
 

Re: Queryable State

Chet Masterson
Can do. Any advice on where the trace prints should go in the task manager source code?
 
BTW - How do I know I have a correctly configured cluster? Is there a set of messages in the job / task manager logs that indicate all required connectivity is present? I know I use the UI to make sure all the task managers are present, and that the job is running on all of them, but is there some verbiage in the logs that indicates the job manager can talk to all the task managers, and vice versa?
 
Thanks!
 
 
02.05.2017, 06:03, "Ufuk Celebi" <[hidden email]>:

Hey Chet! I'm wondering why you are only seeing 2 registration messages for 3 task managers. Unfortunately, there is no log message at the task managers when they send out the notification. Is it possible for you to run a remote debugger with the task managers, or build a custom Flink version with the appropriate log messages on the task manager side?

– Ufuk
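Short of a remote debugger, the DEBUG registration lines quoted earlier in the thread can be counted mechanically across the JobManager log to see whether all three parallel instances reported in. A sketch; the matched substring is taken from the log output quoted above and may differ between Flink versions:

```java
import java.util.List;

public class RegistrationCount {
    // Counts JobManager log lines that record a key-value state
    // registration; expect one per parallel instance that reported in.
    static long countRegistrations(List<String> logLines) {
        return logLines.stream()
                .filter(line -> line.contains("Key value state registered for job"))
                .count();
    }
}
```

Feeding the full jobmanager log through this after a job restart makes "2 of 3 registered" visible without changing Flink itself.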



Re: Queryable State

Ufuk Celebi
Could you try KvStateRegistry#registerKvState please?

In the JM logs you should see something about the number of connected task managers, and in the task manager logs that each one connects to a JM.

– Ufuk



Re: Queryable State

Chet Masterson
I found the issue. When parallelism = 3, my test data set was skewed such that data was only going to two of the three task managers (kafka partitions = 3, number of flink nodes = 3, parallelism = 3). As soon as I created a test data set with enough keys to spread across all three task managers, queryable state started working as expected. That is why only two KvStates were registered with the job manager instead of three.
 
My FINAL :-) question: should I be getting org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation in the event only N-1 task managers have data in a parallelism-of-N situation?
 
Thanks for all the help!
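The skew described above can be illustrated with plain hash partitioning. (This is a simplification: Flink actually assigns keys to key groups via murmur hashing, not hashCode() modulo parallelism, so the class and method below are illustrative only.) With few distinct keys, some of the N parallel instances receive no data at all:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class KeySkewDemo {

    // Which of the `parallelism` partitions receive at least one key,
    // using simple hash partitioning as a stand-in for Flink's
    // murmur-based key-group assignment.
    static Set<Integer> partitionsHit(List<String> keys, int parallelism) {
        Set<Integer> hit = new HashSet<>();
        for (String k : keys) {
            hit.add(Math.floorMod(k.hashCode(), parallelism));
        }
        return hit;
    }

    public static void main(String[] args) {
        // Two distinct keys, parallelism 3: both land on the same
        // partition, so two instances never see data.
        System.out.println(partitionsHit(List.of("a", "d"), 3)); // [1]
        // More distinct keys cover all three instances.
        System.out.println(partitionsHit(List.of("a", "b", "c"), 3));
    }
}
```

With the skewed key set, the instances that never receive data also never register their queryable state, matching the two (rather than three) registration messages seen in the logs.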
 
 

Re: Queryable State

Nico Kruber
Hi Chet,
you should not see an
org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation when querying an
existing(!) key.
However, if you query a key that the non-registered TaskManager is responsible
for, I suppose this is the exception you will get. Unfortunately, the queryable
state API still seems to be rough around the edges.

I suspect that the TaskManagers register their queryable state only after
receiving data(?), and this causes the UnknownKvStateKeyGroupLocation instead
of an UnknownKeyOrNamespace.


Nico
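Given this lazy registration, one client-side workaround is to retry the query with backoff until the responsible TaskManager has registered its state. The sketch below is plain Java with a generic Callable standing in for the real Flink query call, and the caught Exception standing in for UnknownKvStateKeyGroupLocation; it is not the actual client API:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryingQuery {

    // Retry a query up to maxAttempts times with linear backoff,
    // rethrowing the last exception if all attempts fail.
    static <T> T retry(Callable<T> query, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.call();
            } catch (Exception e) { // in a real client: UnknownKvStateKeyGroupLocation
                last = e;
                Thread.sleep(backoffMs * attempt); // linear backoff between attempts
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulate state that is registered only after two failed lookups.
        Integer result = retry(() -> {
            if (calls.incrementAndGet() < 3) throw new IllegalStateException();
            return 42;
        }, 5, 10);
        System.out.println(result); // prints 42
    }
}
```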


