scala - How to make saveAsTextFile NOT split output into multiple files?
Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/24371259/
How to make saveAsTextFile NOT split output into multiple files?
Asked by user2773013
When using Scala in Spark, whenever I dump the results out using saveAsTextFile, it seems to split the output into multiple parts. I'm just passing a parameter (path) to it.
val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
- Does the number of outputs correspond to the number of reducers it uses?
- Does this mean the output is compressed?
- I know I can combine the output together using bash, but is there an option to store the output in a single text file, without splitting? I looked at the API docs, but they don't say much about this.
Answered by aaronman
The reason it is saved as multiple files is that the computation is distributed. If the output is small enough that you think it can fit on one machine, then you can end your program with
val arr = year.collect()
and then save the resulting array as a file. Another way would be to use a custom partitioner, partitionBy, and make everything go to one partition, though that isn't advisable because you won't get any parallelization.
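For instance, a minimal sketch of the collect-and-save approach (not from the original answer; it assumes the year RDD from the question, a hypothetical local output path, and that the collected data fits in driver memory):

import java.io.PrintWriter

val arr = year.collect() // Array[(Int, String)]: (count, field) pairs from the question's RDD
val writer = new PrintWriter("year.txt") // hypothetical local file on the driver
try {
  arr.foreach { case (count, key) => writer.println(s"$count,$key") }
} finally {
  writer.close()
}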
If you require the file to be saved with saveAsTextFile, you can use coalesce(1, true).saveAsTextFile(). This basically means: do the computation, then coalesce to 1 partition. You can also use repartition(1), which is just a wrapper for coalesce with the shuffle argument set to true. Looking through the source of RDD.scala is how I figured most of this stuff out; you should take a look.
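For example, with the year RDD from the question (a sketch; the output paths are placeholders):

// The computation stays parallel; only the final write is shuffled to a single partition.
year.coalesce(1, shuffle = true).saveAsTextFile("year_single")
// repartition(1) is equivalent, since repartition is coalesce with shuffle set to true:
year.repartition(1).saveAsTextFile("year_single_repartitioned")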
Answered by Xavier Guihot
For those working with a larger dataset:
- rdd.collect() should not be used in this case, as it will collect all the data as an Array in the driver, which is the easiest way to run out of memory.
- rdd.coalesce(1).saveAsTextFile() should also not be used, as the parallelism of the upstream stages will be lost and everything will be performed on the single node where the data will be stored.
- rdd.coalesce(1, shuffle = true).saveAsTextFile() is the best simple option, as it keeps the processing of the upstream tasks parallel and only performs the shuffle to one node at the end (rdd.repartition(1).saveAsTextFile() is an exact synonym).
- rdd.saveAsSingleTextFile(), as provided below, additionally allows storing the rdd in a single file with a specific name, while keeping the parallelism properties of rdd.coalesce(1, shuffle = true).saveAsTextFile().
Something that can be inconvenient with rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") is that it actually produces a file whose path is path/to/file.txt/part-00000 and not path/to/file.txt.
The following solution, rdd.saveAsSingleTextFile("path/to/file.txt"), will actually produce a file whose path is path/to/file.txt:
package com.whatever.package
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec
object SparkHelper {

  // Implicit class so that saveAsSingleTextFile can be called directly on an
  // RDD[String], like this: rdd.saveAsSingleTextFile("path/to/file.txt")
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
      path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with HDFS:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx files into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )
      // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

      // Clean up the temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}
which can be used this way:
import com.whatever.package.SparkHelper.RDDExtensions
rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
This snippet:
- First stores the rdd with saveAsTextFile in a temporary folder path/to/file.txt.tmp, as if we didn't want to store the data in one file (which keeps the processing of the upstream tasks parallel).
- And only then, using the Hadoop file system API, proceeds with the merge (FileUtil.copyMerge()) of the different output files to create our final single output file path/to/file.txt.
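Note that FileUtil.copyMerge was removed in Hadoop 3 (see the link in the code above). A rough sketch of a replacement merge using only the FileSystem API is shown below; the helper name and the part-file ordering are my assumptions, not part of the original answer:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

// Concatenates the part-xxxxx files of srcDir into dstFile, then deletes srcDir
// (mimicking copyMerge with deleteSource = true).
def copyMergeHadoop3(fs: FileSystem, srcDir: Path, dstFile: Path, conf: Configuration): Unit = {
  val out = fs.create(dstFile)
  try {
    fs.listStatus(srcDir)
      .filter(_.getPath.getName.startsWith("part-"))
      .sortBy(_.getPath.getName)
      .foreach { status =>
        val in = fs.open(status.getPath)
        try IOUtils.copyBytes(in, out, conf, false)
        finally in.close()
      }
  } finally {
    out.close()
  }
  fs.delete(srcDir, true)
}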
Answered by marekinfo
You could call coalesce(1) and then saveAsTextFile() - but it might be a bad idea if you have a lot of data. Separate files per split are generated just like in Hadoop, in order to let separate mappers and reducers write to different files. Having a single output file is only a good idea if you have very little data, in which case you could do collect() as well, as @aaronman said.
Answered by Matt
As others have mentioned, you can collect or coalesce your data set to force Spark to produce a single file. But this also limits the number of Spark tasks that can work on your dataset in parallel. I prefer to let it create a hundred files in the output HDFS directory, then use hadoop fs -getmerge /hdfs/dir /local/file.txt to extract the results into a single file in the local filesystem. This makes the most sense when your output is a relatively small report, of course.
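If you'd rather do the same merge from Scala instead of the command line, a hedged sketch using FileUtil.copyMerge (Hadoop 2 API; the paths simply mirror the placeholders above) might look like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val hdfs = FileSystem.get(conf)
val local = FileSystem.getLocal(conf)

// Merge the part files under /hdfs/dir into a single file on the local filesystem
// (same idea as hadoop fs -getmerge); deleteSource = false keeps the HDFS output.
FileUtil.copyMerge(hdfs, new Path("/hdfs/dir"), local, new Path("/local/file.txt"),
  false, conf, null)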
Answered by Bhaskar Das
You can call repartition() and do it this way:
val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")
Answered by gprivitera
You will be able to do it in the next version of Spark; in the current version, 1.0.0, it's not possible unless you do it manually somehow, for example, as you mentioned, with a bash script call.
Answered by JavaPlanet
I also want to mention that the documentation clearly states that users should be careful when calling coalesce with a really small number of partitions. This can cause upstream partitions to inherit this number of partitions.
I would not recommend using coalesce(1) unless really required.
Answered by Aravind Krishnakumar
In Spark 1.6.1 the format is as shown below. It creates a single output file. It is best practice to use it if the output is small enough to handle. Basically, it returns a new RDD that is reduced into numPartitions partitions. If you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1).
pair_result.coalesce(1).saveAsTextFile("/app/data/")
Answered by Ian Mendoza
Here's my answer for outputting a single file. I just added coalesce(1):
val year = sc.textFile("apat63_99.txt")
.map(_.split(",")(1))
.flatMap(_.split(","))
.map((_,1))
.reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
Code:
year.coalesce(1).saveAsTextFile("year")


