Scala: How does HashPartitioner work?
Declaration: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license, link to the original, and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/31424396/
How does HashPartitioner work?
Asked by Sohaib
I read up on the documentation of HashPartitioner. Unfortunately, nothing much was explained except for the API calls. I am under the assumption that HashPartitioner partitions the distributed set based on the hash of the keys. For example, if my data is like
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
So the partitioner would put this into different partitions, with the same keys falling in the same partition. However, I do not understand the significance of the constructor argument
new HashPartitioner(numPartitions) // What does numPartitions do?
For the above dataset, how would the results differ if I did
new HashPartitioner(1)
new HashPartitioner(2)
new HashPartitioner(10)
So how does HashPartitioner actually work?
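One quick way to see the difference for yourself is to look at the contents of each partition. The following is a minimal sketch, assuming a running SparkContext named sc; glom() collects each partition into an array so you can see where every pair lands:
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3)))
for (n <- Seq(1, 2, 10)) {
  // glom() turns every partition into an Array so we can print what landed where
  val parts = pairs.partitionBy(new HashPartitioner(n)).glom().collect()
  println(s"numPartitions = $n: " + parts.map(_.mkString("{", ", ", "}")).mkString(" | "))
}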
Answered by zero323
Well, let's make your dataset marginally more interesting:
val rdd = sc.parallelize(for {
  x <- 1 to 3
  y <- 1 to 2
} yield (x, None), 8)
We have six elements:
rdd.count
Long = 6
no partitioner:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
and eight partitions:
rdd.partitions.length
Int = 8
Now let's define a small helper to count the number of elements per partition:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
  rdd.mapPartitions(iter => Iterator(iter.length))
}
Since we don't have a partitioner, our dataset is distributed uniformly between partitions (Default Partitioning Scheme in Spark):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Now let's repartition our dataset:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Since the parameter passed to HashPartitioner defines the number of partitions, we expect one partition:
rddOneP.partitions.length
Int = 1
Since we have only one partition, it contains all elements:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Note that the order of values after the shuffle is non-deterministic.
In the same way, if we use HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
we'll get 2 partitions:
rddTwoP.partitions.length
Int = 2
Since rdd is partitioned by key, the data won't be distributed uniformly anymore:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Because we have three keys and only two different values of hashCode mod numPartitions, there is nothing unexpected here:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Just to confirm the above:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Finally, with HashPartitioner(7) we get seven partitions, three of them non-empty with 2 elements each:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddSevenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
Summary and Notes
- HashPartitioner takes a single argument which defines the number of partitions.
- Values are assigned to partitions using the hash of the keys. The hash function may differ depending on the language (Scala RDDs may use hashCode, Datasets use MurmurHash 3, PySpark uses portable_hash). In a simple case like this, where the key is a small integer, you can assume that hash is an identity (i = hash(i)). The Scala API uses nonNegativeMod to determine the partition based on the computed hash.
- If the distribution of keys is not uniform, you can end up in situations where part of your cluster is idle.
- Keys have to be hashable. You can check my answer for A list as a key for PySpark's reduceByKey to read about PySpark-specific issues. Another possible problem is highlighted by the HashPartitioner documentation (see the sketch after this list):
Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.
In Python 3 you have to make sure that hashing is consistent. See What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?
- Hash partitioner is neither injective nor surjective. Multiple keys can be assigned to a single partition and some partitions can remain empty.
- Please note that currently hash-based methods don't work in Scala when combined with REPL-defined case classes (Case class equality in Apache Spark).
- HashPartitioner (or any other Partitioner) shuffles the data. Unless partitioning is reused between multiple operations, it doesn't reduce the amount of data to be shuffled.
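As a small illustration of the array caveat above (plain Scala, no Spark required; the exact hash values are not guaranteed, only the identity-vs-content distinction matters):
// Java arrays hash by identity, not by content, so two equal-looking array keys
// will almost certainly get different hash codes and may land in different partitions:
Array(1, 2).hashCode == Array(1, 2).hashCode   // false (identity-based)
// Wrapping the key in an immutable collection with a content-based hashCode avoids this:
Vector(1, 2).hashCode == Vector(1, 2).hashCode // true (content-based)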
Answered by abalcerek
An RDD is distributed, which means it is split into some number of parts. Each of these partitions is potentially on a different machine. A hash partitioner with the argument numPartitions chooses which partition to place a pair (key, value) in, in the following way:
- Creates exactly numPartitions partitions.
- Places (key, value) in the partition with number Hash(key) % numPartitions (as sketched below).
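A minimal sketch of that placement rule applied to the keys from the question; placement is a hypothetical helper that mimics the non-negative modulo Spark uses, not Spark's own API:
// Which partition each key from the question would go to, for several partition counts
def placement(key: Int, numPartitions: Int): Int =
  ((key.hashCode % numPartitions) + numPartitions) % numPartitions

for (n <- Seq(1, 2, 10))
  println(s"numPartitions = $n: " + (1 to 2).map(k => s"key $k -> partition ${placement(k, n)}").mkString(", "))
// With 1 partition both keys map to 0; with 2 they split (1 -> 1, 2 -> 0); with 10 most partitions stay empty.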
Answered by Daniel Darabos
The HashPartitioner.getPartition method takes a key as its argument and returns the index of the partition which the key belongs to. The partitioner has to know what the valid indices are, so it returns numbers in the right range. The number of partitions is specified through the numPartitions constructor argument.
The implementation returns roughly key.hashCode() % numPartitions. See Partitioner.scala for more details.
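A simplified, standalone sketch of that logic; the two-argument getPartition signature here is adapted for illustration (the real method takes only the key and reads numPartitions from the partitioner), and nonNegativeMod mirrors Spark's internal helper:
// Shift negative remainders into the range [0, mod) so negative hash codes still map to a valid partition
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

// null keys always go to partition 0; everything else is hashed and reduced modulo numPartitions
def getPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case _    => nonNegativeMod(key.hashCode, numPartitions)
}

getPartition("some key", 4)   // an index between 0 and 3
getPartition(-7, 4)           // still non-negative thanks to nonNegativeMod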


