scala - How to split parquet files into many partitions in Spark?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/27194333/
How to split parquet files into many partitions in Spark?
Asked by samthebest
So I have just 1 parquet file I'm reading with Spark (using the SQL stuff) and I'd like it to be processed with 100 partitions. I've tried setting spark.default.parallelism to 100; we have also tried changing the compression of the parquet to none (from gzip). No matter what we do, the first stage of the Spark job only has a single partition (once a shuffle occurs it gets repartitioned into 100 and thereafter obviously things are much much faster).
Now according to a few sources (like below) parquet should be splittable (even if using gzip!), so I'm super confused and would love some advice.
I'm using Spark 1.0.0, and apparently the default value for spark.sql.shuffle.partitions is 200, so it can't be that. In fact all the defaults for parallelism are much more than 1, so I don't understand what's going on.
Accepted answer by C4stor
You should write your parquet files with a smaller block size. The default is 128 MB per block, but it's configurable by setting the parquet.block.size configuration in the writer.
The source of ParquetOutputFormat is here, if you want to dig into the details.
The block size is the minimum amount of data you can read out of a parquet file that is logically readable (since parquet is columnar, you can't just split by line or something trivial like that), so you can't have more reading threads than input blocks.
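Not part of the original answer, but a minimal sketch of the idea, assuming a Spark 2.x-style session (the 16 MB block size and the output path are illustrative): since the Parquet writer picks up parquet.block.size from the Hadoop configuration, lowering it before the write produces more row groups per file, and therefore more readable splits.

```scala
// Minimal sketch: write Parquet with a smaller block (row group) size so that
// a single output file can later be read with more than one partition.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("smaller-parquet-blocks").getOrCreate()

// The Parquet writer reads parquet.block.size from the Hadoop configuration.
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 16 * 1024 * 1024) // 16 MB

val df = spark.range(0L, 10000000L).toDF("id")        // placeholder data
df.write.parquet("/tmp/the-big-table-small-blocks")   // illustrative path
```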
Answered by Ruslan Pelin
You have mentioned that you want to control the distribution during the write to parquet. When you create parquet from RDDs, parquet preserves the partitions of the RDD. So, if you create an RDD with 100 partitions and write it out as a dataframe in parquet format, it will write 100 separate parquet files to the fs.
For reads you could specify the spark.sql.shuffle.partitions parameter.
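A short sketch of that approach (the path and the partition count of 100 are illustrative, not from the original answer): repartition before the write to control the number of output files, and set spark.sql.shuffle.partitions for the partition count after shuffles.

```scala
// Sketch: the number of output part-files follows the partition count at write time.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partitioned-write").getOrCreate()
val df = spark.read.parquet("the-big-table.parquet")   // illustrative input

df.repartition(100)                                    // 100 partitions -> 100 part-files
  .write.parquet("/tmp/the-big-table-100-files")

// Number of partitions produced by shuffles (joins, aggregations) after a read:
spark.conf.set("spark.sql.shuffle.partitions", "100")
```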
Answered by suztomo
Maybe your parquet file only takes up one HDFS block. Create a big parquet file that has many HDFS blocks and load it:
```scala
val k = sc.parquetFile("the-big-table.parquet")
k.partitions.length
```
You'll see the same number of partitions as HDFS blocks. This worked fine for me (spark-1.1.0).
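For reference, a rough DataFrame-API equivalent of that check (Spark 2.x+; the path mirrors the one above):

```scala
// Sketch: load the file and inspect how many partitions the scan produced.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("check-partitions").getOrCreate()
val df = spark.read.parquet("the-big-table.parquet")
println(df.rdd.getNumPartitions)   // roughly one partition per input split / HDFS block
```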
Answered by F Pereira
The new way of doing it (Spark 2.x) is setting
spark.sql.files.maxPartitionBytes
Source: https://issues.apache.org/jira/browse/SPARK-17998 (the official documentation is not correct yet; it misses the .sql)
From my experience, the Hadoop settings no longer have any effect.
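A sketch of that setting (Spark 2.x+; the 16 MB cap and the path are illustrative):

```scala
// Sketch: cap the bytes packed into one read partition. The default is 128 MB,
// so a smaller cap yields more partitions for the same input.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("maxPartitionBytes-demo")
  .config("spark.sql.files.maxPartitionBytes", 16L * 1024 * 1024)
  .getOrCreate()

val df = spark.read.parquet("the-big-table.parquet")
println(df.rdd.getNumPartitions)
```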
Answered by Prokod
To achieve that you should use SparkContext to set the Hadoop configuration (sc.hadoopConfiguration) property mapreduce.input.fileinputformat.split.maxsize.
By setting this property to a lower value than hdfs.blockSize, you will get as many partitions as the number of splits.
For example:
When hdfs.blockSize = 134217728 (128 MB),
and one file is read which contains exactly one full block,
and mapreduce.input.fileinputformat.split.maxsize = 67108864 (64 MB),
then there will be two partitions that those splits will be read into.
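A sketch of that setup for the Hadoop-InputFormat read path this answer targets (Spark 1.x-era API; as the previous answer notes, these Hadoop settings no longer take effect with the Spark 2.x file source). The application name and path are illustrative:

```scala
// Sketch: lower the max split size so one 128 MB HDFS block is read as ~2 splits.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("split-maxsize-demo"))
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.maxsize", 67108864L)   // 64 MB

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.parquet("the-big-table.parquet")
println(df.rdd.partitions.length)   // expect ~2 partitions for a single full block
```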
