scala - How can I merge Spark result files without repartition and copyMerge?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/29025147/
How can I merge Spark result files without repartition and copyMerge?
Asked by Leonard
I use the following code:
csv.saveAsTextFile(pathToResults, classOf[GzipCodec])
The pathToResults directory contains many files such as part-0000, part-0001, etc. I can use FileUtil.copyMerge(), but it's really slow: it downloads all the files to the driver program and then uploads them back to Hadoop. Still, FileUtil.copyMerge() is faster than:
csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec])
How can I merge Spark result files without repartition and FileUtil.copyMerge()?
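For reference, the FileUtil.copyMerge call mentioned above looks roughly like this (a sketch using the Hadoop 2.x signature; the destination path is illustrative, not from the question):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)
// Streams every part file through the client and writes one merged file;
// the last argument is an optional separator string appended after each file.
FileUtil.copyMerge(fs, new Path(pathToResults), fs,
  new Path(pathToResults + "-merged"), false, conf, null)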
Answered by 0x0FFF
Unfortunately, there is no other option to get a single output file in Spark. Instead of repartition(1) you can use coalesce(1), but with parameter 1 their behavior is the same: Spark collects your data into a single partition in memory, which might cause an OOM error if your data is too big.
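For completeness, a minimal sketch of the coalesce(1) variant described above, assuming csv is the same RDD as in the question:

import org.apache.hadoop.io.compress.GzipCodec

// Same write as in the question, but the data is first pulled into a single
// partition, so Spark produces a single part file (at the cost of doing all
// the writing on one executor, and possibly OOM for large data).
csv.coalesce(1).saveAsTextFile(pathToResults, classOf[GzipCodec])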
Another option for merging files on HDFS might be to write a simple MapReduce job (or a Pig job, or a Hadoop Streaming job) that takes the whole directory as input and, using a single reducer, generates a single output file for you. But be aware that with the MapReduce approach all the data is first copied to the reducer's local filesystem, which might cause an "out of space" error.
Here are some useful links on the same topic:
Answered by Tagar
I had exactly the same question and had to write PySpark code (with calls to the Hadoop API) that implements copyMerge:
https://github.com/Tagar/stuff/blob/master/copyMerge.py
Unfortunately, copyMerge as a standalone Hadoop API call is deprecated and will be removed in Hadoop 3.0, so this implementation doesn't depend on Hadoop's copyMerge (it re-implements it).
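The linked implementation is in PySpark; here is a minimal Scala sketch of the same idea, using only the Hadoop FileSystem API (the helper name and the "part-" filter are illustrative assumptions, not taken from the linked code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

// Concatenate all part-* files under srcDir into a single dstFile on HDFS,
// without calling the deprecated FileUtil.copyMerge.
def mergePartFiles(srcDir: String, dstFile: String, conf: Configuration): Unit = {
  val fs = FileSystem.get(conf)
  val out = fs.create(new Path(dstFile))
  try {
    val parts = fs.listStatus(new Path(srcDir))
      .filter(s => s.isFile && s.getPath.getName.startsWith("part-"))
      .sortBy(_.getPath.getName)
    for (part <- parts) {
      val in = fs.open(part.getPath)
      try IOUtils.copyBytes(in, out, conf, false) // false: keep out open
      finally in.close()
    }
  } finally {
    out.close()
  }
}

Note that raw concatenation like this is only valid for formats that stay readable when appended back to back, such as plain text or gzip (gzip members can be concatenated).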
Answered by Jeff A.
coalesce(1) works just fine. I also see a hadoop-streaming option that can merge HDFS files on the fly, if you would like to run a script like this:
$ hadoop jar /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar \
-Dmapred.reduce.tasks=1 \
-input "/hdfs/input/dir" \
-output "/hdfs/output/dir" \
-mapper cat \
-reducer cat

