scala - How to filter an RDD according to a function based on another RDD in Spark?

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, licensed under CC BY-SA 4.0. If you reuse or share it, you must follow the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/26035582/

How to filter a RDD according to a function based another RDD in Spark?

Tags: scala, map, apache-spark, rdd

Asked by Chad

I am a beginner with Apache Spark. I want to filter out of an RDD all groups whose sum of weights is larger than a constant value. The "weight" map is also an RDD. Here is a small-scale demo: the groups to be filtered are stored in "groups", and the constant value is 12:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
  val allw = inp.split(",").map(wm(_)).sum
  allw > 12
}
val result = groups.filter(isheavy)

When the input data is very large, > 10GB for example, I always encounter a "java heap out of memory" error. I suspect it is caused by "weights.toArray.toMap", because it converts a distributed RDD into a Java object in the JVM. So I tried to filter with the RDD directly:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
  val items = inp.split(",")
  val wm = items.map(x => weights.filter(_._1 == x).first._2)
  wm.sum > 12
}
val result = groups.filter(isheavy)

When I ran result.collect after loading this script into the spark shell, I got a "java.lang.NullPointerException" error. Someone told me that when an RDD is manipulated inside another RDD, there will be a null pointer exception, and suggested that I put the weights into Redis.

So how can I get the "result" without converting "weights" to a Map or putting it into Redis? Is there a solution to filter an RDD based on another map-like RDD without the help of an external datastore service? Thanks!

Accepted answer by Shyamendra Solanki

The "java out of memory" error occurs because Spark uses its spark.default.parallelism property when determining the number of splits, which by default is the number of available cores.

// From CoarseGrainedSchedulerBackend.scala

override def defaultParallelism(): Int = {
   conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
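
For reference, here is a minimal sketch (not part of the original answer, and the values are arbitrary examples) of raising this property for the whole application. It assumes a standalone application submitted with spark-submit; in the spark-shell, sc already exists and the effective value can be checked with sc.defaultParallelism.

import org.apache.spark.{SparkConf, SparkContext}

// Example only: raise the default number of splits used by RDD operations.
// "100" is an arbitrary value; pick one that suits your data and cluster.
// The master URL is assumed to be supplied by spark-submit.
val conf = new SparkConf()
  .setAppName("heavy-groups") // hypothetical application name
  .set("spark.default.parallelism", "100")
val sc = new SparkContext(conf)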

When the input becomes large and you have limited memory, you should increase the number of splits.

You can do something like the following:

 val input = List("a,b,c,d", "b,c,e", "a,c,d", "e,g") 
 val splitSize = 10000 // specify some number of elements that fit in memory.

 val numSplits = (input.size / splitSize) + 1 // has to be > 0.
 val groups = sc.parallelize(input, numSplits) // specify the # of splits.

 val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap

 def isHeavy(inp: String) = inp.split(",").map(weights(_)).sum > 12
 val result = groups.filter(isHeavy)
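
As a side note that is not part of the original answer: if weights really starts out as an RDD, as in the question, one common pattern is to collect it to the driver once and broadcast it, so each executor keeps a single read-only copy instead of the map being captured in every task closure. This is only a sketch; it still assumes the weight table fits in memory, and the names weightsRdd, isHeavyBc and heavyGroups are made up for illustration.

// Sketch only: assumes the weight table fits in driver and executor memory.
val weightsRdd = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))

// Collect the pairs once on the driver, then broadcast the resulting map.
val weightsBc = sc.broadcast(weightsRdd.collectAsMap())

def isHeavyBc(inp: String): Boolean =
  inp.split(",").map(weightsBc.value(_)).sum > 12

val heavyGroups = groups.filter(isHeavyBc)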

You may also consider increasing the executor memory size using spark.executor.memory.
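
A hedged sketch of how that property might be set (the "4g" value is only an example and must fit what your cluster actually offers). It has to be set before the SparkContext is created; with spark-submit the equivalent flag is --executor-memory 4g.

import org.apache.spark.SparkConf

// Example only: executor memory cannot be changed from inside a running job;
// set it on the SparkConf (or via spark-submit) before the context starts.
val conf = new SparkConf()
  .setAppName("heavy-groups") // hypothetical application name
  .set("spark.executor.memory", "4g") // example value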

Answered by zhang zhan

Suppose your groups are unique; otherwise, first make them unique with distinct, etc. If either groups or weights is small, it is easy. If both groups and weights are huge, you can try the following, which may be more scalable but also looks more complicated.

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
// map groups to (a, (a,b,c,d)), (b, (a,b,c,d)), (c, (a,b,c,d)), ...
val g1 = groups.flatMap(s => s.split(",").map(x => (x, Seq(s))))
// j will be (a, ((a,b,c,d), 3)), ...
val j = g1.join(weights)
// k will be ((a,b,c,d), 3), ((a,b,c,d), 2), ...
val k = j.map(x => (x._2._1, x._2._2))
// l will be ((a,b,c,d), (3, 2, 5, 1)), ...
val l = k.groupByKey()
// keep only the groups whose weights sum to more than 12
val m = l.filter(x => { var sum = 0; x._2.foreach(a => sum = sum + a); sum > 12 })
// we only need the original group lists
val result = m.map(x => x._1)
// don't do this in a real product, otherwise all results go to the driver; use saveAsTextFile, etc. instead
scala> result.foreach(println)
List(e,g)
List(b,c,e)
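
A small design note that is not part of the original answer: since only the per-group sum is needed, the groupByKey step could be replaced by reduceByKey, which adds up the weights on the map side before the shuffle. A hedged sketch, reusing the k RDD from the snippet above (the names sums and heavy are made up):

// Sum the weights per group before shuffling, then keep only the heavy groups.
val sums = k.reduceByKey(_ + _)          // e.g. ((a,b,c,d), 11), ((b,c,e), 16), ...
val heavy = sums.filter(_._2 > 12).keys  // an RDD holding just the group keys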