Original question: http://stackoverflow.com/questions/37227286/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): Stack Overflow
How to use Spark's repartitionAndSortWithinPartitions?
Asked by Make42
I am trying to build a minimal working example of repartitionAndSortWithinPartitions in order to understand the function. I have gotten this far (not working; the distinct throws the values around so that they get out of order):
def partval(partID: Int, iter: Iterator[Int]): Iterator[Tuple2[Int, Int]] = {
  iter.map(x => (partID, x)).toList.iterator
}
val part20to3_chaos = sc.parallelize(1 to 20, 3).distinct
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
part20to2_sorted.mapPartitionsWithIndex(partval).collect
but get the error
Name: Compile Error
Message: <console>:22: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Int]
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(2)
I tried using the scaladoc, but wasn't able to find which class provides repartitionAndSortWithinPartitions. (Btw: this scaladoc is not impressive: why is MapPartitionsRDD missing? How can I search for a method?)
Realising I need a partitioner object, next I tried:
val rangePartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
val part20to2_sorted = part20to3_chaos.repartitionAndSortWithinPartitions(rangePartitioner)
part20to2_sorted.mapPartitionsWithIndex(partval).collect
but got
Name: Compile Error
Message: <console>:22: error: type mismatch;
found : org.apache.spark.rdd.RDD[Int]
required: org.apache.spark.rdd.RDD[_ <: Product2[?,?]]
Error occurred in an application involving default arguments.
val rPartitioner = new org.apache.spark.RangePartitioner(2, part20to3_chaos)
How do I get this to compile? Could I get a working example, please?
Accepted answer by Yuval Itzchakov
Your problem is that part20to3_chaos is an RDD[Int], while OrderedRDDFunctions.repartitionAndSortWithinPartitions is a method which operates on an RDD[(K, V)], where K is the key and V is the value.
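For instance, keying each element by itself satisfies that signature. Here is a minimal sketch (my own addition, not part of the original answer), reusing the asker's part20to3_chaos and assuming the implicit conversion to OrderedRDDFunctions is in scope, as it is in the spark-shell:

// Sketch: turn the RDD[Int] into an RDD[(Int, Int)] so that
// repartitionAndSortWithinPartitions becomes available, then pass a Partitioner.
val chaosPairs = part20to3_chaos.keyBy(identity)  // RDD[(Int, Int)]
val rangePartitioner = new org.apache.spark.RangePartitioner(2, chaosPairs)
val sortedPairs = chaosPairs.repartitionAndSortWithinPartitions(rangePartitioner)
sortedPairs.mapPartitionsWithIndex(
  (partID, iter) => iter.map { case (k, _) => (partID, k) }
).collect
// Roughly: Array((0,1), (0,2), ..., (1,11), ..., (1,20)) -- keys sorted within each partition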
repartitionAndSortWithinPartitions will first repartition the data based on the provided partitioner, and then sort by the key:
/**
* Repartition the RDD according to the given partitioner and,
* within each resulting partition, sort records by their keys.
*
* This is more efficient than calling `repartition` and then sorting within each partition
* because it can push the sorting down into the shuffle machinery.
*/
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
So it looks like it's not exactly what you're looking for.
If you want a plain old sort, you can use sortBy, as it doesn't require a key:
scala> val toTwenty = sc.parallelize(1 to 20, 3).distinct
toTwenty: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at distinct at <console>:33
scala> val sorted = toTwenty.sortBy(identity, true, 3).collect
sorted: Array[Int] =
Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
Here you pass sortBy the key function (identity in this case), the order (ascending or descending), and the number of partitions you want to create.
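As a quick sanity check (my own addition, not from the answer), sortBy returns an RDD, so the per-partition layout can be inspected with the asker's partval helper before collecting; the exact boundaries depend on the range partitioning:

scala> toTwenty.sortBy(identity, true, 3).mapPartitionsWithIndex(partval).collect
// roughly: Array((0,1), (0,2), ..., (1,8), ..., (2,15), ..., (2,20))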
Answered by Somum
Let me try to explain repartitionAndSortWithinPartitions through pyspark.
Suppose you have a dataset in pair form:
pairs = sc.parallelize([["a",1], ["b",2], ["c",3], ["d",3]])
pairs.collect()
# Output [['a', 1], ['b', 2], ['c', 3], ['d', 3]]
pairs.repartitionAndSortWithinPartitions(2).glom().collect()
# Output [[('a', 1), ('c', 3)], [('b', 2), ('d', 3)]]
Through repartitionAndSortWithinPartitions() we asked for the data to be reshuffled into 2 partitions, and that's exactly what we get: 'a' and 'c' as one, 'b' and 'd' as the other. The keys are sorted within each partition.
We can also repartition-and-sort based on a certain condition, as in:
pairs.repartitionAndSortWithinPartitions(2,
partitionFunc=lambda x: x == 'a').glom().collect()
# Output [[('b', 2), ('c', 3), ('d', 3)], [('a', 1)]]
As expected, we have two partitions: one with 3 key-value pairs, sorted, and one with ('a', 1). (glom() coalesces the elements of each partition into a list, which is what makes the partition layout visible here.)
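For reference, here is a rough Scala equivalent of the custom-partitioner example above (my own sketch; the OnlyA class name is made up, not part of any Spark API):

import org.apache.spark.Partitioner

// Hypothetical partitioner: key "a" goes to partition 1, everything else to 0.
class OnlyA extends Partitioner {
  def numPartitions: Int = 2
  def getPartition(key: Any): Int = if (key == "a") 1 else 0
}

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 3)))
pairs.repartitionAndSortWithinPartitions(new OnlyA).glom().collect()
// Array(Array((b,2), (c,3), (d,3)), Array((a,1)))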

