Spark: produce RDD[(X, X)] of all possible combinations from RDD[X]

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not the translator), citing the original question: http://stackoverflow.com/questions/26557873/


Spark: produce RDD[(X, X)] of all possible combinations from RDD[X]

Tags: scala, apache-spark

Asked by Eugene Zhulenev

Is it possible in Spark to implement the '.combinations' function from Scala collections?

/** Iterates over combinations.
 *
 *  @return   An Iterator which traverses the possible n-element combinations of this $coll.
 *  @example  `"abbbc".combinations(2) = Iterator(ab, ac, bb, bc)`
 */

For example, how can I get from RDD[X] to RDD[List[X]] or RDD[(X, X)] for combinations of size = 2? And let's assume that all values in the RDD are unique.
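For reference, plain Scala collections already provide this; a minimal local (non-Spark) sketch of the behavior being asked for:

```scala
object CombinationsDemo {
  val xs = List(1, 2, 3, 4, 5)

  // .combinations(2) yields each 2-element subset exactly once,
  // as a List; here we reshape each one into a pair.
  val pairs: List[(Int, Int)] =
    xs.combinations(2).collect { case List(a, b) => (a, b) }.toList

  def main(args: Array[String]): Unit =
    println(pairs) // List((1,2), (1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,4), (3,5), (4,5))
}
```

The question is how to get the same result for a distributed RDD, where no such method exists.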

Answered by aaronman

Cartesian product and combinations are two different things: the cartesian product will create an RDD of size rdd.size() ^ 2, while combinations will create an RDD of size rdd.size() choose 2.

val rdd = sc.parallelize(1 to 5)
val combinations = rdd.cartesian(rdd).filter { case (a, b) => a < b }
combinations.collect()

Note this will only work if an ordering is defined on the elements of the list, since we use <. This one only works for choosing two, but it can easily be extended by ensuring the relationship a < b holds for all a and b in the sequence.
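As a sketch of that extension (shown on local collections here, but the same chain of filters applies to cartesian products in Spark), chaining the strict ordering a < b < c keeps exactly one representative per 3-element subset:

```scala
object Combos3 {
  val xs = List(1, 2, 3, 4)

  // Each unordered 3-element subset {a, b, c} has exactly one
  // arrangement with a < b < c, so the filters deduplicate it.
  val triples: List[(Int, Int, Int)] =
    for {
      a <- xs
      b <- xs if a < b
      c <- xs if b < c
    } yield (a, b, c)

  def main(args: Array[String]): Unit =
    println(triples) // List((1,2,3), (1,2,4), (1,3,4), (2,3,4))
}
```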

Answered by maasg

This is supported natively by a Spark RDD with the cartesian transformation.

e.g.:

val rdd = sc.parallelize(1 to 5)
val cartesian = rdd.cartesian(rdd)
cartesian.collect

Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5), 
(2,1), (2,2), (2,3), (2,4), (2,5), 
(3,1), (3,2), (3,3), (3,4), (3,5), 
(4,1), (4,2), (4,3), (4,4), (4,5), 
(5,1), (5,2), (5,3), (5,4), (5,5))

Answered by maasg

As discussed, cartesian will give you the n^2 elements of the cartesian product of the RDD with itself. This algorithm computes the combinations (n, 2) of an RDD without having to compute the n^2 elements first. (String is used as the element type; generalizing to a type T takes some plumbing with ClassTags that would obscure the purpose here.)

This is probably less time efficient than cartesian + filtering, due to the iterative count and take actions that force computation of the RDD, but it is more space efficient as it calculates only the C(n,2) = n!/(2!*(n-2)!) = n*(n-1)/2 elements instead of the n^2 elements of the cartesian product.

import org.apache.spark.rdd._

// Assumes a SparkContext `sc` is in scope (e.g. in spark-shell).
def combs(rdd: RDD[String]): RDD[(String, String)] = {
  val count = rdd.count  // cache the count instead of recomputing it
  if (count < 2) {
    sc.makeRDD[(String, String)](Seq.empty)
  } else if (count == 2) {
    val values = rdd.collect
    sc.makeRDD[(String, String)](Seq((values(0), values(1))))
  } else {
    val elem = rdd.take(1)                        // peel off one element
    val elemRdd = sc.makeRDD(elem)
    val subtracted = rdd.subtract(elemRdd)        // everything else
    val comb = subtracted.map(e => (elem(0), e))  // pair it with the rest
    comb.union(combs(subtracted))                 // recurse on the remainder
  }
}
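To see the shape of that recursion without a Spark cluster, here is a local analogue on a plain Seq (a hypothetical helper, not part of the original answer): peel off one element, pair it with everything else, and recurse on the remainder.

```scala
object LocalCombs {
  // Local mirror of the combs recursion above.
  def combs(xs: Seq[String]): Seq[(String, String)] =
    if (xs.size < 2) Seq.empty
    else {
      val head = xs.head                      // the element peeled off (rdd.take(1))
      val rest = xs.tail                      // everything else (rdd.subtract)
      rest.map(e => (head, e)) ++ combs(rest) // pair, then recurse
    }

  def main(args: Array[String]): Unit =
    println(combs(Seq("a", "b", "c", "d"))) // 4*3/2 = 6 pairs
}
```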

Answered by Behzad Behzadan

This creates all combinations (n, 2) and works for any RDD without requiring any ordering on the elements of RDD.


val rddWithIndex = rdd.zipWithIndex
rddWithIndex.cartesian(rddWithIndex).filter{case(a, b) => a._2 < b._2}.map{case(a, b) => (a._1, b._1)}

a._2 and b._2 are the indices, while a._1 and b._1 are the elements of the original RDD.


Example:

Note that no ordering is defined on the maps here.

val m1 = Map('a' -> 1, 'b' -> 2)
val m2 = Map('c' -> 3, 'a' -> 4)
val m3 = Map('e' -> 5, 'c' -> 6, 'b' -> 7)
val rdd = sc.makeRDD(Array(m1, m2, m3))
val rddWithIndex = rdd.zipWithIndex
rddWithIndex.cartesian(rddWithIndex).filter{case(a, b) => a._2 < b._2}.map{case(a, b) => (a._1, b._1)}.collect

Output:

Array((Map(a -> 1, b -> 2),Map(c -> 3, a -> 4)), (Map(a -> 1, b -> 2),Map(e -> 5, c -> 6, b -> 7)), (Map(c -> 3, a -> 4),Map(e -> 5, c -> 6, b -> 7)))
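The same index trick can be checked locally (a non-Spark sketch, assuming plain Scala collections): the maps themselves have no ordering, but their zipWithIndex positions do, so filtering on indices yields each unordered pair exactly once.

```scala
object IndexPairs {
  val ms = List(
    Map('a' -> 1, 'b' -> 2),
    Map('c' -> 3, 'a' -> 4),
    Map('e' -> 5, 'c' -> 6, 'b' -> 7))

  // i < j on the synthetic indices replaces a < b on the elements,
  // so no Ordering on Map is needed.
  val pairs: List[(Map[Char, Int], Map[Char, Int])] =
    for {
      (x, i) <- ms.zipWithIndex
      (y, j) <- ms.zipWithIndex if i < j
    } yield (x, y)

  def main(args: Array[String]): Unit =
    pairs.foreach(println) // 3*2/2 = 3 pairs
}
```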