scala - Caused by: java.lang.NullPointerException at org.apache.spark.sql.Dataset
Original URL: http://stackoverflow.com/questions/47358177/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
Caused by: java.lang.NullPointerException at org.apache.spark.sql.Dataset
Asked by Markus
Below I provide my code. I iterate over the DataFrame prodRows, and for each product_PK I find some matching sub-list of product_PKs from prodRows.
numRecProducts = 10
var listOfProducts: Map[Long, Array[(Long, Int)]] = Map()
prodRows.foreach { row: Row =>
    val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
    val gender = row.get(row.fieldIndex("gender_PK")).toString
    val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK")
    var productList: Array[(Long, Int)] = Array()
    if (!selection.rdd.isEmpty()) {
      productList = selection.rdd.map(x => (x(0).toString.toLong, 1)).collect()
    }
    listOfProducts = listOfProducts + (product_PK -> productList)
}
But when I execute it, it gives me the following error. It looks like selection is empty in some iterations. However, I do not understand how I can handle this error:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
at org.apache.spark.sql.Dataset$$anonfun$foreach.apply$mcV$sp(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach.apply(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach.apply(Dataset.scala:2325)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
at org.test.ComputeNumSim.run(ComputeNumSim.scala:69)
at org.test.ComputeNumSimRunner$.main(ComputeNumSimRunner.scala:19)
at org.test.ComputeNumSimRunner.main(ComputeNumSimRunner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:635)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2877)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1304)
at org.test.ComputeNumSim$$anonfun$run.apply(ComputeNumSim.scala:74)
at org.test.ComputeNumSim$$anonfun$run.apply(ComputeNumSim.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$$anonfun$apply.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
What does it mean and how can I handle it?
Answered by Tzach Zohar
You cannot access any of Spark's "driver-side" abstractions (RDDs, DataFrames, Datasets, SparkSession...) from within a function passed on to one of Spark's DataFrame/RDD transformations. You also cannot update driver-side mutable objects from within these functions.
In your case - you're trying to use prodRows and selection (both are DataFrames) within a function passed to DataFrame.foreach. You're also trying to update listOfProducts (a local driver-side variable) from within that same function.
Why?
- DataFrames, RDDs, and SparkSession only exist on your Driver application. They serve as a "handle" to access data distributed over the cluster of worker machines.
- Functions passed to RDD/DataFrame transformations get serialized and sent to that cluster, to be executed on the data partitions on each of the worker machines. When the serialized DataFrames/RDDs get deserialized on those machines, they are useless: they still can't represent the data on the cluster, as they are just hollow copies of the ones created on the driver application, which actually maintains a connection to the cluster machines
- For the same reason, attempting to update driver-side variables will fail: the variables (starting out as empty, in most cases) will be serialized, deserialized on each of the workers, updated locally on the workers, and stay there... the original driver-side variable will remain unchanged (see the sketch after this list)
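To make that last point concrete, here is a minimal, illustrative sketch (not from the original question; the session setup and toy DataFrame are assumptions): the closure sent to foreach captures a serialized copy of the driver-side variable, so any mutation happens on the executors and never reaches the driver.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("closure-capture-sketch").getOrCreate()
import spark.implicits._

val df = Seq(1L, 2L, 3L).toDF("n")   // hypothetical toy DataFrame
var count = 0                        // driver-side mutable variable

df.foreach { _ =>
  count += 1                         // updates a deserialized copy on an executor, not the driver's variable
}

println(count)                       // still prints 0 on the driver

This is also why listOfProducts in the question would stay empty even if the filter call inside foreach did not throw.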
How can you solve this? When working with Spark, especially with DataFrames, you should try to avoid "iteration" over the data, and use DataFrame's declarative operations instead. In most cases, when you want to reference data of another DataFrame for each record in your DataFrame, you'd want to use join to create a new DataFrame with records combining data from the two DataFrames.
In this specific case, here's a roughly equivalent solution that does what you're trying to do, if I managed to conclude it correctly. Try to use this and read the DataFrame documentation to figure out the details:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val numRecProducts = 10

val result = prodRows.as("left")
  // self-join by gender:
  .join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
  // limit to 10 results per record:
  .withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK")))
  .filter($"rn" <= numRecProducts).drop($"rn")
  // group and collect_list to create products column:
  .groupBy($"left.product_PK" as "product_PK")
  .agg(collect_list(struct($"right.product_PK", lit(1))) as "products")
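If a driver-side Map like the original listOfProducts is still needed (and the result is small enough to collect), the joined result can be brought back to the driver afterwards. A hedged sketch, not part of the original answer, assuming product_PK values convert cleanly to Long as in the question's own code; the exact getters depend on the real column types:

import org.apache.spark.sql.Row

val listOfProducts: Map[Long, Array[(Long, Int)]] = result
  .collect()                                        // pull the (hopefully small) result to the driver
  .map { row =>
    val productPK = row.get(row.fieldIndex("product_PK")).toString.toLong
    val products = row.getAs[Seq[Row]]("products")  // each element is a struct(product_PK, 1)
      .map(p => (p.get(0).toString.toLong, p.getInt(1)))
      .toArray
    productPK -> products
  }
  .toMap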
Answered by Raphael Roth
The problem is that you try to access prodRows from within prodRows.foreach. You cannot use a dataframe within a transformation; dataframes only exist on the driver.

