Issue with Executing WordCount.java from File Input

chatterjeesubarna
Hello,

I started using Flink quite recently. I have successfully deployed Flink on a 3-node cluster over HDFS, and the streaming examples such as SocketWindowWordCount run fine. Now I want to execute WordCount.java with input read from a data file. My code is as follows:


WordCount.java

package org.apache.flink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {

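        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line (the path carries no scheme such as hdfs://)
        DataSet<String> text = env.readTextFile("/root/flink-quickstart-java/src/main/java/org/apache/flink/data.txt");

        // Split each line into (word, 1) pairs, group by the word, and sum the counts
        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);

        // For a DataSet, print() also triggers execution of the job
        counts.print();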
    }

    // Emits one (word, 1) tuple for every word in a line
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {

            // Lowercase the line and split it on runs of non-word characters
            String[] tokens = value.toLowerCase().split("\\W+");

            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

This code runs perfectly on a local setup but fails on the cluster. The exception is as follows:

Root Exception:

java.io.IOException: Error opening the Input Split file:/root/flink-quickstart-java/src/main/java/org/apache/flink/data.txt [0,42]: /root/flink-quickstart-java/src/main/java/org/apache/flink/data.txt (No such file or directory)

        at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:144)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /root/flink-quickstart-java/src/main/java/org/apache/flink/data.txt (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:49)
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:133)
        at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:866)


CHAIN DataSource (at main(WordCount.java:59) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:63)) -> Combine(SUM(1), at main(WordCount.java:66) (1/1)

java.io.IOException: Error opening the Input Split file:/root/flink-quickstart-java/src/main/java/org/apache/flink/data.txt [0,42]: /root/flink-quickstart-java/src/main/java/org/apache/flink/data.txt (No such file or directory)
        at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:144)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /root/flink-quickstart-java/src/main/java/org/apache/flink/data.txt (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:49)
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:133)
        at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:866)



Please note that the file is present at the location /root/flink-quickstart-java/src/main/java/org/apache/flink/. I am also not sure whether the format of the input data is relevant. It would be great if anyone could help me with this or suggest how to fix it!
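
One check that may narrow this down: the trace reports the split as file:/root/... and goes through LocalFileSystem.open, so the file is being opened on the local disk of whichever node runs the source task, which need not be the node where the file was created. Below is a minimal sketch, using only the JDK and no Flink classes, that can be run on each node to see whether the path is readable there. The class name InputPathCheck, its helper methods, and the example address hdfs://namenode:9000/... are hypothetical and not part of the original program. Treating a path with no scheme as local is an assumption that mirrors what the trace shows.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical diagnostic helper; not part of Flink or of the original WordCount program.
class InputPathCheck {

    // Returns the URI scheme of the path. A path with no scheme is treated
    // as "file" (assumption: this mirrors the file:/ prefix in the trace).
    static String schemeOf(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme == null ? "file" : scheme;
    }

    // True when the path would be opened on the local disk of the current machine.
    static boolean isLocal(String path) {
        return "file".equals(schemeOf(path));
    }

    // True when the path is local and readable on the machine running this check.
    static boolean existsLocally(String path) {
        if (!isLocal(path)) {
            return false;
        }
        return Files.isReadable(Paths.get(URI.create(path).getPath()));
    }

    public static void main(String[] args) {
        String path = args.length > 0
                ? args[0]
                : "/root/flink-quickstart-java/src/main/java/org/apache/flink/data.txt";

        String host;
        try {
            host = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            host = "unknown-host";
        }

        System.out.println("host:     " + host);
        System.out.println("scheme:   " + schemeOf(path));
        System.out.println("readable: " + existsLocally(path));

        // Example of a non-local path (hypothetical NameNode address):
        // hdfs://namenode:9000/data/data.txt reports scheme "hdfs".
    }
}
```

If the check reports "readable: false" on some nodes, the things to try would be copying the file to the same path on every node, or reading it from HDFS with a full hdfs:// URI.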

Thank you in advance!

Subarna Chatterjee
Post-Doctoral Researcher
Inria, Rennes
Website: http://chatterjeesubarna.wix.com/subarna