How to deal with error SPARK-5063 in Spark

Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must keep the same license and attribute it to the original authors (not the translator). Original question: http://stackoverflow.com/questions/29815878/



scala apache-spark

Asked by G_cy

I get the error message SPARK-5063 on the println line:

d.foreach { x =>
  for (i <- 0 until x.length)
    println(m.lookup(x(i)))   // m.lookup is an RDD operation nested inside d.foreach: this triggers SPARK-5063
}

d is RDD[Array[String]] and m is RDD[(String, String)]. Is there any way to print it the way I want? Or how can I convert d from RDD[Array[String]] to Array[String]?
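
On the second part of the question, a minimal sketch of the conversion, assuming d is small enough to fit in driver memory, would flatten and then collect it:

// Sketch only, assuming d fits in driver memory:
// flatten RDD[Array[String]] to RDD[String], then pull it to the driver.
val localD: Array[String] = d.flatMap(_.toSeq).collect()

With the data on the driver, a plain println works; the answers below instead avoid collecting by restructuring the computation as a join.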

Answered by maasg

SPARK-5063 relates to better error messages when trying to nest RDD operations, which is not supported.

It's a usability issue, not a functional one. The root cause is the nesting of RDD operations and the solution is to break that up.


Here we are trying a join of the d RDD and the m RDD. If m is large, rdd.join would be the recommended way; otherwise, if m is small, i.e. it fits in the memory of each executor, we could collect it, broadcast it, and do a 'map-side' join.

JOIN


A simple join would go like this:


val rdd = sc.parallelize(Seq(Array("one", "two", "three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six" -> 6))
// Flatten the arrays and key each element by itself so it can be joined with map.
val flat = rdd.flatMap(_.toSeq).keyBy(x => x)
// Join on the keys and keep only the joined values.
val res = flat.join(map).map { case (k, v) => v }
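
For a quick sanity check on this small sample (collecting is safe here only because the data is tiny), res should contain each word paired with its number:

// Inspect the joined result; ordering across partitions is not deterministic.
res.collect().foreach(println)   // e.g. (two,2), (one,1), (three,3), ...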

If we would like to use broadcast, we first need to collect the values of the resolution table locally in order to broadcast them to all executors. NOTE: the RDD to be broadcast MUST fit in the memory of the driver as well as of each executor.

Map-side JOIN with Broadcast variable


val rdd = sc.parallelize(Seq(Array("one", "two", "three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six" -> 6))
// Collect the lookup table locally and broadcast it to every executor.
val bcTable = sc.broadcast(map.collectAsMap)
// Look each element up in the broadcast table on the executors: a map-side join.
val res2 = rdd.flatMap { arr => arr.map(elem => (elem, bcTable.value(elem))) }
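
One caveat with this sketch: bcTable.value(elem) throws a NoSuchElementException if an element is missing from the table. A variant that simply drops unmatched elements (assuming that is the desired semantics) uses the Option-returning get:

// Drop elements with no entry in the broadcast table instead of failing.
val res3 = rdd.flatMap { arr =>
  arr.flatMap(elem => bcTable.value.get(elem).map(v => (elem, v)))
}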

Answered by Naveen Budda

This RDD lacks a SparkContext. It could happen in the following cases:

RDD transformations and actions are NOT invoked by the driver,

but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation.
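
A minimal sketch of how to break up that invalid example, assuming rdd1 and rdd2 as in the error message (where rdd2 is a pair RDD): run the action on the driver first and capture only the resulting plain value in the closure:

// Run the nested action on the driver; n is a plain Long, not an RDD.
val n = rdd2.values.count()
// The closure now captures only the local value n, so this is valid.
val scaled = rdd1.map(x => n * x)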