Java MapReduce Combiner

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/20212884/



java hadoop mapreduce

Asked by user2401464

I have a simple MapReduce job with a mapper, a reducer, and a combiner. The mapper's output is passed to the combiner, but the reducer receives the mapper's output instead of the combiner's output.


Kindly help


Code:


package Combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class AverageSalary
{
public static class Map extends  Mapper<LongWritable, Text, Text, DoubleWritable> 
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    {    
        String[] empDetails= value.toString().split(",");
        Text unit_key = new Text(empDetails[1]);      
        DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
        context.write(unit_key,salary_value);    

    }  
}
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text> 
{
    public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context)
    {
        String val;
        double sum=0;
        int len=0;
        while (values.iterator().hasNext())
        {
            sum+=values.iterator().next().get();
            len++;
        }
        val=String.valueOf(sum)+":"+String.valueOf(len);
        try {
            context.write(key,new Text(val));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
public static class Reduce extends Reducer<Text,Text, Text,Text> 
{
    public void reduce (final Text key, final Text values, final Context context)
    {
        //String[] sumDetails=values.toString().split(":");
        //double average;
        //average=Double.parseDouble(sumDetails[0]);
        try {
            context.write(key,values);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
public static void main(String args[])
{
    Configuration conf = new Configuration();
    try
    {
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();    
     if (otherArgs.length != 2) {      
         System.err.println("Usage: Main <in> <out>");      
         System.exit(-1);    }    
     Job job = new Job(conf, "Average salary");    
     //job.setInputFormatClass(KeyValueTextInputFormat.class);    
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));    
     job.setJarByClass(AverageSalary.class);    
     job.setMapperClass(Map.class);    
     job.setCombinerClass(Combiner.class);
     job.setReducerClass(Reduce.class);    
     job.setOutputKeyClass(Text.class);    
     job.setOutputValueClass(Text.class);    

        System.exit(job.waitForCompletion(true) ? 0 : -1);
    } catch (ClassNotFoundException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

Accepted answer by user987339

It seems that you forgot about an important property of a combiner:


the input types for the key/value and the output types of the key/value need to be the same.


You can't take in a Text/DoubleWritable and return a Text/Text. I suggest you use Text instead of DoubleWritable, and do the parsing inside the Combiner.

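For illustration, here is a minimal sketch of the fix this answer suggests (the class names and the "sum:count" string encoding are mine, not from the original post): the mapper emits each salary as Text carrying a count of 1, so the combiner's input and output types are identical (Text, Text) and it can safely feed either the reducer or another combiner pass.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageSalaryFixed {
    // Emits (department, "salary:1") so every value already carries a count.
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] empDetails = value.toString().split(",");
            context.write(new Text(empDetails[1]), new Text(empDetails[2] + ":1"));
        }
    }

    // Merges "sum:count" pairs; input and output types are identical, so the
    // result is correct whether the framework runs it zero, one, or many times.
    public static class Combine extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            for (Text value : values) {
                String[] parts = value.toString().split(":");
                sum += Double.parseDouble(parts[0]);
                count += Long.parseLong(parts[1]);
            }
            context.write(key, new Text(sum + ":" + count));
        }
    }

    // Repeats the same merge, then emits the final average.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            for (Text value : values) {
                String[] parts = value.toString().split(":");
                sum += Double.parseDouble(parts[0]);
                count += Long.parseLong(parts[1]);
            }
            context.write(key, new Text(String.valueOf(sum / count)));
        }
    }
}

The driver from the question would then only need job.setCombinerClass(Combine.class) and Text for both the output key and value classes.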

Answer by Donald Miner

The #1 rule of Combiners is: do not assume that the combiner will run. Treat the combiner only as an optimization.


The Combiner is not guaranteed to run over all of your data. In some cases, when the data doesn't need to be spilled to disk, MapReduce will skip using the Combiner entirely. Note also that the Combiner may be run multiple times over subsets of the data! It'll run once per spill.


In your case, you are making this bad assumption. You should be doing the sum in the Combiner AND the Reducer.


You should follow @user987339's answer as well. The input and output of the combiner need to be identical (Text,Double -> Text,Double), and they need to match up with the output of the Mapper and the input of the Reducer.

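A quick worked example (the numbers are mine) shows why summing in both places matters: for salaries 1, 2, 3 where one combiner pass sees {1, 2} and another sees {3}, averaging the averages gives avg(1.5, 3) = 2.25 instead of the true avg(1, 2, 3) = 2. Carrying (sum, count) pairs instead gives (3, 2) and (3, 1), which merge to (6, 3) and yield the correct 2.0 no matter how many times the combiner runs.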

Answer by jintocvg

The Combiner will not always run when you execute a MapReduce job.


If there are at least three spill files (mapper output written to local disk), the combiner will execute so that the file size is reduced and the data can be transferred to the reduce node more cheaply.


The minimum number of spills required before the combiner runs can be set through the min.num.spills.for.combine property.

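For illustration, a sketch of setting that threshold in the driver. The property name is the classic one quoted in this answer; newer Hadoop releases renamed it (to mapreduce.map.combine.minspills), so verify it against your version's documentation.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CombinerSpillConfig {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Run the combiner during the merge phase only once at least 5 spill
        // files exist (the framework default is 3). Property name as quoted
        // in this answer; check it against your Hadoop version.
        conf.set("min.num.spills.for.combine", "5");
        Job job = new Job(conf, "Average salary");
        // ... rest of the driver unchanged ...
    }
}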

Answer by Narsireddy

If a combine function is used, it has the same form as the reduce function (and is an implementation of Reducer), except that its output types are the intermediate key and value types (K2 and V2), so that it can feed the reduce function:

map:     (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
reduce:  (K2, list(V2)) → list(K3, V3)

Often the combine and reduce functions are the same, in which case K3 is the same as K2, and V3 is the same as V2.

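In Java terms, this constraint is just the generic type parameters on the Mapper and Reducer base classes. A shape-only sketch (the concrete type choices here are hypothetical, and the method bodies are omitted):

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// K1=LongWritable, V1=Text;  K2=Text, V2=IntWritable;  K3=Text, V3=DoubleWritable
class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { }        // map:     (K1, V1) -> list(K2, V2)
class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { }      // combine: (K2, list(V2)) -> list(K2, V2)
class MyReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> { }    // reduce:  (K2, list(V2)) -> list(K3, V3)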