Notice: this page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original URL: http://stackoverflow.com/questions/42325496/

Time: 2020-10-22 09:06:14  Source: igfitidea

How to use Spark intersection() by key or filter() with two RDDs?

Tags: scala, apache-spark, filter, rdd, intersection

Asked by S.Kang

I want to use intersection() by key or filter() in Spark.

But I really don't know how to use intersection() by key.

So I tried to use filter(), but it didn't work.

Example - here are two RDDs:

data1 //RDD[(String, Int)] = Array(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
data2 //RDD[(String, Int)] = Array(("a", 3), ("b", 5))

val data3 = data2.map{_._1}

data1.filter{_._1 == data3}.collect //Array[(String, Int)] = Array()

I want to get the (key, value) pairs from data1 whose keys also appear in data2.

Array(("a", 1), ("a", 2), ("b", 2), ("b", 3)) is the result I want.

Is there a way to solve this problem using intersection() by key or filter()?

Accepted answer by mrsrinivas

This can be achieved in different ways:

1. Broadcast variable in filter() - needs scalability improvement

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// broadcast data2 key list to use in filter method, which runs in executor nodes
val bcast = sc.broadcast(data2.map(_._1).collect())

val result = data1.filter(r => bcast.value.contains(r._1))


println(result.collect().toList)
//Output
List((a,1), (a,2), (b,2), (b,3))
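
One possible reading of the "needs scalability improvement" remark is that contains() on a broadcast Array scans the whole array for every record. The sketch below is a variation, not part of the original answer: it broadcasts the distinct keys as a Set so each lookup is a hash probe. It assumes Spark is on the classpath; the app name and the names keySet and filtered are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context (e.g. in spark-shell) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("filter-by-broadcast-keys"))

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// Collect the distinct keys of data2 to the driver and broadcast them as a Set,
// so membership checks on the executors do not scan an array.
val keySet = sc.broadcast(data2.keys.distinct().collect().toSet)

// Keep only the data1 pairs whose key is in the broadcast set.
// sorted is applied only to make the collected order deterministic.
val filtered = data1.filter { case (k, _) => keySet.value.contains(k) }.collect().sorted
```

This still requires the distinct keys of data2 to fit in memory on the driver and on every executor; when they do not, the cogroup and join approaches in this answer avoid collecting the keys.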

2. cogroup (similar to group by key)

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

import org.apache.spark.rdd.RDD // needed for the explicit type annotation below

val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
  (a, (CompactBuffer(1, 2), CompactBuffer(3))),
  (b, (CompactBuffer(2, 3), CompactBuffer(5))),
  (c, (CompactBuffer(1), CompactBuffer()))
) */

//Now filter keys which have two non empty CompactBuffer. You can do that with 
//filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty) also. 
val filterRdd = cogroupRdd.filter { case (k, (v1, v2)) => v1.nonEmpty && v2.nonEmpty } 
/* List(
  (a, (CompactBuffer(1, 2), CompactBuffer(3))),
  (b, (CompactBuffer(2, 3), CompactBuffer(5)))
) */

//As we care about the first dataset only, keep just the first CompactBuffer
// by doing v1.map(val1 => (k, val1))
val result = filterRdd.flatMap { case (k, (v1, v2)) => v1.map(val1 => (k, val1)) }
//List((a, 1), (a, 2), (b, 2), (b, 3))

3. Using inner join

val resultRdd = data1.join(data2).map(r => (r._1, r._2._1)).distinct()
//List((b,2), (b,3), (a,2), (a,1)) 

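Here data1.join(data2) holds the pairs with common keys (inner join). With the data above it contains:

//List((a,(1,3)), (a,(2,3)), (b,(2,5)), (b,(3,5)))

The distinct() matters when data2 repeats a key: if data2 also contained ("b", 1), the join would additionally produce (b,(2,1)) and (b,(3,1)), and the map() would then emit (b,2) and (b,3) twice.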
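
One side effect of distinct() is that it also collapses pairs that are genuinely repeated in data1. The sketch below is a variation, not from the original answer: it joins against the distinct keys of data2 instead, so no deduplication of the result is needed. It assumes Spark is on the classpath; the duplicated ("a", 1) row and the extra ("b", 1) row are hypothetical data added to make the difference visible.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context (e.g. in spark-shell) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("join-on-distinct-keys"))

// Hypothetical data: ("a", 1) occurs twice in data1, key "b" occurs twice in data2.
val data1 = sc.parallelize(Seq(("a", 1), ("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5), ("b", 1)))

// Approach from the answer: join, drop data2's value, then deduplicate.
// distinct() removes the join fan-out but also the repeated ("a", 1).
val viaDistinct = data1.join(data2).map(r => (r._1, r._2._1)).distinct().collect().sorted

// Variation: deduplicate data2's keys first, so each data1 row matches at most once.
val keysOnly = data2.keys.distinct().map(k => (k, ()))
val viaKeyJoin = data1.join(keysOnly).mapValues(_._1).collect().sorted
```

Here viaDistinct holds each matching pair once, while viaKeyJoin keeps both copies of ("a", 1). Which behavior is wanted depends on whether repeated rows in data1 carry meaning.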

Answer by mtoto

For your problem, I think cogroup() is better suited. The intersection() method considers both keys and values in your data, and will result in an empty RDD.

The function cogroup() groups the values of both RDDs by key and gives us (key, (vals1, vals2)), where vals1 and vals2 contain the values of data1 and data2 respectively, for each key. Note that if a certain key is not shared by both datasets, one of vals1 or vals2 will be returned as an empty collection, hence we first have to filter out these tuples to arrive at the intersection of the two RDDs.

Next, we grab vals1, which contains the values from data1 for the common keys, and convert it to the format (key, Array). Lastly, we use flatMapValues() to unpack the result into the format (key, value).

val result = (data1.cogroup(data2)
  .filter{case (k, (vals1, vals2)) => vals1.nonEmpty && vals2.nonEmpty }
  .map{case (k, (vals1, vals2)) => (k, vals1.toArray)}
  .flatMapValues(identity[Array[Int]]))

result.collect()
// Array[(String, Int)] = Array((a,1), (a,2), (b,2), (b,3))
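
The point about intersection() comparing whole (key, value) pairs can be checked with a small sketch. It assumes Spark is on the classpath; the app name and the names pairIntersection and sharedKeys are made up for illustration. It uses the data from the question and also shows that intersecting only the keys yields the common keys.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context (e.g. in spark-shell) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("intersection-check"))

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// intersection() compares whole elements; no (key, value) pair occurs in both RDDs.
val pairIntersection = data1.intersection(data2).collect()

// Intersecting the keys alone gives the common keys, without duplicates.
val sharedKeys = data1.keys.intersection(data2.keys).collect().sorted
```

The sharedKeys result only identifies the common keys; getting back the data1 values for those keys still needs one of the filter, cogroup, or join approaches above.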