Java Hadoop multiple inputs

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must follow the same license and attribute the original authors (not me). Original question: http://stackoverflow.com/questions/27349743/


Hadoop multiple inputs

java hadoop mapreduce

Asked by dex90

I am using Hadoop MapReduce and I want to process two files. My first MapReduce iteration gives me a file of ID/number pairs, like this:

A 30
D 20

My goal is to use the IDs from that file to join with another file and produce output with triples of ID, Name, Number, like this:

A ABC 30
D EFGH 20

But I am not sure that MapReduce is the best way to do this. For example, would it be better to use a file reader to read the second input file and look up the name by ID? Or can I do it with MapReduce?

If so, I'm trying to find out how. I tried a MultipleInputs solution:

MultipleInputs.addInputPath(job2, new Path(args[1]+"-tmp"),
    TextInputFormat.class, FlightsByCarrierMapper2.class);
MultipleInputs.addInputPath(job2, new Path("inputplanes"),
    TextInputFormat.class, FlightsModeMapper.class); 

But I can't think of a way to combine the two and get the output I want. What I have right now just gives me a list like this:

A ABC
A 30
B ABCD
C ABCDEF
D EFGH
D 20

After my last reduce I am getting this:

N125DL  767-332
N125DL  7   , 
N126AT  737-76N
N126AT  19  , 
N126DL  767-332
N126DL  1   , 
N127DL  767-332
N127DL  7   , 
N128DL  767-332
N128DL  3

I want this: N127DL 7 767-332. Also, I don't want the entries that do not combine.

And this is my reducer class:

public class FlightsByCarrierReducer2 extends Reducer {

String merge = "";
protected void reduce(Text token, Iterable<Text> values, Context context) 
                            throws IOException, InterruptedException {

    int i = 0;  
    for(Text value:values)
    {
        if(i == 0){
            merge = value.toString()+",";
        }
        else{
            merge += value.toString();
        }
        i++;
    }

        context.write(token, new Text(merge));

}

}


Update:


This is the example I'm using: http://stat-computing.org/dataexpo/2009/the-data.html

I'm working with TailNum and Cancelled (which is 1 or 0), and I want to get the model name that corresponds to each TailNum. My model file has TailNum, Model, and other fields. My current output is:

N193JB ERJ 190-100 IGW
N194DN 767-332
N19503 EMB-135ER
N19554 EMB-145LR
N195DN 767-332
N195DN 2

First comes the key, then the model; for keys that had cancelled flights, the cancellation count appears below the model.

And I would like a triple of Key, Model, Number of Cancellations, because I want the number of cancellations per model.
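To go from that joined output to a count per model, one option is to chain a second, small job onto it. The sketch below is not from the original post: the class names are illustrative, and it assumes each joined line looks like "tailnumber model count", with the count as the last whitespace-separated token (model names such as "ERJ 190-100 IGW" may themselves contain spaces).

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class CancellationsPerModel {

    public static class ModelMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Text model = new Text();
        private final IntWritable count = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            int first = line.indexOf(' ');
            int last = line.lastIndexOf(' ');
            // Skip records that did not join: they have no numeric count at the end.
            if (first < 0 || last <= first
                    || !line.substring(last + 1).matches("\\d+")) {
                return;
            }
            model.set(line.substring(first + 1, last)); // the model name, spaces and all
            count.set(Integer.parseInt(line.substring(last + 1)));
            context.write(model, count);
        }
    }

    public static class ModelReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text model, Iterable<IntWritable> counts,
                Context context) throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable c : counts) {
                total += c.get();
            }
            context.write(model, new IntWritable(total)); // e.g. "767-332 9"
        }
    }
}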

Accepted answer by Unmesha SreeVeni

You can join them using the ID as the key for both mappers. You can write your map task something like this:

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // Each input line is "<ID> <value>", e.g. "A 30" or "A ABC".
    // Split once on the first space so the ID becomes the key
    // and the rest of the line becomes the value.
    String[] parts = value.toString().split(" ", 2);
    Text word1 = new Text(parts[0]); // e.g. "A"
    Text word2 = new Text(parts[1]); // e.g. "30" or "ABC"
    context.write(word1, word2);
}

I think you can reuse the same map task for both inputs. Then write a common reducer, where the Hadoop framework groups the data by key, so you get the ID as the key. You can cache one of the values and then concatenate:

Text valEmit = new Text();
String merge = "";
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException
{
    int i = 0;
    for(Text value:values)
    {
        if(i == 0){
            merge = value.toString()+",";
        }
        else{
            merge += value.toString();
        }
        i++;
    }
    valEmit.set(merge);
    context.write(key, valEmit);
}

Finally, you can write your driver class:

public int run(String[] args) throws Exception {
 Configuration c=new Configuration();
 String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
 Path p1=new Path(files[0]);
 Path p2=new Path(files[1]);
 Path p3=new Path(files[2]);
 FileSystem fs = FileSystem.get(c);
 if(fs.exists(p3)){
  fs.delete(p3, true);
  }
 Job job = new Job(c,"Multiple Job");
 job.setJarByClass(MultipleFiles.class);
 MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
 MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class);
 job.setReducerClass(MultipleReducer.class);
 .
 .
}

You can find the example HERE


Hope this helps.




UPDATE


Input1


A 30
D 20

Input2


A ABC
D EFGH

Output


A ABC 30
D EFGH 20

Mapper.java


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author sreeveni
 *
 */
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
    Text keyEmit = new Text();
    Text valEmit = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String parts[] = line.split(" ");
        keyEmit.set(parts[0]);
        valEmit.set(parts[1]);
        context.write(keyEmit, valEmit);
    }
}

Reducer.java


import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author sreeveni
 *
 */
public class ReducerJoin extends Reducer<Text, Text, Text, Text> {

    Text valEmit = new Text();
    String merge = "";

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String character = "";
        String number = "";
        for (Text value : values) {
            // ordering output
            String val = value.toString();
            char myChar = val.charAt(0);

            if (Character.isDigit(myChar)) {
                number = val;
            } else {
                character = val;
            }
        }
        merge = character + " " + number;
        valEmit.set(merge);
        context.write(key, valEmit);
    }

}
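The reducer above emits a line even when a key appeared in only one input (the value from the missing side is just an empty string). If only fully joined records are wanted, as the question asks, a guarded variant of the same reduce method, reusing the valEmit field from ReducerJoin, would be:

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    String character = "";
    String number = "";
    for (Text value : values) {
        String val = value.toString();
        if (Character.isDigit(val.charAt(0))) {
            number = val;
        } else {
            character = val;
        }
    }
    // Write only when both inputs contributed a value for this key;
    // IDs that appear in just one file are silently dropped.
    if (!character.isEmpty() && !number.isEmpty()) {
        valEmit.set(character + " " + number);
        context.write(key, valEmit);
    }
}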

Driver class


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author sreeveni
 *
 */
public class Driver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        // checking the arguments count

        if (args.length != 3) {
            System.err
                    .println("Usage : <inputlocation>  <inputlocation>  <outputlocation> ");
            System.exit(0);
        }
        int res = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(res);

    }

    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        String source1 = args[0];
        String source2 = args[1];
        String dest = args[2];
        Configuration conf = new Configuration();
        conf.set("mapred.textoutputformat.separator", " "); // changing default
                                                            // delimiter to user
                                                            // input delimiter
        FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Multiple Jobs");

        job.setJarByClass(Driver.class);
        Path p1 = new Path(source1);
        Path p2 = new Path(source2);
        Path out = new Path(dest);
        MultipleInputs.addInputPath(job, p1, TextInputFormat.class,
                Mapper1.class);
        MultipleInputs.addInputPath(job, p2, TextInputFormat.class,
                Mapper1.class);
        job.setReducerClass(ReducerJoin.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        /*
         * delete if exist
         */
        if (fs.exists(out))
            fs.delete(out, true);

        TextOutputFormat.setOutputPath(job, out);
        boolean success = job.waitForCompletion(true);

        return success ? 0 : 1;
    }

}

Answer by Chris Gerken

Your reducer has a map method, but it should have a reduce method that takes an Iterable collection of values, which you then merge. Because you don't have a reduce() method, you get the default behavior, which is to just pass through all of the key/value pairs.
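As an illustration of that point, here is a minimal skeleton of a reducer whose reduce() actually overrides the framework's method (JoinReducer is an illustrative name, not from the question). Note the generic parameters on Reducer and the @Override annotation: without the generics, reduce(Text, Iterable<Text>, Context) is just an overload that the framework never calls, which produces exactly the pass-through behavior described above.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Merge all values for this key into a single space-separated line.
        StringBuilder merged = new StringBuilder();
        for (Text value : values) {
            if (merged.length() > 0) {
                merged.append(' ');
            }
            merged.append(value.toString());
        }
        context.write(key, new Text(merged.toString()));
    }
}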