How to convert .txt / .csv file to ORC format
Note: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you reuse this content, you must do so under the same license and attribute it to the original authors (not me) on StackOverflow.
Original question: http://stackoverflow.com/questions/25117760/
Asked by Soubhagya Hota
For a certain requirement I want to convert a delimited text file to ORC (Optimized Row Columnar) format. Since I have to run the conversion at regular intervals, I want to write a Java program to do it. I don't want to use the Hive temporary-table workaround. Can anyone please help me do it? Below is what I tried:
/* ORCMapper.java */
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.hive.ql.io.orc.*;
import org.apache.hadoop.io.*;

public class ORCMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, Writable> {

    OrcSerde serde;

    @Override
    public void configure(JobConf job) {
        serde = new OrcSerde();
    }

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<NullWritable, Writable> output, Reporter reporter)
            throws IOException {
        // Serialize the raw input line; note that a null ObjectInspector
        // is passed to serialize() here.
        output.collect(NullWritable.get(), serde.serialize(value, null));
    }
}
/* ORCReducer.java */
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class ORCReducer extends MapReduceBase
        implements Reducer<NullWritable, Writable, NullWritable, Writable> {

    @Override
    public void reduce(NullWritable key, Iterator<Writable> values,
            OutputCollector<NullWritable, Writable> output, Reporter reporter)
            throws IOException {
        // Emits only the first value per key; unused anyway, since the
        // driver sets the number of reduce tasks to 0.
        Writable value = values.next();
        output.collect(key, value);
    }
}
/* ORCDriver.java */
import java.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.ql.io.orc.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class ORCDriver {
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        JobClient client = new JobClient();
        JobConf conf = new JobConf("ORC_Generator");

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Writable.class);
        conf.setOutputFormat(OrcOutputFormat.class);

        FileInputFormat.addInputPath(conf, new Path("hdfs://localhost:9000/path/to/ipdir/textfile"));
        OrcOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:9000/path/to/opdir/orcfile"));

        conf.setMapperClass(ORCMapper.class);
        System.out.println(OrcOutputFormat.getWorkOutputPath(conf));

        // Map-only job: no reduce phase, so ORCReducer is never invoked.
        conf.setNumReduceTasks(0);
        client.setConf(conf);
        try {
            JobClient.runJob(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Running this shows the error below, and a file named part-00000 is generated on my local file system:
java.io.IOException: File already exists:part-00000
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:249)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:241)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:335)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:381)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:364)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:564)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:545)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.ensureWriter(WriterImpl.java:1672)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.flushStripe(WriterImpl.java:1688)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:1868)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:95)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:80)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.close(MapTask.java:833)
at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1763)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:439)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/09/02 11:23:26 INFO mapred.LocalJobRunner: Map task executor complete.
14/09/02 11:23:26 WARN mapred.LocalJobRunner: job_local688970064_0001
java.lang.Exception: java.lang.NullPointerException
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.createTreeWriter(WriterImpl.java:1515)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:154)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:258)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:63)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:46)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:847)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:591)
at ORCMapper.map(ORCMapper.java:42)
at ORCMapper.map(ORCMapper.java:1)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/09/02 11:23:26 INFO mapred.JobClient: map 0% reduce 0%
14/09/02 11:23:26 INFO mapred.JobClient: Job complete: job_local688970064_0001
14/09/02 11:23:26 INFO mapred.JobClient: Counters: 0
14/09/02 11:23:26 INFO mapred.JobClient: Job Failed: NA
java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1357)
at ORCDriver.main(ORCDriver.java:53)
Accepted answer by abstractKarshit
You can use Spark DataFrames to convert a delimited file to ORC format very easily. You can also impose a schema and filter down to specific columns (a sketch of that follows the code below).
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class OrcConvert {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("OrcConvert");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(jsc);

        String inputPath = args[0];
        String outputPath = args[1];

        // "\001" (Ctrl-A) is a common Hive field delimiter; change it
        // to match your input file.
        DataFrame inputDf = hiveContext.read().format("com.databricks.spark.csv")
                .option("quote", "'").option("delimiter", "\001")
                .load(inputPath);

        inputDf.write().orc(outputPath);
    }
}
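The schema and column filtering mentioned above aren't shown in that code; here is a minimal sketch of how they might look with the same Spark 1.x Java API (write().orc() needs Spark 1.5+). The class name, the column names "name" and "city", and the comma delimiter are illustrative assumptions, not part of the original answer.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class OrcConvertWithSchema {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("OrcConvertWithSchema");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(jsc);

        // Illustrative two-column schema; adjust names and types to your file.
        StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("city", DataTypes.StringType, true)
        });

        DataFrame df = hiveContext.read()
                .format("com.databricks.spark.csv")
                .option("delimiter", ",")
                .schema(schema)            // impose the schema instead of inferring it
                .load(args[0]);

        df.select("name", "city")          // keep only the columns you need
          .write().orc(args[1]);
    }
}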
Make sure all dependencies are met. Hive should also be running, since a HiveContext is needed: currently Spark supports the ORC format only through HiveContext.
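As a usage sketch, one plausible way to submit such a job; the jar name, input/output paths, and the spark-csv package version are assumptions to adapt to your build:

spark-submit --class OrcConvert \
    --packages com.databricks:spark-csv_2.10:1.5.0 \
    orc-convert.jar /path/to/input /path/to/output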
Answered by winningsix
You can insert text data into an ORC table with a command like this:

insert overwrite table orcTable select * from textTable;

The orcTable is created by the following command:

create table orcTable(name string, city string) stored as orc;

And textTable has the same structure as orcTable.
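The answer does not show how textTable itself is defined. A minimal sketch, assuming the source is a comma-delimited text file already in HDFS; the column list mirrors orcTable, while the delimiter and location are assumptions:

-- Hypothetical source table over the raw delimited files;
-- adjust the delimiter and location to your data.
create external table textTable(name string, city string)
row format delimited fields terminated by ','
location '/path/to/textdata/';

With both tables in place, the insert overwrite statement above rewrites the text rows as ORC.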