java.lang.IllegalArgumentException: Wrong FS: , expected: hdfs://localhost:9000

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/23779976/

java.lang.IllegalArgumentException: Wrong FS: , expected: hdfs://localhost:9000

Tags: java, hadoop, mapreduce, distributed-cache

Asked by user3489834

I am trying to implement a reduce-side join and am using a MapFile reader to look up values from the distributed cache, but it is not finding the values. When I checked stderr it showed the following error. The lookup file is already present in HDFS and, judging from stdout, seems to be loaded into the cache correctly.

java.lang.IllegalArgumentException: Wrong FS: file:/app/hadoop/tmp/mapred/local/taskTracker/distcache/-8118663285704962921_-1196516983_170706299/localhost/input/delivery_status/DeliveryStatusCodes/data, expected: hdfs://localhost:9000
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:390)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:554)
    at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:816)
    at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1479)
    at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1474)
    at org.apache.hadoop.io.MapFile$Reader.createDataFileReader(MapFile.java:302)
    at org.apache.hadoop.io.MapFile$Reader.open(MapFile.java:284)
    at org.apache.hadoop.io.MapFile$Reader.(MapFile.java:273)
    at org.apache.hadoop.io.MapFile$Reader.(MapFile.java:260)
    at org.apache.hadoop.io.MapFile$Reader.(MapFile.java:253)
    at mr_poc.reducerrsj.initializeDepartmentsMap(reducerrsj.java:59)
    at mr_poc.reducerrsj.setup(reducerrsj.java:42)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:174)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
java.lang.NullPointerException
    at mr_poc.reducerrsj.buildOutputValue(reducerrsj.java:83)
    at mr_poc.reducerrsj.reduce(reducerrsj.java:127)
    at mr_poc.reducerrsj.reduce(reducerrsj.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.

This is my driver code:

package mr_poc;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class driverrsj extends Configured implements Tool{

    @Override
    public int run(String[] arg) throws Exception {
        if (arg.length != 3) {
            System.out.printf("3 parameters are required for DriverRSJ- <Input Dir1> <Input Dir2> <Output Dir> \n");
            return -1;
        }
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);
        System.out.println("Cache : " + job.getConfiguration().get("mapred.cache.files"));
        job.setJarByClass(driverrsj.class);
        conf.setInt("cust_info", 1);
        conf.setInt("status", 2);
        StringBuilder inputPaths = new StringBuilder();
        inputPaths.append(arg[0].toString()).append(",").append(arg[1].toString());
        FileInputFormat.setInputPaths(job, inputPaths.toString());
        FileOutputFormat.setOutputPath(job, new Path(arg[2]));
        job.setJarByClass(driverrsj.class);
        job.setMapperClass(mappperRSJ.class);
        job.setReducerClass(reducerrsj.class);
        job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
        job.setMapOutputValueClass(Text.class);
        //job.setPartitionerClass(partinonrsj.class);
        job.setSortComparatorClass(secondarysortcomp.class);
        job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new Configuration(), new driverrsj(),args);
        System.exit(exitCode);

    }


}

This is my reducer code:

package mr_poc;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducerrsj extends Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text>{
    StringBuilder reduceValueBuilder = new StringBuilder("");
    NullWritable nullWritableKey = NullWritable.get();
    Text reduceOutputValue = new Text("");
    String strSeparator = ",";
    private MapFile.Reader deptMapReader = null;
    Text txtMapFileLookupKey = new Text();
    Text txtMapFileLookupValue = new Text();
    //Path[] cacheFilesLocal;
    //Path[] eachPath;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());

        for (Path eachPath : cacheFiles) {
            System.out.println(eachPath.toString());
            System.out.println(eachPath.getName());
            if (eachPath.getName().toString().contains("delivery_status")) {
                URI uriUncompressedFile = new File(eachPath.toString() + "/DeliveryStatusCodes").toURI();
                initializeDepartmentsMap(uriUncompressedFile, context);
            }
        }
    }

    //@SuppressWarnings("deprecation")
    private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
            throws IOException {
        // Initialize the reader of the map file (side data)
        Configuration conf = context.getConfiguration();
        conf.addResource(new Path("/usr/local/hadoop-1.2.1/conf/core-site.xml"));
        FileSystem dfs = FileSystem.get(conf);
        try {
            deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), context.getConfiguration());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
            StringBuilder reduceValueBuilder, Text value) {

        if (key.getsourceindex() == 2) {
            String arrSalAttributes[] = value.toString().split(",");
            txtMapFileLookupKey.set(arrSalAttributes[0].toString());
            System.out.println("key=" + txtMapFileLookupKey);

            try {
                deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
            } catch (Exception e) {
                txtMapFileLookupValue.set("");
                e.printStackTrace();
            } finally {
                txtMapFileLookupValue.set(
                        (txtMapFileLookupValue.equals(null) || txtMapFileLookupValue.equals(""))
                                ? "NOT-FOUND"
                                : txtMapFileLookupValue.toString());
            }

            reduceValueBuilder.append(txtMapFileLookupValue.toString());

        } else if (key.getsourceindex() == 1) {
            String arrEmpAttributes[] = value.toString().split(",");
            reduceValueBuilder.append(arrEmpAttributes[0].toString()).append(strSeparator);
        }

        txtMapFileLookupKey.set("");
        txtMapFileLookupValue.set("");

        return reduceValueBuilder;
    }

    @Override
    public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            buildOutputValue(key, reduceValueBuilder, value);
        }

        // Drop last comma, set value, and emit output
        if (reduceValueBuilder.length() > 1) {
            //reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
            // Emit output
            reduceOutputValue.set(reduceValueBuilder.toString());
            context.write(nullWritableKey, reduceOutputValue);
        } else {
            System.out.println("Key=" + key.getjoinkey() + "src="
                    + key.getsourceindex());
        }
        // Reset variables
        reduceValueBuilder.setLength(0);
        reduceOutputValue.set("");
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        if (deptMapReader != null) {
            deptMapReader.close();
        }
    }
}

This is my core-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>

Any help would be highly appreciated. Thanks in advance!!!

Answered by Jintao Guan

I think I have encountered a similar problem. The key point is that you are operating on a SequenceFile from the DistributedCache, which should be on your local file system. From your logs, there is the line

"org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)" 

If you check the source code of SequenceFile.Reader, you will find that this log line is caused by the call

fs.getFileStatus(filename).getLen()    

and this "fs" here should be LocalFileSystem instead of DistributedFileSystem.

My solution is to change

deptMapReader = new MapFile.Reader(dfs,uriUncompressedFile.toString(), context.getConfiguration());

to

Configuration conf = context.getConfiguration();   // getConfiguration() returns a Configuration, not a JobConf
String originalFS = conf.get("fs.default.name");   //backup original configuration
conf.set("fs.default.name", "file:///");           //change configuration to local file system
deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), conf);
conf.set("fs.default.name", originalFS);           //restore original configuration

After doing this, the SequenceFile.Reader object can access the cached files on the local file system.

I think this problem happens because the SequenceFile and MapFile APIs have changed, and constructors such as MapFile.Reader(fs, path, conf) have been deprecated.

This solution works fine for me.
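
For reference, a minimal sketch of a related approach (mine, not part of the original answer) is to open the cached file through an explicit LocalFileSystem instead of temporarily rewriting fs.default.name. This assumes Hadoop 1.x, where the MapFile.Reader(FileSystem, String, Configuration) constructor still exists, and that uriUncompressedFile points at the local copy materialized by the DistributedCache, as in the asker's reducer:

    // Sketch only, not from the original answer: open the MapFile through the local
    // file system, since the DistributedCache copy lives on the task node's local disk.
    private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
            throws IOException {
        Configuration conf = context.getConfiguration();
        FileSystem localFs = FileSystem.getLocal(conf);        // always a LocalFileSystem
        try {
            deptMapReader = new MapFile.Reader(localFs,
                    uriUncompressedFile.getPath(), conf);      // getPath() drops the file: scheme
        } catch (Exception e) {
            e.printStackTrace();
        }
    }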

Answered by Liu Yan

You should set the properties of conf according to your core-site.xml file, like this:

conf.set("fs.defaultFS", "hdfs://host:port");
conf.set("mapreduce.jobtracker.address", "host:port");

Answered by Muralikrishna G S

I had the same issue; I resolved it by adding

FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);

in the driver class.

You have to import URI from java.net.URI.
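
A minimal sketch (mine, not the answerer's) of how this could look inside the asker's run() method, assuming the hdfs://localhost:9000 address from the core-site.xml above:

    // Sketch only. Requires: import java.net.URI; and import org.apache.hadoop.fs.FileSystem;
    Job job = new Job(getConf());
    Configuration conf = job.getConfiguration();
    FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf); // forces the HDFS FileSystem for this conf
    DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);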

Answered by Victor

Include the line below in the job runner: DistributedCache.addCacheFile(new URI(""), conf);

And add the code below in the setup method of the mapper:

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Configuration configuration = new Configuration();
    FileSystem fileSystem = null;
    try {
        fileSystem = FileSystem.get(new URI("<File location>"), configuration);
    } catch (URISyntaxException e) {
        e.printStackTrace();
    }

    String location = <S3 file location>;
    FSDataInputStream fsDataInputStream =fileSystem.open(new Path(location));
    Scanner scanner = new Scanner(fsDataInputStream);
    int i = 1;
    while(scanner.hasNextLine()) {
        String str[] = scanner.nextLine().split(",");
        LOG.info("keys are \t" + str[0] + str[1]);
        stickerMap.put(str[0] + str[1], i);
        ++i;
    }
}