如何在Hadoop中链接MapReduce作业
在许多情况下,我们希望创建一系列MapReduce作业以完全转换和处理数据。这比将所有内容放在一个MapReduce作业中并使之变得非常复杂要好。
实际上,我们可以通过各种来源获取数据,也可以使用各种应用程序序列。这可以通过使用Oozie创建工作流程来完成,但这是另一篇文章的主题。在本文中,我们将介绍如何使用ChainMapper和ChainReducer在Hadoop中链接MapReduce作业。
Hadoop中的ChainMapper
ChainMapper是Hadoop中预定义的MapReduce类之一。 ChainMapper类允许我们在一个Map任务中使用多个Mapper类。以链接的方式调用Mapper类,第一个Mapper的输出成为第二个Mapper的输入,依此类推,直到最后一个Mapper,最后一个Mapper的输出将被写入任务的输出。
我们可以使用addMapper()方法将映射器添加到ChainMapper。
Hadoop中的ChainReducer
ChainReducer类允许在Reducer任务中的Reducer之后链接多个Mapper类。对于Reducer输出的每个记录,将以链接方式调用Mapper类。减速器的输出成为第一个映射器的输入,第一个的输出成为第二个映射器的输入,依此类推,直到最后一个映射器,最后一个映射器的输出将被写入任务的输出。
要将Mapper类添加到链式减速器中,可以使用addMapper()方法。
要将Reducer类设置为链式作业,可以使用setReducer()方法。
链接MapReduce作业
使用ChainMapper和ChainReducer类,可以组成看起来像 [MAP+ / REDUCE MAP*] 的MapReduce作业。
当我们使用链接的MapReduce时,可以组合使用以下内容:
一个或者多个映射器
单个Reducer
零个或者多个映射器(可选,仅在使用链接的reducer时使用)
使用链接的MapReduce作业时,来自映射器或者reducer的数据存储(并使用)在内存中,而不是存储在磁盘上,这会在很大程度上减少磁盘IO。
MapReduce链接示例
每天都有以下格式的股票数据,包括股票代码,价格和交易。
AAA 23 5677 BBB 23 12800 aaa 26 23785 ..... .....
在数据符号中并不总是大写。因此有两个映射器,首先提取相关字段(符号和事务)。在第二个映射器中,符号被转换为大写。
然后是一个减少器,它增加了每个交易品种的交易量。然后在reduce任务中,有一个InverseMapper可以反转键,值对。请注意,InverseMapper是Hadoop框架中的预定义Mapper类,这就是为什么示例代码中没有实现的原因。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class StockTrans extends Configured implements Tool{
// Mapper 1
public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text symbol = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Splitting the line on tab
String[] stringArr = value.toString().split("\t");
//Setting symbol and transaction values
symbol.set(stringArr[0]);
Integer trans = Integer.parseInt(stringArr[2]);
context.write(symbol, new IntWritable(trans));
}
}
// Mapper 2
public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable>{
public void map(Text key, IntWritable value, Context context)
throws IOException, InterruptedException {
String symbol = key.toString().toUpperCase();
context.write(new Text(symbol), value);
}
}
// Reduce function
public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
int exitFlag = ToolRunner.run(new StockTrans(), args);
System.exit(exitFlag);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Stock transactio");
job.setJarByClass(getClass());
// MapReduce chaining
Configuration map1Conf = new Configuration(false);
ChainMapper.addMapper(job, StockFieldMapper.class, LongWritable.class, Text.class,
Text.class, IntWritable.class, map1Conf);
Configuration map2Conf = new Configuration(false);
ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class,
Text.class, IntWritable.class, map2Conf);
Configuration reduceConf = new Configuration(false);
ChainReducer.setReducer(job, TotalTransReducer.class, Text.class, IntWritable.class,
Text.class, IntWritable.class, reduceConf);
ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
IntWritable.class, Text.class, null);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}
创建jar包后运行此代码。
hadoop jar /home/theitroad/Documents/theitroad/theitroadhadoop.jar org.theitroad.StockTrans /user/input/StockTrans.txt /user/output/stock
输出量
hdfs dfs -cat /user/output/stock/part-r-00000 50483 AAA 180809 BBB

