Java程序在Hadoop中以bzip2格式压缩文件
这篇文章显示了如何编写Java程序来使用bzip2压缩来压缩HDFS中的文件。该程序从本地文件系统获取输入文件,并将BZip2压缩文件作为输出输出到HDFS中。
Java程序以bzip2格式压缩文件
必须用于bzip2的Hadoop压缩编解码器是" org.apache.hadoop.io.compress.Bzip2Codec"。
要获得该编解码器,请使用CompressionCodecFactory类的getCodecByClassName方法。
要创建CompressionOutputStream,请使用编解码器类的createOutputStream(OutputStream out)方法。我们将使用CompressionOutputStream将压缩形式的文件数据写入流中。
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
public class HDFSCompressWrite {
public static void main(String[] args) {
Configuration conf = new Configuration();
InputStream in = null;
OutputStream out = null;
try {
FileSystem fs = FileSystem.get(conf);
// Input file from local file system
in = new BufferedInputStream(new FileInputStream("/home/theitroad/Documents/theitroad/Hadoop/Test/data.txt"));
//Compressed Output file
Path outFile = new Path("/user/compout/test.bz2");
// Verification
if (fs.exists(outFile)) {
System.out.println("Output file already exists");
throw new IOException("Output file already exists");
}
out = fs.create(outFile);
// For bzip2 compression
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodecByClassName("org.apache.hadoop.io.compress.BZip2Codec");
CompressionOutputStream compressionOutputStream = codec.createOutputStream(out);
try {
IOUtils.copyBytes(in, compressionOutputStream, 4096, false);
compressionOutputStream.finish();
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(compressionOutputStream);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
在Hadoop环境中执行程序
要在Hadoop环境中执行上述Java程序,我们需要在Hadoop的类路径中添加包含Java程序的.class文件的目录。
导出HADOOP_CLASSPATH ='/ huser / eclipse-workspace / theitroad / bin'
我的HDFSCompressWrite.class文件位于/ huser / eclipse-workspace / theitroad / bin位置,因此我已经导出了该路径。
然后,我们可以使用以下命令运行该程序-
$ hadoop org.theitroad.HDFSCompressWrite 18/03/09 17:10:04 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native 18/03/09 17:10:04 INFO compress.CodecPool: Got brand-new compressor [.bz2]
程序中使用的输入文件足够大,可以确保即使压缩文件大小超过128 MB,也可以确保将其作为两个单独的块存储在HDFS中。由于Hadoop中的bzip2格式的压缩文件支持拆分,因此以该压缩文件作为输入的MapReduce作业应该能够创建对应于两个块的2个单独的输入拆分。
首先检查是否创建了bzip2格式的压缩输出文件。
$ hdfs dfs -ls /user/compout Found 1 items -rw-r--r-- 1 theitroad supergroup 228651107 2018-03-09 17:11 /user/compout/test.bz2
我们可以看到压缩文件的大小约为228 MB,因此应将其作为两个单独的块存储在HDFS中。
我们可以使用HDFS fsck命令进行检查。
$ hdfs fsck /user/compout/test.bz2 Status: HEALTHY Total size: 228651107 B Total dirs: 0 Total files: 1 Total symlinks: 0 Total blocks (validated): 2 (avg. block size 114325553 B) Minimally replicated blocks: 2 (100.0 %) Over-replicated blocks: 0 (0.0 %) Under-replicated blocks: 0 (0.0 %) FSCK ended at Fri Mar 09 17:17:13 IST 2018 in 3 milliseconds
如果将此压缩文件作为输入提供给MapReduce作业,则由于bzip2格式支持拆分,因此MapReduce作业应该能够创建两个输入拆分。要检查该文件是否已输入到wordcount MapReduce程序中。
$ hadoop jar /home/theitroad/Documents/theitroad/Hadoop/wordcount.jar org.theitroad.WordCount /user/compout/test.bz2 /user/output2 18/03/11 12:48:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 18/03/11 12:48:29 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 18/03/11 12:48:30 INFO input.FileInputFormat: Total input files to process : 1 18/03/11 12:48:30 INFO mapreduce.JobSubmitter: number of splits:2
如我们在控制台" mapreduce.JobSubmitter:分割数:2"上显示的语句中所见,为地图任务创建了两个分割。

