scala - Spark RDD default number of partitions
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license, link the original question, and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/44222307/
Spark RDD default number of partitions
Asked by Sri
Version: Spark 1.6.2, Scala 2.10
I am executing the commands below in the spark-shell.
I am trying to see the number of partitions that Spark creates by default.
val rdd1 = sc.parallelize(1 to 10)
println(rdd1.getNumPartitions) // ==> Result is 4
//Creating rdd for the local file test1.txt. It is not HDFS.
//File content is just one word "Hello"
val rdd2 = sc.textFile("C:/test1.txt")
println(rdd2.getNumPartitions) // ==> Result is 2
As per the Apache Spark documentation, spark.default.parallelism is the number of cores on my laptop (which has a 2-core processor).
My question is: rdd2 seems to give the correct result of 2 partitions, as stated in the documentation. But why does rdd1 end up with 4 partitions?
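A quick way to see where the 4 comes from is to inspect the context directly in the spark-shell. The output below is only illustrative and depends on your master URL and hardware:
// What master is the shell using? (spark-shell defaults to local[*])
scala> sc.master
// res0: String = local[*]
// Is spark.default.parallelism set explicitly? (usually not in a plain shell)
scala> sc.getConf.getOption("spark.default.parallelism")
// res1: Option[String] = None
// With no explicit setting, local mode falls back to the number of logical
// cores, which is often 4 on a 2-core laptop with hyper-threading.
scala> sc.defaultParallelism
// res2: Int = 4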
Answered by eliasah
The minimum number of partitions is actually a lower bound set by the SparkContext. Since Spark uses Hadoop under the hood, the Hadoop InputFormat will still define the default behaviour.
The first case should reflect defaultParallelism as mentioned here, which may differ depending on settings and hardware (number of cores, etc.).
So unless you provide the number of slices, that first case would be defined by the number described by sc.defaultParallelism:
scala> sc.defaultParallelism
res0: Int = 6
scala> sc.parallelize(1 to 100).partitions.size
res1: Int = 6
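For comparison, an explicit numSlices takes precedence over defaultParallelism:
// An explicit numSlices overrides sc.defaultParallelism:
scala> sc.parallelize(1 to 100, 3).partitions.size
res2: Int = 3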
As for the second case, with sc.textFile, the number of slices by default is the minimum number of partitions.
Which is equal to 2, as you can see in this section of code.
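Concretely, sc.textFile falls back to SparkContext.defaultMinPartitions, which is defined as math.min(defaultParallelism, 2); an explicit second argument to textFile is passed down as the minimum-split hint instead:
scala> sc.defaultMinPartitions
res3: Int = 2
// e.g. sc.textFile("C:/test1.txt", 4) would use 4 as the minimum-split hint
// (a lower bound only; the actual number of splits can be higher).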
Thus, you should consider the following:
- sc.parallelize will take numSlices or defaultParallelism.
- sc.textFile will take the maximum between minPartitions and the number of splits computed based on the Hadoop input split size divided by the block size (see the sketch after this list).
  - sc.textFile calls sc.hadoopFile, which creates a HadoopRDD that uses InputFormat.getSplits under the hood [Ref. InputFormat documentation]:
  - InputSplit[] getSplits(JobConf job, int numSplits) throws IOException: Logically split the set of input files for the job. Each InputSplit is then assigned to an individual Mapper for processing. Note: The split is a logical split of the inputs and the input files are not physically split into chunks. For e.g. a split could be <input-file-path, start, offset> tuple. Parameters: job - job configuration. numSplits - the desired number of splits, a hint. Returns: an array of InputSplits for the job. Throws: IOException.
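As a rough sketch of that computation (a simplified version of FileInputFormat.getSplits for a single file, ignoring the 10% slop factor and per-format minimum split sizes, so treat it as an approximation only):
// numSplits (the minPartitions hint) sets a goal size; the actual split size is
// capped by the block size, and the file is then chopped into splitSize chunks.
def estimateSplits(fileSizeBytes: Long, minPartitions: Int, blockSizeBytes: Long): Long = {
  val goalSize  = fileSizeBytes / math.max(minPartitions, 1)
  val splitSize = math.max(1L, math.min(goalSize, blockSizeBytes))
  math.ceil(fileSizeBytes.toDouble / splitSize).toLong
}
// Illustration: a 1 GB file with a 128 MB block size and the default hint of 2
// ends up with 8 splits, because 128 MB is smaller than the 512 MB goal size.
estimateSplits(1024L * 1024 * 1024, 2, 128L * 1024 * 1024)  // 8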
Example:
Let's create some dummy text files:
fallocate -l 241m bigfile.txt
fallocate -l 4G hugefile.txt
This will create 2 files of size 241MB and 4GB, respectively.
We can see what happens when we read each of the files:
scala> val rdd = sc.textFile("bigfile.txt")
// rdd: org.apache.spark.rdd.RDD[String] = bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> rdd.getNumPartitions
// res0: Int = 8
scala> val rdd2 = sc.textFile("hugefile.txt")
// rdd2: org.apache.spark.rdd.RDD[String] = hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27
scala> rdd2.getNumPartitions
// res1: Int = 128
Both of them are actually HadoopRDDs:
scala> rdd.toDebugString
// res2: String =
// (8) bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27 []
// | bigfile.txt HadoopRDD[0] at textFile at <console>:27 []
scala> rdd2.toDebugString
// res3: String =
// (128) hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27 []
// | hugefile.txt HadoopRDD[2] at textFile at <console>:27 []
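Those partition counts match the split-size arithmetic sketched above, assuming the roughly 32 MB default block size that Hadoop reports for the local file system (HDFS would typically use 64 MB or 128 MB blocks and give different numbers):
// 241 MB file, ~32 MB splits
scala> math.ceil(241.0 / 32).toInt
// res4: Int = 8
// 4 GB file, ~32 MB splits
scala> 4096 / 32
// res5: Int = 128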

