Java Hadoop DistributedCache 已弃用 - 首选 API 是什么?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/21239722/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 07:34:00  来源:igfitidea点击:

Hadoop DistributedCache is deprecated - what is the preferred API?

javahadoopmapreduce

提问by DNA

My map tasks need some configuration data, which I would like to distribute via the Distributed Cache.

我的地图任务需要一些配置数据,我想通过分布式缓存分发这些数据。

The Hadoop MapReduce Tutorialshows the usageof the DistributedCache class, roughly as follows:

Hadoop MapReduce 教程展示了DistributedCache 类的用法,大致如下:

// In the driver
JobConf conf = new JobConf(getConf(), WordCount.class);
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf); 

// In the mapper
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
...

However, DistributedCacheis marked as deprecatedin Hadoop 2.2.0.

但是,在 Hadoop 2.2.0 中DistributedCache标记为已弃用

What is the new preferred way to achieve this? Is there an up-to-date example or tutorial covering this API?

实现这一目标的新首选方法是什么?是否有涵盖此 API 的最新示例或教程?

采纳答案by user2371156

The APIs for the Distributed Cache can be found in the Job class itself. Check the documentation here: http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.htmlThe code should be something like

分布式缓存的 API 可以在 Job 类本身中找到。检查此处的文档:http: //hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html代码应该类似于

Job job = new Job();
...
job.addCacheFile(new Path(filename).toUri());

In your mapper code:

在您的映射器代码中:

Path[] localPaths = context.getLocalCacheFiles();
...

回答by jtravaglini

The new DistributedCache API for YARN/MR2 is found in the org.apache.hadoop.mapreduce.Jobclass.

用于 YARN/MR2 的新 DistributedCache API 位于org.apache.hadoop.mapreduce.Job该类中。

   Job.addCacheFile()

Unfortunately, there aren't as of yet many comprehensive tutorial-style examples of this.

不幸的是,目前还没有很多全面的教程式示例。

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Job.html#addCacheFile%28java.net.URI%29

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Job.html#addCacheFile%28java.net.URI%29

回答by tolgap

To expand on @jtravaglini, the preferred way of using DistributedCachefor YARN/MapReduce 2 is as follows:

为了扩展@jtravaglini,DistributedCacheYARN/MapReduce 2的首选使用方式如下:

In your driver, use the Job.addCacheFile()

在您的驱动程序中,使用 Job.addCacheFile()

public int run(String[] args) throws Exception {
    Configuration conf = getConf();

    Job job = Job.getInstance(conf, "MyJob");

    job.setMapperClass(MyMapper.class);

    // ...

    // Mind the # sign after the absolute file location.
    // You will be using the name after the # sign as your
    // file name in your Mapper/Reducer
    job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some"));
    job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other"));

    return job.waitForCompletion(true) ? 0 : 1;
}

And in your Mapper/Reducer, override the setup(Context context)method:

在您的 Mapper/Reducer 中,覆盖该setup(Context context)方法:

@Override
protected void setup(
        Mapper<LongWritable, Text, Text, Text>.Context context)
        throws IOException, InterruptedException {
    if (context.getCacheFiles() != null
            && context.getCacheFiles().length > 0) {

        File some_file = new File("./some");
        File other_file = new File("./other");

        // Do things to these two files, like read them
        // or parse as JSON or whatever.
    }
    super.setup(context);
}

回答by patapouf_ai

I had the same problem. And not only is DistributedCach deprecated but getLocalCacheFiles and "new Job" too. So what worked for me is the following:

我有同样的问题。不仅不推荐使用 DistributedCach,而且还弃用 getLocalCacheFiles 和“新作业”。所以对我有用的是以下内容:

Driver:

司机:

Configuration conf = getConf();
Job job = Job.getInstance(conf);
...
job.addCacheFile(new Path(filename).toUri());

In Mapper/Reducer setup:

在 Mapper/Reducer 设置中:

@Override
protected void setup(Context context) throws IOException, InterruptedException
{
    super.setup(context);

    URI[] files = context.getCacheFiles(); // getCacheFiles returns null

    Path file1path = new Path(files[0])
    ...
}

回答by Hymanie Jiang

I did not use job.addCacheFile(). Instead I used -files option like "-files /path/to/myfile.txt#myfile" as before. Then in the mapper or reducer code I use the method below:

我没有使用 job.addCacheFile()。相反,我像以前一样使用了 -files 选项,如“-files /path/to/myfile.txt#myfile”。然后在映射器或减速器代码中,我使用以下方法:

/**
 * This method can be used with local execution or HDFS execution. 
 * 
 * @param context
 * @param symLink
 * @param throwExceptionIfNotFound
 * @return
 * @throws IOException
 */
public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException
{
    URI[] uris = context.getCacheFiles();
    if(uris==null||uris.length==0)
    {
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    }
    URI symlinkUri = null;
    for(URI uri: uris)
    {
        if(symLink.equals(uri.getFragment()))
        {
            symlinkUri = uri;
            break;
        }
    }   
    if(symlinkUri==null)
    {
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    }
    //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink
    return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink);

}

Then in mapper/reducer:

然后在映射器/减速器中:

@Override
protected void setup(Context context) throws IOException, InterruptedException
{
    super.setup(context);

    File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true);
    ... do work ...
}

Note that if I used "-files /path/to/myfile.txt" directly then I need to use "myfile.txt" to access the file since that is the default symlink name.

请注意,如果我直接使用“-files /path/to/myfile.txt”,那么我需要使用“myfile.txt”来访问该文件,因为这是默认的符号链接名称。

回答by Somum

None of the solution mentioned worked for me in completeness . It could because Hadoop version keeps changing I am using hadoop 2.6.4. Essentially, DistributedCache is deprecated so I didnt want to use that. As some of the post suggest us to use addCacheFile() however, it has changed a bit. Here is how it worked for me

提到的解决方案都没有对我有用。可能是因为 Hadoop 版本不断变化,我使用的是 hadoop 2.6.4。从本质上讲,不推荐使用 DistributedCache,所以我不想使用它。然而,正如一些帖子建议我们使用 addCacheFile() 一样,它已经发生了一些变化。这是它对我的工作方式

job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt"));

Here X.X.X.X can be Master IP address or localhost. The EnglishStop.txt was stored in HDFS at / location.

这里的 XXXX 可以是主 IP 地址或本地主机。EnglishStop.txt 存储在 HDFS 中的 / 位置。

hadoop fs -ls /

The output is

输出是

-rw-r--r--   3 centos supergroup       1833 2016-03-12 20:24 /EnglishStop.txt
drwxr-xr-x   - centos supergroup          0 2016-03-12 19:46 /test

Funny but convenient, #EnglishStop.txt means now we can access it as "EnglishStop.txt" in mapper. Here is the code for the same

有趣但方便,#EnglishStop.txt 意味着现在我们可以在映射器中以“EnglishStop.txt”的形式访问它。这是相同的代码

public void setup(Context context) throws IOException, InterruptedException     
{
    File stopwordFile = new File("EnglishStop.txt");
    FileInputStream fis = new FileInputStream(stopwordFile);
    BufferedReader reader = new BufferedReader(new InputStreamReader(fis));

    while ((stopWord = reader.readLine()) != null) {
        // stopWord is a word read from Cache
    }
}

This just worked for me. You can read line from the file stored in HDFS

这对我有用。您可以从存储在 HDFS 中的文件中读取行