java - Running two mappers and two reducers for a simple Hadoop MapReduce job
Disclaimer: This page is a Chinese-English translation of a popular StackOverFlow question, provided under the CC BY-SA 4.0 license. If you use it, you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original author (not me): StackOverFlow
Original URL: http://stackoverflow.com/questions/11717495/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me):
StackOverFlow
Running two mappers and two reducers for a simple Hadoop MapReduce job
Asked by
I just wanted to get a better understanding of using multiple mappers and reducers. I want to try this out with a simple Hadoop MapReduce word count job, running two mappers and two reducers. Is there anything I need to configure manually in the configuration files, or is it enough to just make changes to the WordCount.java file?
I'm running this job on a single node, and I'm running it as
$ hadoop jar job.jar input output
And I've started
$ hadoop namenode -format
$ hadoop namenode
$ hadoop datanode
sbin$ ./yarn-daemon.sh start resourcemanager
I'm running hadoop-2.0.0-cdh4.0.0
And my WordCount.java file is
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    private static final Log LOG = LogFactory.getLog(WordCount.class);

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            //printKeyAndValues(key, values);
            for (IntWritable val : values) {
                sum += val.get();
                LOG.info("val = " + val.get());
            }
            LOG.info("sum = " + sum + " key = " + key);
            result.set(sum);
            context.write(key, result);
            //System.err.println(String.format("[reduce] word: (%s), count: (%d)", key, result.get()));
        }

        // a little method to print debug output
        private void printKeyAndValues(Text key, Iterable<IntWritable> values) {
            StringBuilder sb = new StringBuilder();
            for (IntWritable val : values) {
                sb.append(val.get() + ", ");
            }
            System.err.println(String.format("[reduce] key: (%s), value: (%s)", key, sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Could any of you help me run two mappers and two reducers for this word count job?
Accepted answer by Arun A K
Gladnick: In case you are planning to use the default TextInputFormat, there will be at least as many mappers as the number of input files (or more, depending on the file size). So just put 2 files into your input directory so that you get 2 mappers running. (Advising this solution because you plan to run this as a test case.)
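For example, you could upload two input files before submitting the job (the file names below are hypothetical; any two files will do):
$ hadoop fs -mkdir input
$ hadoop fs -put file1.txt input
$ hadoop fs -put file2.txt input
$ hadoop jar job.jar input output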
Now that you have asked for 2 reducers, all you need to do is call job.setNumReduceTasks(2) in your main before submitting the job.
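In the WordCount.java above, that is a one-line addition in main(); a minimal sketch, placed (for example) right after the reducer class is set:
job.setReducerClass(IntSumReducer.class);
job.setNumReduceTasks(2); // run two reduce tasks for this job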
After that, just prepare a jar of your application and run it on the Hadoop pseudo-distributed cluster.
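One possible way to build and submit the jar, assuming the single source file above (exact paths and commands depend on your setup, so treat this as a sketch):
$ mkdir classes
$ javac -classpath `hadoop classpath` -d classes WordCount.java
$ jar cf job.jar -C classes .
$ hadoop jar job.jar org.apache.hadoop.examples.WordCount input output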
In case you need to control which word goes to which reducer, you can specify that in a Partitioner class.
Configuration configuration = new Configuration(); // configuration object that provides access to various configuration parameters
Job job = new Job(configuration, "Wordcount-Vowels & Consonants"); // create the job object and set the job name
job.setJarByClass(WordCount.class); // set the main class
job.setNumReduceTasks(2); // set the number of reduce tasks required
job.setMapperClass(WordCountMapper.class); // set the map class for the job
job.setCombinerClass(WordCountCombiner.class); // set the combiner class for the job
job.setPartitionerClass(VowelConsonantPartitioner.class); // set the partitioner class for the job
job.setReducerClass(WordCountReducer.class); // set the reduce class for the job
job.setOutputKeyClass(Text.class); // output key type (the word); Text is analogous to String
job.setOutputValueClass(IntWritable.class); // output value type (the count); IntWritable is analogous to int
FileInputFormat.addInputPath(job, new Path(args[0])); // set the input directory for fetching the input files
FileOutputFormat.setOutputPath(job, new Path(args[1])); // set the output directory for the results
This should be the structure of your main program. You may include the combiner and the partitioner if needed.
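The class name VowelConsonantPartitioner is taken from the driver above; the answer does not show its body, so the vowel/consonant routing below is only an assumed sketch of what such a partitioner might look like:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class VowelConsonantPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Route words starting with a vowel to reducer 0 and all other words to reducer 1
        // (assumes job.setNumReduceTasks(2) as above).
        String word = key.toString().toLowerCase();
        int partition = (!word.isEmpty() && "aeiou".indexOf(word.charAt(0)) >= 0) ? 0 : 1;
        return partition % numPartitions; // guard against running with fewer reducers
    }
}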
Answer by Fakrudeen
For the mappers, set
mapred.max.split.size
to half the size of your input file.
For the reducers, set the count to 2 explicitly:
mapred.reduce.tasks=2
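A minimal sketch of setting both properties in the driver's Configuration (the property names are as given in the answer; the split size value is only an illustrative assumption, in bytes):
Configuration conf = new Configuration();
conf.set("mapred.max.split.size", "67108864"); // e.g. half of a 128 MB input file
conf.setInt("mapred.reduce.tasks", 2);         // run two reduce tasks
Job job = new Job(conf, "word count");
Alternatively, since the driver uses GenericOptionsParser, the same properties can be passed on the command line with -D, for example -D mapred.reduce.tasks=2.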