Scala: How to define a custom partitioner for Spark RDDs with equally sized partitions, where each partition has an equal number of elements?

Disclaimer: this page is a translation of a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me), citing the original question: http://stackoverflow.com/questions/23127329/

How to Define Custom partitioner for Spark RDDs of equally sized partition where each partition has equal number of elements?

scala hadoop apache-spark

Asked by yh18190

I am new to Spark. I have a large dataset of elements [RDD] and I want to divide it into two exactly equal sized partitions while maintaining the order of elements. I tried using RangePartitioner like

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))

This doesn't give a satisfactory result, because it divides roughly but not into exactly equal sizes while maintaining the order of elements. For example, if there are 64 elements and we use RangePartitioner, it divides them into 31 elements and 33 elements.

I need a partitioner such that I get exactly the first 32 elements in one half and the second set of 32 elements in the other half. Could you please suggest how to use a custom partitioner so that I get two equally sized halves, maintaining the order of elements?

Answered by Daniel Darabos

Partitioners work by assigning a key to a partition. You would need prior knowledge of the key distribution, or look at all keys, to make such a partitioner. This is why Spark does not provide you with one.

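To make that concrete, a partitioner is just a small class with two methods (a sketch only, assuming the standard org.apache.spark.Partitioner API; the ModuloPartitioner here is a made-up toy example to show the contract, not a solution to the question):

import org.apache.spark.Partitioner

// Toy sketch of the Partitioner contract (does NOT give equal-sized partitions):
// Spark calls getPartition(key) for every record and expects an index
// in the range [0, numPartitions).
class ModuloPartitioner(val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    ((k % numPartitions) + numPartitions) % numPartitions // keep the index non-negative
  }
}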

In general you do not need such a partitioner. In fact I cannot come up with a use case where I would need equal-size partitions. What if the number of elements is odd?

Anyway, let us say you have an RDD keyed by sequential Ints, and you know how many there are in total. Then you could write a custom Partitioner like this:

import org.apache.spark.Partitioner

class ExactPartitioner[V](
    partitions: Int,
    elements: Int)
  extends Partitioner {

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    // `k` is assumed to go continuously from 0 to elements-1.
    k * partitions / elements
  }
}
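
For example, to actually use it you could do something like this (a sketch only, not from the original answer; it assumes your Spark version has RDD.zipWithIndex, and `data` is a placeholder for whatever RDD you start from):

// Hypothetical usage of ExactPartitioner: key each element by its global
// position (zipWithIndex preserves the RDD's existing order), split into
// two equal halves, then drop the keys again.
val n = data.count().toInt
val keyed = data.zipWithIndex().map { case (v, i) => (i.toInt, v) }
val halves = keyed.partitionBy(new ExactPartitioner(2, n)).values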

Answered by samthebest

This answer takes some inspiration from Daniel's, but provides a full implementation (using the pimp-my-library pattern) with an example for people's copy-and-paste needs :)

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._ // pair-RDD functions (sortByKey, partitionBy) on pre-1.3 Spark
import org.apache.spark.rdd.RDD

import scala.util.Random

import RDDConversions._

trait RDDWrapper[T] {
  def rdd: RDD[T]
}

// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
  rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
  // Here we use a single Long to try to ensure the sort is balanced,
  // but for really large datasets, we may want to consider
  // using a tuple of many Longs or even a GUID
  def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
    rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
    .grouped(numPartitions).map(t => (t._1._1, t._2))
}

case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
  def grouped(size: Int): RDD[T] = {
    // TODO Version where withIndex is cached
    val withIndex = rdd.mapPartitions(_.zipWithIndex)

    // startValues(i) is the global index of the first element of partition i,
    // and startValues.last is the total element count (assumes no empty partitions).
    val startValues =
      withIndex.mapPartitionsWithIndex((i, iter) =>
        Iterator((i, iter.toIterable.last))).toArray().toList
      .sortBy(_._1).map(_._2._2.toLong + 1L).scan(0L)(_ + _)

    withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
      case (value, index) => (startValues(i) + index.toLong, value)
    })
    .partitionBy(new Partitioner {
      def numPartitions: Int = size
      def getPartition(key: Any): Int =
        (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
    })
    .map(_._2)
  }
}

Then in another file we have

import org.apache.spark.rdd.RDD

// TODO modify above to be implicit class, rather than have implicit conversions
object RDDConversions {
  implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] =
    new RichRDD[T](rdd)
  implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
  implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}

Then for your use case you just want (assuming it's already sorted)

import RDDConversions._

yourRdd.grouped(2)
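
And if it is not already sorted, a sketch using the pair-RDD variant defined above (`pairRdd` is a placeholder for an RDD[(K, V)] with an ordered key type, and like the rest of this answer it is untested):

import RDDConversions._

// Sorts by key (with a random tie-breaker for equal keys) and splits the
// result into 2 equally sized, order-preserving partitions.
val twoHalves = pairRdd.sortByKeyGrouped(2)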

Disclaimer: Not tested, kinda just wrote this straight into the SO answer
