NullPointerException in Scala Spark, appears to be caused by collection type?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license and attribute it to the original authors (not me), citing the original post: http://stackoverflow.com/questions/23793117/

scala, apache-spark

Asked by blue-sky

sessionIdList is of type:

scala> sessionIdList
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

When I try to run the code below:

val x = sc.parallelize(List(1,2,3)) 
val cartesianComp = x.cartesian(x).map(x => (x))

val kDistanceNeighbourhood = sessionIdList.map(s => {
    cartesianComp.filter(v => v != null)
})

kDistanceNeighbourhood.take(1)

I receive the exception:

14/05/21 16:20:46 ERROR Executor: Exception in task ID 80
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:38)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:36)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)

However, if I use:

val l = sc.parallelize(List("1","2")) 
val kDistanceNeighbourhood = l.map(s => {    
    cartesianComp.filter(v => v != null)
})

kDistanceNeighbourhood.take(1)

then no exception is thrown.

The difference between the two code snippets is that in the first snippet sessionIdList is of type:

res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

and in the second snippet "l" is of type:

scala> l
res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:12

Why is this error occurring?

Do I need to convert sessionIdList to a ParallelCollectionRDD in order to fix this?

Answered by Josh Rosen

Spark doesn't support nesting of RDDs (see https://stackoverflow.com/a/14130534/590203 for another occurrence of the same problem), so you can't perform transformations or actions on RDDs inside of other RDD operations.

In the first case, you're seeing a NullPointerException thrown by the worker when it tries to access a SparkContext object that's only present on the driver and not the workers.

In the second case, my hunch is the job was run locally on the driver and worked purely by accident.

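To make the fix concrete, here is a minimal sketch (not from the original answer) of how the first snippet could be restructured so that no RDD is referenced inside another RDD's closure: combine the two datasets with a single top-level cartesian call.

// Sketch only: keep every RDD operation at the top level instead of
// calling cartesianComp.filter(...) inside sessionIdList.map(...).
val x = sc.parallelize(List(1, 2, 3))
val cartesianComp = x.cartesian(x)                  // RDD[(Int, Int)]

// Pair each session id with the cartesian pairs in one top-level
// transformation; the closure no longer captures another RDD.
val kDistanceNeighbourhood = sessionIdList.cartesian(cartesianComp)

kDistanceNeighbourhood.take(1)

Whether a flat RDD of (sessionId, pair) tuples is the right shape depends on what the computation ultimately needs; the point is only that the two datasets are combined by driver-side code, not inside an executor-side closure.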

Answered by chelBert

It's a reasonable question, and I have heard it asked enough times that I'm going to take a stab at explaining why this is true, because it might help.

Nested RDDs will always throw an exception in production. Nested function calls, as I think you are describing them here, will also cause failures if they mean calling an RDD operation inside an RDD operation, since it is actually the same thing. (RDDs are immutable, so performing an RDD operation such as a "map" is equivalent to creating a new RDD.) The inability to create nested RDDs is a necessary consequence of the way an RDD is defined and the way a Spark application is set up.

An RDD is a distributed collection of objects (called partitions) that live on the Spark executors. Spark executors cannot communicate with each other, only with the Spark driver. The RDD operations are all computed in pieces on these partitions. Because the RDD's executor environment isn't recursive (i.e. you cannot configure a Spark driver to run on a Spark executor with sub-executors), an RDD cannot be nested either.

In your program, you have created a distributed collection of partitions of integers. You are then performing a mapping operation. When the Spark driver sees a mapping operation, it sends the instructions to do the mapping to the executors, who perform the transformation on each partition in parallel. But your mapping cannot be done, because on each partition you are trying to call the "whole RDD" to perform another distributed operation. This cannot be done, because each partition does not have access to the information on the other partitions; if it did, the computation could not run in parallel.

What you can do instead, because the data you need in the map is probably small (since you are doing a filter, and the filter does not require any information about sessionIdList), is to first filter the session ID list, then collect that list to the driver, and then broadcast it to the executors, where you can use it in the map. If the sessionId list is too large, you will probably need to do a join.

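A rough sketch of that collect-and-broadcast suggestion, assuming sessionIdList is small enough to collect (the filter predicate and the final map are placeholders, not code from the question):

// Sketch only: filter, collect and broadcast the session id list, then
// use the broadcast value inside a map over the other RDD.
val x = sc.parallelize(List(1, 2, 3))
val cartesianComp = x.cartesian(x)

val filteredIds = sessionIdList.filter(_ != null)       // ordinary RDD filter
val idsOnDriver: Array[String] = filteredIds.collect()  // bring the (small) result to the driver
val idsBroadcast = sc.broadcast(idsOnDriver)            // read-only copy shipped to every executor

val result = cartesianComp.map { pair =>
  // Only the broadcast value is captured here, never an RDD.
  (pair, idsBroadcast.value.length)
}

result.take(1)

If the session id list is too large to collect, a top-level join between the two RDDs, as the answer notes, is the usual alternative.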