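Scala and Spark UDF function
Notice: This page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you use it, you must also follow the CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverFlow
Original URL: http://stackoverflow.com/questions/38633216/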
Asked by fanbondi
I made a simple UDF to convert or extract some values from a time field in a temp table in Spark. I registered the function, but when I call it using SQL it throws a NullPointerException. Below are my function and the steps I use to execute it. I am using Zeppelin. Strangely, this was working yesterday, but it stopped working this morning.
Function
def convert(time: String): String = {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
}
Register the Function
sqlContext.udf.register("convert",convert _)
Test the function without SQL -- This works
convert("12:12:12") // returns "12:12"
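The literal test above only exercises a non-null input. As a minimal plain-Scala check (no Spark needed), here is what the same function does when handed a null, which is the situation the second answer suspects in temptable. The wrapper object name ConvertCheck is hypothetical; convert itself behaves exactly like the question's function.

```scala
import scala.util.Try

// Hypothetical wrapper object; convert is the question's function.
object ConvertCheck {
  def convert(time: String): String = {
    val sdf = new java.text.SimpleDateFormat("HH:mm")
    val time1 = sdf.parse(time) // parses "HH:mm" from the start of the string
    sdf.format(time1)
  }

  def main(args: Array[String]): Unit = {
    println(convert("12:12:12"))          // the question's literal test: prints 12:12
    println(Try(convert(null)).isFailure) // parse throws on null, so this prints true
  }
}
```

So a plain call succeeding does not rule out null rows in the table. Note, however, that the frames in the stack trace below are in the function lookup (FunctionRegistry.getFunctionInfo, lookupFunction) rather than in convert itself, so null data is only one possibility.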
Test the function with SQL in Zeppelin -- this FAILS.
%sql
select convert(time) from temptable limit 10
Structure of temptable
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- serverip: string (nullable = true)
|-- request: string (nullable = true)
|-- resource: string (nullable = true)
|-- protocol: integer (nullable = true)
|-- sourceip: string (nullable = true)
Part of the stacktrace that I am getting.
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
at org.apache.spark.sql.hive.HiveContext$$anon.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction.apply(FunctionRegistry.scala:44)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction.apply(FunctionRegistry.scala:44)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
Answer by Rockie Yang
Use udf instead of defining a plain function directly:
import org.apache.spark.sql.functions._
val convert = udf[String, String] { time =>
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
}
A udf's input parameter is a Column (or several Columns), and its return type is Column:
case class UserDefinedFunction protected[sql] (
    f: AnyRef,
    dataType: DataType,
    inputTypes: Option[Seq[DataType]]) {

  def apply(exprs: Column*): Column = {
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
  }
}
Answer by kfkhalili
You have to define your function as a UDF.
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

val convertUDF: UserDefinedFunction = udf { (time: String) =>
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
}
Next you would apply your UDF on your DataFrame.
import org.apache.spark.sql.functions.col

// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // reusing the name "time" replaces the existing column
Now, as to your actual problem, one reason you could be receiving this error is that your DataFrame contains rows where time is null. If you filter them out before applying the UDF, you should be able to continue without problems.
dataFrame.filter(col("time").isNotNull)
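As an alternative to filtering, here is a sketch of a null-tolerant version of the same conversion, assuming you would rather get null than a failed query for null or unparseable time values. SafeConvert and convertSafe are hypothetical names; the sketch keeps the question's SimpleDateFormat("HH:mm") logic and only adds the guards.

```scala
import java.text.SimpleDateFormat
import scala.util.Try

// Hypothetical helper: same HH:mm conversion as in the question, but it
// returns null for null or unparseable input instead of throwing.
object SafeConvert {
  def convertSafe(time: String): String =
    Option(time)                              // null input becomes None
      .flatMap { t =>
        val sdf = new SimpleDateFormat("HH:mm") // not thread-safe, so created per call
        Try(sdf.format(sdf.parse(t))).toOption  // ParseException becomes None
      }
      .orNull                                 // None becomes null again
}
```

In Spark this could be wrapped as udf(SafeConvert.convertSafe _) and applied like convertUDF above. The assumption here is that Spark hands a null String cell to the Scala function as null, which is why the guard keeps such rows from failing instead of dropping them.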
I'm curious what else, other than encountering a null, could cause a NullPointerException when running a UDF. If you found a reason different from my suggestion, I'd be glad to know.

