scala Spark:将 2 元组键 RDD 与单键 RDD 结合的最佳策略是什么?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/17621596/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?
提问by RyanH
I have two RDD's that I want to join and they look like this:
我有两个想要加入的 RDD,它们看起来像这样:
val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
It happens to be the case that the key values of rdd1are unique and also that the tuple-key values of rdd2are unique. I'd like to join the two data sets so that I get the following rdd:
碰巧的情况是 的键值rdd1是唯一的,而且 的元组键值rdd2也是唯一的。我想加入这两个数据集,以便获得以下 rdd:
val rdd_joined:RDD[((T,W), (U,V))]
What's the most efficient way to achieve this? Here are a few ideas I've thought of.
实现这一目标的最有效方法是什么?以下是我想到的一些想法。
Option 1:
选项1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
Option 2:
选项 2:
val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
Option 1 will collect all of the data to master, right? So that doesn't seem like a good option if rdd1 is large (it's relatively large in my case, although an order of magnitude smaller than rdd2). Option 2 does an ugly distinct and cartesian product, which also seems very inefficient. Another possibility that crossed my mind (but haven't tried yet) is to do option 1 and broadcast the map, although it would be better to broadcast in a "smart" way so that the keys of the map are co-located with the keys of rdd2.
选项 1 将收集所有数据以进行掌握,对吗?因此,如果 rdd1 很大(在我的情况下它相对较大,尽管比 rdd2 小一个数量级),这似乎不是一个好的选择。选项 2 做了一个丑陋的独特的笛卡尔积,这似乎也非常低效。我想到的另一种可能性(但尚未尝试)是执行选项 1 并广播地图,尽管以“智能”方式广播会更好,以便地图的键与的键rdd2。
Has anyone come across this sort of situation before? I'd be happy to have your thoughts.
有没有人遇到过这种情况?我很高兴有你的想法。
Thanks!
谢谢!
回答by Josh Rosen
One option is to perform a broadcast join by collecting rdd1to the driver and broadcasting it to all mappers; done correctly, this will let us avoid an expensive shuffle of the large rdd2RDD:
一种选择是通过收集rdd1到驱动程序并将其广播给所有映射器来执行广播连接;如果做得正确,这将使我们避免对大型rdd2RDD进行昂贵的洗牌:
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))
val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
val m = rdd1Broadcast.value
for {
((t, w), u) <- iter
if m.contains(t)
} yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)
The preservesPartitioning = truetells Spark that this map function doesn't modify the keys of rdd2; this will allow Spark to avoid re-partitioning rdd2for any subsequent operations that join based on the (t, w)key.
该preservesPartitioning = true告诉星火此映射函数不修改的键rdd2; 这将允许 Spark 避免rdd2为基于(t, w)键加入的任何后续操作重新分区。
This broadcast could be inefficient since it involves a communications bottleneck at the driver. In principle, it's possible to broadcast one RDD to another without involving the driver; I have a prototype of this that I'd like to generalize and add to Spark.
这种广播可能效率低下,因为它涉及驱动程序的通信瓶颈。原则上,可以在不涉及驱动程序的情况下将一个 RDD 广播给另一个;我有一个我想概括并添加到 Spark 的原型。
Another option is to re-map the keys of rdd2and use the Spark joinmethod; this will involve a full shuffle of rdd2(and possibly rdd1):
另一种选择是重新映射键rdd2并使用 Sparkjoin方法;这将涉及rdd2(并且可能rdd1)的全面洗牌:
rdd1.join(rdd2.map {
case ((t, w), u) => (t, (w, u))
}).map {
case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()
On my sample input, both of these methods produce the same result:
在我的示例输入中,这两种方法产生相同的结果:
res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
A third option would be to restructure rdd2so that tis its key, then perform the above join.
第三种选择是重组rdd2,t使其成为关键,然后执行上述连接。
回答by Roger Hoover
Another way to do it is to create a custom partitioner and then use zipPartitions to join your RDDs.
另一种方法是创建一个自定义分区器,然后使用 zipPartitions 来加入您的 RDD。
import org.apache.spark.HashPartitioner
class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
override def getPartition(key: Any): Int = key match {
case k: Tuple2[Int, String] => super.getPartition(k._1)
case _ => super.getPartition(key)
}
}
val numSplits = 8
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))
val result = rdd2.zipPartitions(rdd1)(
(iter2, iter1) => {
val m = iter1.toMap
for {
((t: Int, w), u) <- iter2
if m.contains(t)
} yield ((t, w), (u, m.get(t).get))
}
).partitionBy(new HashPartitioner(numSplits))
result.glom.collect

