Add custom configuration files to TMs classpath on YARN

Mikhail Pryakhin-2
Hi all,

I run my Flink job on a YARN cluster and need to supply job configuration parameters via a configuration file alongside the job jar (the configuration file can't be packaged into the job's jar file).
I tried putting the configuration file into the folder that is passed via the --yarnship option to the flink run command. The file is then copied to the YARN cluster and added to the JVM classpath as 'path/application.conf', but it is ignored by the TM JVM because it is neither a jar (zip) file nor a directory...
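
The classpath behaviour described above can be reproduced outside Flink with plain Java. This is only a minimal sketch: the class name ClasspathEntryDemo, the helper lookup and the temporary paths are made up for illustration and have nothing to do with how YarnClusterDescriptor builds the classpath. It shows that a class loader resolves the resource when the containing directory is the classpath entry, but not when the plain file itself is the entry.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ClasspathEntryDemo {

    /**
     * Looks up a resource through a class loader whose only entry is the given path.
     * Hypothetical helper, used only to illustrate the lookup rules.
     */
    public static URL lookup(Path classpathEntry, String resourceName) throws Exception {
        // A directory URL ends with '/', anything else is treated as a jar file.
        URL[] entries = { classpathEntry.toUri().toURL() };
        // No parent loader, so the application classpath cannot interfere.
        try (URLClassLoader loader = new URLClassLoader(entries, null)) {
            return loader.getResource(resourceName);
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("ship");
        Path conf = dir.resolve("application.conf");
        Files.write(conf, "key=value\n".getBytes(StandardCharsets.UTF_8));

        // Directory as classpath entry: the file inside it is visible.
        System.out.println("directory entry: " + lookup(dir, "application.conf"));
        // Plain file as classpath entry: it is not a jar, so nothing is found.
        System.out.println("file entry:      " + lookup(conf, "application.conf"));
    }
}
```

Under these assumptions, the file would only become visible if its parent directory (or a jar containing it) ended up on the classpath instead of the file path itself.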

I looked through the YarnClusterDescriptor class, where ENV_FLINK_CLASSPATH is built, and haven't found any option to tell Flink (YarnClusterDescriptor in particular) to add my configuration file to the TM JVM classpath... Is there any way to do so? If not, would you consider adding the ability to ship such files? (In Spark I can simply pass any files via the --files option.)

Thanks in advance.

Kind Regards,
Mike Pryakhin

Re: Add custom configuration files to TMs classpath on YARN

Mikhail Pryakhin-2
Hi guys,

any news?


Kind Regards,
Mike Pryakhin


Re: Add custom configuration files to TMs classpath on YARN

Nico Kruber
A workaround may be to use the DistributedCache. It apparently is not
documented much but the JavaDoc mentions roughly how to use it:

https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954

/**
 * Registers a file at the distributed cache under the given name. The file will be accessible
 * from any user-defined function in the (distributed) runtime under a local path. Files
 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
 * The runtime will copy the files temporarily to a local cache, if needed.
 * <p>
 * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
 * {@link org.apache.flink.api.common.cache.DistributedCache} via
 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
 *
 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
 * @param name The name under which the file is registered.
 */
public void registerCachedFile(String filePath, String name){
        registerCachedFile(filePath, name, false);
}

You could pass the actual file URL to use for each instance of your job that
requires a different file via a simple job parameter:


public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);

        ...

        env.registerCachedFile(params.get("config_file", <default/path>), "extConfig");

        ...
}

Flink's DistributedCache will then cache the file locally and you can use it in
a RichFunction like in
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java#L99

public class MyFunction extends AbstractRichFunction {
        private static final long serialVersionUID = 1L;

        @Override
        public void open(Configuration conf) throws IOException {
                File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
                ...
        }
}
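
Once open() has the local File, the configuration still needs to be parsed. As a rough sketch, and assuming the file is in Java properties (key=value) format, which the thread does not state, a small helper could look like the following. ExtConfigLoader and load are hypothetical names, not part of Flink.

```java
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Properties;

/** Hypothetical helper: parses a locally cached configuration file. */
public final class ExtConfigLoader {

    private ExtConfigLoader() {
        // static utility, not meant to be instantiated
    }

    /**
     * Reads the given file as UTF-8 key=value pairs.
     * Assumption: the external config uses Java properties syntax.
     */
    public static Properties load(File configFile) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(configFile.toPath(), StandardCharsets.UTF_8)) {
            props.load(reader);
        }
        return props;
    }
}
```

Inside open() this could be called as ExtConfigLoader.load(file) and the result kept in a transient field for later use by the function. A config in another format (for example HOCON) would need the matching parser instead.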


Nico

On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:

> Hi guys,
>
> any news?
> I’ve created a jira-ticket https://issues.apache.org/jira/browse/FLINK-6949
> <https://issues.apache.org/jira/browse/FLINK-6949>.
>
>
> Kind Regards,
> Mike Pryakhin
>
> > On 16 Jun 2017, at 16:35, Mikhail Pryakhin <[hidden email]> wrote:
> >
> > Hi all,
> >
> > I run my flink job on yarn cluster and need to supply job configuration
> > parameters via configuration file alongside with the job jar.
> > (configuration file can't be packaged into jobs jar file). I tried to put
> > the configuration file into the folder that is passed via --yarnship
> > option to the flink run command, then this file is copied to the yarn
> > cluster and added to JVM class path like 'path/application.conf' but is
> > ignored by TM JVM as it is neither jar(zip) file nor directory...
> >
> > A looked through the YarnClusterDescriptor class where the
> > ENV_FLINK_CLASSPATH is built and haven't found any option to to tell
> > flink (YarnClusterDescriptor especially) to add my configuration file to
> > the TM JVM classpath... Is there any way to do so? If not do you consider
> > to have such an ability to add files? (like in spark I just can pass any
> > files via --files option)
> >
> > Thanks in advance.
> >
> > Kind Regards,
> > Mike Pryakhin


Re: Add custom configuration files to TMs classpath on YARN

Mikhail Pryakhin-2
Hi Nico!
Sounds great, I will give it a try and report back with the results soon.

Thank you so much for your help!!

Kind Regards,
Mike Pryakhin

On 21 Jun 2017, at 16:36, Nico Kruber <[hidden email]> wrote:

A workaround may be to use the DistributedCache. It apparently is not
documented much but the JavaDoc mentions roughly how to use it:
