scala org.apache.spark.SparkException: Failed to execute user defined function
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use it, you must likewise follow the CC BY-SA license, note the original URL and author information, and attribute it to the original author (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/45739168/
org.apache.spark.SparkException: Failed to execute user defined function
Asked by Inna
I am new to Scala and I am trying to execute the following code:
val SetID = udf{(c:String, d: String) =>
if( c.UpperCase.contains("EXKLUS") == true)
{d}
else {""}
}
val ParquetWithID = STG1
.withColumn("ID", SetID( col("line_item"), col("line_item_ID")))
Both columns (line_item and line_item_ID) are defined as Strings in the STG1 schema.
I get the following error when I try to run the code:
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$$anonfun: (string, string) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
and
Caused by: java.lang.NullPointerException
at MyTests$$anonfun$$anonfun.apply(MyTests.scala:356)
at MyTests$$anonfun$$anonfun.apply(MyTests.scala:355)
... 16 more
I also tried c.UpperCase().contains("EXKLUS") but I got the same error.
However, if I simply run an "if equals" statement, everything works fine. So I guess the problem is in using the UpperCase().contains(" ") function in my udf, but I do not understand where the problem comes from. Any help would be appreciated!
Accepted answer by Ramesh Maharjan
If the schema contains
|-- line_item: string (nullable = true)
|-- line_item_ID: string (nullable = true)
then checking for null in your if statement should solve the issue (note that there is a toUpperCase method for strings):
val SetID = udf{(c:String, d: String) =>
if(c != null && c.toUpperCase.contains("EXKLUS"))
{d}
else {""}
}
val ParquetWithID = STG1
.withColumn("ID", SetID( col("line_item"), col("line_item_ID")))
I hope the answer is helpful
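As a side note, the same null guard can be written with Option, which avoids the explicit null comparison. This is a sketch of the same idea, not code from the accepted answer:

val SetID = udf{(c: String, d: String) =>
  // Option(c) is None when c is null, so exists never calls toUpperCase on a null value
  if (Option(c).exists(_.toUpperCase.contains("EXKLUS"))) d else ""
}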
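Another way to avoid the NullPointerException altogether (my own sketch, not part of the accepted answer) is to skip the udf and use Spark's built-in column functions, which handle null input themselves:

import org.apache.spark.sql.functions.{col, lit, upper, when}

// when/otherwise yields line_item_ID only when the upper-cased line_item
// contains "EXKLUS"; rows where line_item is null fall through to "".
val ParquetWithID = STG1
  .withColumn("ID", when(upper(col("line_item")).contains("EXKLUS"), col("line_item_ID")).otherwise(lit("")))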

