How to sort an RDD in Scala Spark?

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/23838614/

Tags: scala, apache-spark, rdd

Asked by blue-sky

Reading the Spark method sortByKey:

sortByKey([ascending], [numTasks])   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

Is it possible to return just "N" results? So instead of returning all results, just return the top 10. I could convert the sorted collection to an Array and use the take method, but since this is an O(N) operation, is there a more efficient method?
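
For concreteness, here is a minimal Scala sketch of the pattern the question describes; the SparkContext setup, names and data are illustrative only, not part of the original post:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.rdd.RDD

  // Illustrative local setup; in a real job the SparkContext usually already exists.
  val sc = new SparkContext(new SparkConf().setAppName("sort-then-take").setMaster("local[*]"))

  val pairs: RDD[(Int, String)] = sc.parallelize(Seq(3 -> "c", 1 -> "a", 5 -> "e", 2 -> "b"))

  // Sort the whole RDD by key, then keep only the first 10 pairs.
  val firstTen: Array[(Int, String)] = pairs.sortByKey().take(10)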

Accepted answer by javadba

Most likely you have already perused the source code:

  class OrderedRDDFunctions {
    // <snip>
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
      val part = new RangePartitioner(numPartitions, self, ascending)
      val shuffled = new ShuffledRDD[K, V, P](self, part)
      shuffled.mapPartitions(iter => {
        val buf = iter.toArray
        if (ascending) {
          buf.sortWith((x, y) => x._1 < y._1).iterator
        } else {
          buf.sortWith((x, y) => x._1 > y._1).iterator
        }
      }, preservesPartitioning = true)
    }
  }

And, as you say, the entire data must go through the shuffle stage, as seen in the snippet.

However, your concern about subsequently invoking take(K) may not be so accurate. This operation does NOT cycle through all N items:

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   */
  def take(num: Int): Array[T] = {
    // <snip>
  }

So then, it would seem:

O(myRdd.take(K)) << O(myRdd.sortByKey()) ~= O(myRdd.sortByKey().take(K)) (at least for small K) << O(myRdd.sortByKey().collect())

Answered by Daniel Darabos

If you only need the top 10, use rdd.top(10). It avoids sorting, so it is faster.

rdd.top makes one parallel pass through the data, collecting the top N in each partition in a heap, then merges the heaps. It is an O(rdd.count) operation. Sorting would be O(rdd.count log rdd.count), and incur a lot of data transfer: it does a shuffle, so all of the data would be transmitted over the network.
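
As a rough sketch of how this looks for (key, value) pairs (assuming sc is an existing SparkContext; the names and data are illustrative):

  import org.apache.spark.rdd.RDD

  val pairs: RDD[(Int, String)] = sc.parallelize(Seq(3 -> "c", 1 -> "a", 7 -> "g", 2 -> "b"))

  // Largest 10 keys first, without a full sort; top() keeps a bounded structure per partition
  // and merges the per-partition results on the driver.
  val largestByKey: Array[(Int, String)] = pairs.top(10)(Ordering.by[(Int, String), Int](_._1))

  // Smallest 10 keys first (the ascending counterpart).
  val smallestByKey: Array[(Int, String)] = pairs.takeOrdered(10)(Ordering.by[(Int, String), Int](_._1))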

Answered by jruizaranguren

Another option, at least from PySpark 1.2.0, is the use of takeOrdered.

In ascending order:

rdd.takeOrdered(10)

In descending order:

rdd.takeOrdered(10, lambda x: -x)

Top k by value for (k, v) pairs:

rdd.takeOrdered(10, key=lambda kv: -kv[1])  # the original lambda (k, v): -v syntax only works in Python 2
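
For the Scala API, roughly equivalent calls would look like this (assuming sc is an existing SparkContext; nums and pairs are illustrative names):

  import org.apache.spark.rdd.RDD

  val nums: RDD[Int] = sc.parallelize(Seq(5, 3, 9, 1))
  val pairs: RDD[(String, Int)] = sc.parallelize(Seq("a" -> 5, "b" -> 3, "c" -> 9))

  nums.takeOrdered(10)                          // ascending
  nums.takeOrdered(10)(Ordering[Int].reverse)   // descending
  pairs.takeOrdered(10)(Ordering.by[(String, Int), Int](_._2).reverse)  // top 10 by value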