How to Read and Write Parquet Files in Hadoop
In this post we'll see how to read and write Parquet files in Hadoop using the Java API. We'll also see how to write a Parquet file in Hadoop using MapReduce.
Rather than using ParquetWriter and ParquetReader directly, AvroParquetWriter and AvroParquetReader are used to write and read the Parquet files.
The AvroParquetWriter and AvroParquetReader classes take care of the conversion from the Avro schema to the Parquet schema, as well as the type conversions.
Required jars
To write Java programs that read and write Parquet files, the following jars need to be on the classpath. They can be added as Maven dependencies (a sample dependency block follows the list) or the jars can be copied over.
avro-1.8.2.jar
parquet-hadoop-bundle-1.10.0.jar
parquet-avro-1.10.0.jar
jackson-mapper-asl-1.9.13.jar
jackson-core-asl-1.9.13.jar
slf4j-api-1.7.25.jar
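If you use Maven, the jars listed above map to the dependencies below. This is only a sketch assuming the standard Maven coordinates for these artifacts; the Hadoop libraries themselves are assumed to be provided by the cluster.
<!-- Dependencies corresponding to the jars listed above -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-hadoop-bundle</artifactId>
  <version>1.10.0</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.10.0</version>
</dependency>
<dependency>
  <groupId>org.codehaus.jackson</groupId>
  <artifactId>jackson-mapper-asl</artifactId>
  <version>1.9.13</version>
</dependency>
<dependency>
  <groupId>org.codehaus.jackson</groupId>
  <artifactId>jackson-core-asl</artifactId>
  <version>1.9.13</version>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.25</version>
</dependency>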
Writing a Parquet file using a Java program
Since Avro is used, we need an Avro schema.
schema.avsc
{
  "type": "record",
  "name": "testFile",
  "doc": "test records",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "empName",
      "type": "string"
    }
  ]
}
Java code
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class ExampleParquetWriter {

    public static void main(String[] args) {
        Schema schema = parseSchema();
        List<GenericData.Record> recordList = createRecords(schema);
        writeToParquetFile(recordList, schema);
    }

    // Method to parse the schema
    private static Schema parseSchema() {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = null;
        try {
            // Path to schema file
            schema = parser.parse(ClassLoader.getSystemResourceAsStream("resources/schema.avsc"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return schema;
    }

    private static List<GenericData.Record> createRecords(Schema schema) {
        List<GenericData.Record> recordList = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            GenericData.Record record = new GenericData.Record(schema);
            record.put("id", i);
            record.put("empName", i + "a");
            recordList.add(record);
        }
        return recordList;
    }

    private static void writeToParquetFile(List<GenericData.Record> recordList, Schema schema) {
        // Output path for Parquet file in HDFS
        Path path = new Path("/user/out/data.parquet");
        ParquetWriter<GenericData.Record> writer = null;
        // Creating ParquetWriter using builder
        try {
            writer = AvroParquetWriter.<GenericData.Record>builder(path)
                    .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
                    .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
                    .withSchema(schema)
                    .withConf(new Configuration())
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .withValidation(false)
                    .withDictionaryEncoding(false)
                    .build();
            // Writing records
            for (GenericData.Record record : recordList) {
                writer.write(record);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Executing the program in the Hadoop environment
Before running this program in the Hadoop environment, the jars listed above need to be placed in HADOOP_INSTALLATION_DIR/share/hadoop/mapreduce/lib.
If there is a version mismatch, also place the current avro-1.x.x jar in HADOOP_INSTALLATION_DIR/share/hadoop/common/lib.
To execute the above Java program in the Hadoop environment, the directory containing the .class file of the Java program needs to be added to Hadoop's classpath.
$ export HADOOP_CLASSPATH='/huser/eclipse-workspace/theitroad/bin'
My example class file ExampleParquetWriter.class is in /huser/eclipse-workspace/theitroad/bin, so I have exported that path.
Then the program can be run using the following command.
$ hadoop org.theitroad.ExampleParquetWriter
18/06/06 12:15:35 INFO compress.CodecPool: Got brand-new compressor [.snappy]
18/06/06 12:15:35 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 2048
Java program to read a Parquet file
To read the Parquet file created in HDFS by the above program, the following method can be used.
private static void readParquetFile() {
    ParquetReader<GenericData.Record> reader = null;
    Path path = new Path("/user/out/data.parquet");
    try {
        reader = AvroParquetReader
                .<GenericData.Record>builder(path)
                .withConf(new Configuration())
                .build();
        GenericData.Record record;
        while ((record = reader.read()) != null) {
            System.out.println(record);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
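The method above can be added to the ExampleParquetWriter class used for writing. A minimal sketch of wiring it in, assuming the reader-side imports are added alongside the existing ones:
// Additional imports needed by readParquetFile()
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public static void main(String[] args) {
    // Write the Parquet file first, then read it back
    Schema schema = parseSchema();
    List<GenericData.Record> recordList = createRecords(schema);
    writeToParquetFile(recordList, schema);
    readParquetFile();
}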
$ hadoop org.theitroad.ExampleParquetWriter
18/06/06 13:33:47 INFO hadoop.InternalParquetRecordReader: RecordReader initialized will read a total of 10 records.
18/06/06 13:33:47 INFO hadoop.InternalParquetRecordReader: at row 0. reading next block
18/06/06 13:33:47 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
18/06/06 13:33:47 INFO hadoop.InternalParquetRecordReader: block read in memory in 44 ms. row count = 10
{"id": 1, "empName": "1a"}
{"id": 2, "empName": "2a"}
{"id": 3, "empName": "3a"}
{"id": 4, "empName": "4a"}
{"id": 5, "empName": "5a"}
{"id": 6, "empName": "6a"}
{"id": 7, "empName": "7a"}
{"id": 8, "empName": "8a"}
{"id": 9, "empName": "9a"}
{"id": 10, "empName": "10a"}
Note that the builders taking an org.apache.hadoop.fs.Path instance as an argument are deprecated.
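Depending on the parquet-avro version in use, builders that take an InputFile/OutputFile are available instead. The snippet below is a sketch of the non-deprecated variants, assuming the same path, schema, configuration and imports as in the earlier examples.
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

Configuration conf = new Configuration();
Path path = new Path("/user/out/data.parquet");

// Writer built from an OutputFile instead of a Path
ParquetWriter<GenericData.Record> writer = AvroParquetWriter
        .<GenericData.Record>builder(HadoopOutputFile.fromPath(path, conf))
        .withSchema(schema)
        .withConf(conf)
        .build();

// Reader built from an InputFile instead of a Path
ParquetReader<GenericData.Record> reader = AvroParquetReader
        .<GenericData.Record>builder(HadoopInputFile.fromPath(path, conf))
        .build();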
We can also use the parquet-tools jar to view the content or the schema of a Parquet file.
Once parquet-tools-1.10.0.jar is downloaded, the following command can be used to view the content of the file.
$ hadoop jar /path/to/parquet-tools-1.10.0.jar cat /user/out/data.parquet
To view the schema of the Parquet file:
$ hadoop jar /path/to/parquet-tools-1.10.0.jar schema /user/out/data.parquet
message testFile {
required int32 id;
required binary empName (UTF8);
}
Writing a Parquet file using MapReduce
In this example a text file is converted to a Parquet file using MapReduce. It is a mapper-only job, so the number of reducers is set to zero.
For this program a simple text file (stored in HDFS) with just two lines is used.
This is a test file.
This is a Hadoop MapReduce program file.
MapReduce Java code
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.example.data.Group;
public class ParquetFile extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new ParquetFile(), args);
        System.exit(exitFlag);
    }

    // Schema
    private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(
            "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"testFile\",\n" +
            "  \"doc\": \"test records\",\n" +
            "  \"fields\":\n" +
            "  [\n" +
            "    {\"name\": \"byteofffset\", \"type\": \"long\"},\n" +
            "    {\"name\": \"line\", \"type\": \"string\"}\n" +
            "  ]\n" +
            "}\n");

    // Map function
    public static class ParquetMapper extends Mapper<LongWritable, Text, Void, GenericRecord> {

        private GenericRecord record = new GenericData.Record(AVRO_SCHEMA);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            record.put("byteofffset", key.get());
            record.put("line", value.toString());
            context.write(null, record);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "parquet");
        job.setJarByClass(ParquetFile.class);
        job.setMapperClass(ParquetMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(Group.class);
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        // Setting the schema to be used
        AvroParquetOutputFormat.setSchema(job, AVRO_SCHEMA);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Running the MapReduce program
hadoop jar /path/to/jar org.theitroad.ParquetFile /user/input/count /user/out/parquetFile
Using parquet-tools we can view the content of the Parquet file.
hadoop jar /path/to/parquet-tools-1.10.0.jar cat /user/out/parquetFile/part-m-00000.parquet
18/06/06 17:15:04 INFO hadoop.InternalParquetRecordReader: RecordReader initialized will read a total of 2 records.
18/06/06 17:15:04 INFO hadoop.InternalParquetRecordReader: at row 0. reading next block
18/06/06 17:15:04 INFO hadoop.InternalParquetRecordReader: block read in memory in 20 ms. row count = 2
byteofffset = 0
line = This is a test file.
byteofffset = 21
line = This is a Hadoop MapReduce program file.
Reading a Parquet file using MapReduce
This example shows how to read a Parquet file using MapReduce. The example reads the Parquet file written in the previous example and writes it to a plain text file.
The records in the Parquet file look as follows.
byteofffset: 0
line: This is a test file.
byteofffset: 21
line: This is a Hadoop MapReduce program file.
Since only the line part is needed in the output file, we first have to split the record and then split the value of the line column again.
MapReduce Java code
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.ExampleInputFormat;
public class ParquetFileRead extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitFlag = ToolRunner.run(new ParquetFileRead(), args);
        System.exit(exitFlag);
    }

    // Map function
    public static class ParquetMapper1 extends Mapper<LongWritable, Group, NullWritable, Text> {

        public static final Log log = LogFactory.getLog(ParquetMapper1.class);

        public void map(LongWritable key, Group value, Context context)
                throws IOException, InterruptedException {
            NullWritable outKey = NullWritable.get();
            String line = value.toString();
            // Each record reads as "byteofffset: <offset>\nline: <text>",
            // so split on the newline first and then on ": " to get the line value
            String[] fields = line.split("\n");
            String[] record = fields[1].split(": ");
            context.write(outKey, new Text(record[1]));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "parquet1");
        job.setJarByClass(getClass());
        job.setMapperClass(ParquetMapper1.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(ExampleInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Running the MapReduce program
hadoop jar /path/to/jar org.theitroad.ParquetFileRead /user/out/parquetFile/part-m-00000.parquet /user/out/data
File content
$ hdfs dfs -cat /user/out/data/part-m-00000
This is a test file.
This is a Hadoop MapReduce program file.

