java - Hadoop MapReduce sort reduce output using the key

Disclaimer: this page is a translated copy of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/17134341/

Hadoop MapReduce sort reduce output using the key

Tags: java, sorting, hadoop, mapreduce, comparator

Asked by fyaa

Below is a map-reduce program that counts the words in several text files. My aim is to have the result in descending order of the number of appearances.

Unfortunately the program sorts the output lexicographically by the key. I want the natural ordering of the integer values instead.

So I added a custom comparator with job.setSortComparatorClass(IntComparator.class). But this doesn't work as expected. I'm getting the following exception:

java.lang.Exception: java.nio.BufferUnderflowException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:498)
    at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:355)
    at WordCount$IntComparator.compare(WordCount.java:128)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:987)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:100)
    at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:64)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1277)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1174)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:609)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:675)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

Any help would be appreciated! :)

I've listed the whole program below, as there may be a cause for the exception that I'm obviously not seeing. As you can see, I am using the new MapReduce API (org.apache.hadoop.mapreduce.*).

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Counts the words in several text files.
 */
public class WordCount {
  /**
   * Maps lines of text to (word, amount) pairs.
   */
  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text();
    private IntWritable amount = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String textLine = value.toString();

      StringTokenizer tokenizer = new StringTokenizer(textLine);
      while (tokenizer.hasMoreElements()) {
        word.set((String) tokenizer.nextElement());

        context.write(word, amount);
      }
    }

  }

  /**
   * Reduces (word, amount) pairs to (amount, word) list.
   */
  public static class Reduce extends
      Reducer<Text, IntWritable, IntWritable, Text> {

    private IntWritable amount = new IntWritable();
    private int sum;

    @Override
    protected void reduce(Text key, Iterable<IntWritable> valueList,
        Context context) throws IOException, InterruptedException {
      sum = 0;

      for (IntWritable value : valueList) {
        sum += value.get();
      }

      amount.set(sum);
      context.write(amount, key);
    }
  }

  public static class IntComparator extends WritableComparator {
    public IntComparator() {
      super(IntWritable.class);
    }

    private Integer int1;
    private Integer int2;

    @Override
    public int compare(byte[] raw1, int offset1, int length1, byte[] raw2,
        int offset2, int length2) {
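      // The bytes compared here are a serialized Text key (a varint length
      // followed by UTF-8 bytes), not a raw 4-byte int, so getInt() underflows
      // whenever a serialized key is shorter than four bytes; hence the
      // BufferUnderflowException in the stack trace above.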
      int1 = ByteBuffer.wrap(raw1, offset1, length1).getInt();
      int2 = ByteBuffer.wrap(raw2, offset2, length2).getInt();

      return int2.compareTo(int1);
    }

  }

  /**
   * Job configuration.
   * 
   * @param args
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  public static void main(String[] args) throws IOException,
      ClassNotFoundException, InterruptedException {
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    Configuration configuration = new Configuration();
    configuration.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
    Job job = new Job(configuration);
    job.setJobName("WordCount");
    job.setJarByClass(WordCount.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setSortComparatorClass(IntComparator.class);

    FileInputFormat.setInputPaths(job, inputPath);

    FileSystem.get(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);

    job.waitForCompletion(true);
  }
}

Answered by Quetzalcoatl

The comparator step occurs between the Mapper and the Reducer, which won't work for you, as you swap the key and value around in the Reducer itself.

The default WritableComparator would normally handle your numerical ordering if the key were an IntWritable, except it's getting a Text key, thus resulting in lexicographical ordering.

As to why exactly the output at the end isn't sorted by your written-out IntWritable key, I'm unsure. Perhaps it has something to do with the way TextOutputFormat works? You might have to dig deeper into the TextOutputFormat source code for clues on that, but in short, setting the sort comparator probably won't help you here, I'm afraid.

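This diagnosis also explains the BufferUnderflowException itself: the map output key is a Text, whose serialized form is a variable-length size prefix followed by UTF-8 bytes, so ByteBuffer.getInt() runs out of bytes whenever a serialized key is shorter than four bytes. For reference, a descending raw comparator for a job whose map output key really is an IntWritable might look like the following sketch (not the asker's code, and not a fix for this job):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

// Sketch: a descending raw comparator, valid only when the map output key
// is an IntWritable, i.e. exactly four big-endian bytes per key.
public class DescendingIntComparator extends WritableComparator {

  public DescendingIntComparator() {
    super(IntWritable.class);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    int left = readInt(b1, s1);   // WritableComparator.readInt reads 4 bytes
    int right = readInt(b2, s2);
    return -Integer.compare(left, right); // negate for descending order
  }
}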

Answered by twid

As Quetzalcoatl said, your comparator is not useful, since it is applied between the map and reduce phases, not after the reduce phase. So to accomplish this you need to either sort in the cleanup of the Reducer, or write another program to sort the output of the reducer.

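For illustration, here is a minimal sketch of the cleanup approach: buffer the (word, sum) pairs during reduce() and emit them sorted from cleanup(). This assumes a single reducer and that all distinct words fit in memory; the class name is made up:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Buffers all (word, sum) pairs in memory and writes them out in
// descending order of count once all input has been reduced.
public class SortingReduce extends Reducer<Text, IntWritable, IntWritable, Text> {

  private final Map<String, Integer> counts = new HashMap<>();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context) {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    counts.put(key.toString(), sum); // buffer instead of writing immediately
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
    entries.sort((a, b) -> b.getValue().compareTo(a.getValue())); // descending by count
    for (Map.Entry<String, Integer> entry : entries) {
      context.write(new IntWritable(entry.getValue()), new Text(entry.getKey()));
    }
  }
}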

Answered by Rajendra

Basically, you need to sort by value. There are two ways to achieve this, but in short you need two map-reduce jobs, i.e. run one more map-reduce job on the output of the first one.

After completing the normal map-reduce, do one more map-reduce where you take the output of the first job as input to the second. In the second job's map phase you can use a custom class as the key, e.g. class WordCountVo implements WritableComparable<WordCountVo>, and you must override the public int compareTo(WordCountVo wordCountVo) method. In WordCountVo you can keep both the word and the count, but compare based on the count only. E.g. below are the member variables for WordCountVo (a fuller sketch follows the snippet):

private String word;
private Long count;
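
The answer stops at the fields, so here is a hypothetical completion of WordCountVo. One deviation from the answer's "compare based on count only": a tie-break on the word is added, because keys that compare as equal are merged into a single reduce group, which would silently drop words that share a count.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Composite key carrying both word and count, ordered by count descending.
public class WordCountVo implements WritableComparable<WordCountVo> {

  private String word;
  private Long count;

  public WordCountVo() {} // no-arg constructor required by Hadoop's serialization

  public WordCountVo(String word, long count) {
    this.word = word;
    this.count = count;
  }

  public String getWord() { return word; }

  public Long getCount() { return count; }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(word);
    out.writeLong(count);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    word = in.readUTF();
    count = in.readLong();
  }

  @Override
  public int compareTo(WordCountVo other) {
    int byCount = other.count.compareTo(count); // reversed: highest count first
    return byCount != 0 ? byCount : word.compareTo(other.word); // tie-break keeps words distinct
  }
}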

Now when you receive the key-value pairs in the second reducer, your data will already be sorted by value. All you need to do is write the key-value pairs out using the context. Hope this helps.

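To round this out, the second job's mapper and reducer might look like the following sketch. The class names are illustrative; it assumes the first job's TextOutputFormat output, i.e. lines of the form count<TAB>word, and it should run with a single reducer (job.setNumReduceTasks(1)) so that the final output is totally ordered:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Second-stage job: re-key the first job's output by WordCountVo so the
// shuffle sorts by count, then write the sorted pairs back out.
public class SortByCount {

  public static class SortMapper
      extends Mapper<LongWritable, Text, WordCountVo, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] parts = value.toString().split("\t"); // TextOutputFormat's default separator
      long count = Long.parseLong(parts[0]);
      context.write(new WordCountVo(parts[1], count), NullWritable.get());
    }
  }

  public static class SortReducer
      extends Reducer<WordCountVo, NullWritable, LongWritable, Text> {

    @Override
    protected void reduce(WordCountVo key, Iterable<NullWritable> values, Context context)
        throws IOException, InterruptedException {
      context.write(new LongWritable(key.getCount()), new Text(key.getWord()));
    }
  }
}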