How to Read and Write a SequenceFile in Hadoop
This post shows how to read and write a SequenceFile in Hadoop using the Java API and using Hadoop MapReduce, and how to provide compression options for a SequenceFile.
Java program to write a SequenceFile
SequenceFile provides a static method createWriter() to create a writer, which is used to write a SequenceFile in Hadoop. The createWriter method has many overloaded variants (many of them now deprecated); the one used here is the following.
public static org.apache.hadoop.io.SequenceFile.Writer createWriter(Configuration conf, org.apache.hadoop.io.SequenceFile.Writer.Option... opts) throws IOException
Java code
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
public class SFWrite {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int i = 0;
    try {
      FileSystem fs = FileSystem.get(conf);
      // input file in local file system
      File file = new File("/home/theitroad/Documents/theitroad/Hadoop/Test/data.txt");
      // Path for output file
      Path outFile = new Path(args[0]);
      IntWritable key = new IntWritable();
      Text value = new Text();
      SequenceFile.Writer writer = null;
      try {
        writer = SequenceFile.createWriter(conf, Writer.file(outFile),
            Writer.keyClass(key.getClass()), Writer.valueClass(value.getClass()),
            Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()));
        for (String line : FileUtils.readLines(file)) {
          key.set(i++);
          value.set(line);
          writer.append(key, value);
        }
      } finally {
        if (writer != null) {
          writer.close();
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
Compression options are also provided in the program; the compression codec used is GzipCodec.
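Other compression choices can be passed to createWriter() in the same way. A minimal sketch of a couple of alternatives, reusing the conf, outFile, key and value variables from the program above:
// record-level compression with the zlib-based DefaultCodec instead of block-level gzip
writer = SequenceFile.createWriter(conf, Writer.file(outFile),
    Writer.keyClass(key.getClass()), Writer.valueClass(value.getClass()),
    Writer.compression(SequenceFile.CompressionType.RECORD,
        new org.apache.hadoop.io.compress.DefaultCodec()));
// or no compression at all
writer = SequenceFile.createWriter(conf, Writer.file(outFile),
    Writer.keyClass(key.getClass()), Writer.valueClass(value.getClass()),
    Writer.compression(SequenceFile.CompressionType.NONE));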
Executing the program in the Hadoop environment
To execute the above Java program in the Hadoop environment, the directory containing the .class file of the Java program needs to be added to Hadoop's classpath.
export HADOOP_CLASSPATH='/huser/eclipse-workspace/theitroad/bin'
My SFWrite.class file is in the /huser/eclipse-workspace/theitroad/bin location, so I have exported that path.
Then the program can be run using the following command:
$ hadoop org.theitroad.SFWrite /user/output/item.seq
18/03/22 12:10:21 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
18/03/22 12:10:21 INFO compress.CodecPool: Got brand-new compressor [.gz]
Here /user/output/item.seq is the output path in HDFS.
If you try to display the file content in HDFS it won't be readable, because SequenceFile is a binary file format. That brings us to the second part: how to read a sequence file.
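A quick way to peek at the file without writing any code is the hadoop fs -text command, which understands the SequenceFile format (and its compression) and prints the keys and values using their toString() methods:
$ hadoop fs -text /user/output/item.seq
For anything beyond a quick look, though, we read the file programmatically.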
Java program to read a SequenceFile
To read a SequenceFile in Hadoop we need to get an instance of SequenceFile.Reader, which can read a SequenceFile written in any of the writer formats.
Using this reader instance we can iterate over the records with the next() method. The variant of next() used here takes both a key and a value argument of type Writable and assigns the next (key, value) pair read from the sequence file to those variables.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
public class SFRead {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    try {
      Path inFile = new Path(args[0]);
      SequenceFile.Reader reader = null;
      try {
        IntWritable key = new IntWritable();
        Text value = new Text();
        reader = new SequenceFile.Reader(conf, Reader.file(inFile), Reader.bufferSize(4096));
        while (reader.next(key, value)) {
          System.out.println("Key " + key + " Value " + value);
        }
      } finally {
        if (reader != null) {
          reader.close();
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
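The key and value classes are hard-coded as IntWritable and Text above. Since a SequenceFile records the key and value class names in its header, they can also be looked up from the reader itself. A minimal sketch, assuming the same reader and conf as above (it needs the extra imports org.apache.hadoop.io.Writable and org.apache.hadoop.util.ReflectionUtils):
// instantiate key and value objects from the classes recorded in the file header
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
  System.out.println("Key " + key + " Value " + value);
}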
Writing a SequenceFile using a MapReduce job
We can also write a sequence file in Hadoop using a MapReduce job, which is useful when we have a big file and want to take advantage of parallel processing.
The MapReduce job in this case is a simple one: we don't even need a reduce task, and the map task just writes the (key, value) pairs.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SequenceFileWriter extends Configured implements Tool {

  // Map function
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new SequenceFileWriter(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "sfwrite");
    job.setJarByClass(SequenceFileWriter.class);
    job.setMapperClass(SFMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Compression related settings
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    int returnFlag = job.waitForCompletion(true) ? 0 : 1;
    return returnFlag;
  }
}
In the MapReduce job for writing a SequenceFile, the important parts are the job settings specified for the output format and for compression.
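The same compression set-up can also be expressed through configuration properties instead of the FileOutputFormat/SequenceFileOutputFormat helper methods. A minimal sketch, assuming the Hadoop 2.x property names and that the properties are set on conf before Job.getInstance() is called in run():
// equivalent to FileOutputFormat.setCompressOutput(job, true)
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
// equivalent to FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class)
conf.set("mapreduce.output.fileoutputformat.compress.codec",
    "org.apache.hadoop.io.compress.GzipCodec");
// equivalent to SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK)
conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");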
Reading a SequenceFile using a MapReduce job
If you want to read a sequence file using a MapReduce job, the code is very similar to the code for writing one.
The main change is in the input and output formats:
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SequenceFileReader extends Configured implements Tool {

  // Map function
  public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new SequenceFileReader(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "sfread");
    job.setJarByClass(SequenceFileReader.class);
    job.setMapperClass(SFMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    int returnFlag = job.waitForCompletion(true) ? 0 : 1;
    return returnFlag;
  }
}
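Both MapReduce jobs are submitted in the usual way with hadoop jar, once the classes are packaged into a jar; the jar name, package and HDFS paths below are only placeholders for your own setup:
$ hadoop jar theitroad.jar org.theitroad.SequenceFileWriter /user/input /user/output_seq
$ hadoop jar theitroad.jar org.theitroad.SequenceFileReader /user/output_seq /user/output_text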

