Disclaimer: this page is an English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse it, you must follow the same CC BY-SA terms, cite the original URL, and attribute it to the original authors (not me): StackOverflow
原文地址: http://stackoverflow.com/questions/32357164/
SparkSQL: How to deal with null values in user defined function?
Asked by Martin Senne
Given table 1 with one column "x" of type String, I want to create table 2 with a column "y" that is an integer representation of the date strings given in "x".
It is essential to keep null values in column "y".
Table 1 (Dataframe df1):
+----------+
| x|
+----------+
|2015-09-12|
|2015-09-13|
| null|
| null|
+----------+
root
|-- x: string (nullable = true)
Table 2 (Dataframe df2):
+----------+--------+
| x| y|
+----------+--------+
| null| null|
| null| null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
|-- x: string (nullable = true)
|-- y: integer (nullable = true)
The user-defined function (udf) to convert values from column "x" into those of column "y" is:
val extractDateAsInt = udf[Int, String](
  (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
)
While this works, dealing with null values is not possible.
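To see the failure concretely, the conversion logic can be exercised as plain Scala outside Spark (a minimal sketch; `extract` is just the udf body, the name is illustrative):

```scala
import scala.util.Try

// the body of extractDateAsInt as a plain function
val extract: String => Int = d => d.substring(0, 10).filterNot("-".toSet).toInt

println(extract("2015-09-12"))        // 20150912
println(Try(extract(null)).isFailure) // true: substring on null throws NullPointerException
```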
Even so, I can do something like
val extractDateAsIntWithNull = udf[Int, String](
  (d: String) =>
    if (d != null) d.substring(0, 10).filterNot("-".toSet).toInt
    else 1
)
I have found no way to "produce" null values via udfs (of course, since Ints cannot be null).
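The root cause can be reproduced in plain Scala, independent of Spark: the primitive Int has no null representation, so forcing one silently yields the default value (a quick illustration):

```scala
// a primitive Int cannot carry null; casting null to Int yields 0
val i: Int = null.asInstanceOf[Int]
println(i) // 0

// only a boxed java.lang.Integer (an AnyRef) can actually be null
val boxed: java.lang.Integer = null
println(boxed) // null
```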
My current solution for creation of df2 (Table 2) is as follows:
// holds data of table 1
val df1 = ...

// filter entries from df1 that are not null, compute "y", and rename "x"
val dfNotNulls = df1.filter(df1("x").isNotNull)
  .withColumn("y", extractDateAsInt(df1("x")))
  .withColumnRenamed("x", "right_x")

// create df2 via a left outer join of df1 with dfNotNulls
val df2 = df1
  .join(dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter")
  .drop("right_x")
Questions:
- The current solution seems cumbersome (and probably not efficient wrt. performance). Is there a better way?
- @Spark-developers: Is there a type NullableInt planned / available, such that the following udf is possible (see code excerpt)?
Code excerpt
val extractDateAsNullableInt = udf[NullableInt, String](
  (d: String) =>
    if (d != null) d.substring(0, 10).filterNot("-".toSet).toInt
    else null
)
Answered by zero323
This is where Option comes in handy:
val extractDateAsOptionInt = udf((d: String) => d match {
  case null => None
  case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
})
or, to make it slightly more secure in the general case:
import scala.util.Try

val extractDateAsOptionInt = udf((d: String) => Try(
  d.substring(0, 10).filterNot("-".toSet).toInt
).toOption)
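The Try-based body can be checked as plain Scala, without a Spark session (a sketch of just the closure; the name `safeParse` is illustrative): it maps both null and malformed input to None instead of throwing.

```scala
import scala.util.Try

val safeParse: String => Option[Int] =
  d => Try(d.substring(0, 10).filterNot("-".toSet).toInt).toOption

println(safeParse("2015-09-12")) // Some(20150912)
println(safeParse(null))         // None (NullPointerException caught)
println(safeParse("garbage-in")) // None (NumberFormatException caught)
```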
All credit goes to Dmitriy Selivanov, who pointed out this solution as a (missing?) edit here.
An alternative is to handle null outside the UDF:
import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.IntegerType

val extractDateAsInt = udf(
  (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
)

df.withColumn("y",
  when($"x".isNull, lit(null))
    .otherwise(extractDateAsInt($"x"))
    .cast(IntegerType)
)
Answered by tristanbuckner
Scala actually has a nice factory function, Option(), that can make this even more concise:
val extractDateAsOptionInt = udf((d: String) =>
Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))
Internally the Option object's apply method is just doing the null check for you:
def apply[A](x: A): Option[A] = if (x == null) None else Some(x)
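Because of this, the closure body can be tested without Spark at all (a sketch of just the udf's inner function; the name `parse` is illustrative):

```scala
// the same null-safe conversion as a plain function
val parse: String => Option[Int] =
  d => Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt)

println(parse("2015-09-12")) // Some(20150912)
println(parse(null))         // None
```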
Answered by Martin Senne
Supplementary code
With the nice answer of @zero323, I created the following code, to have user-defined functions available that handle null values as described. Hope it is helpful for others!
/**
 * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that
 * handle `null` values.
 */
object NullableFunctions {

  import org.apache.spark.sql.functions._
  import scala.reflect.runtime.universe.TypeTag
  import org.apache.spark.sql.UserDefinedFunction

  /**
   * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
   *   * if the input is null, None is returned. This will create a null value in the output Spark column.
   *   * if the input is non-null, Some(f(input)) is returned, thus creating f(input) as value in the output column.
   * @param f function from A1 => RT
   * @tparam RT return type
   * @tparam A1 input parameter type
   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour described above
   */
  def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
    udf[Option[RT], A1]((i: A1) => i match {
      case null => None
      case s    => Some(f(s))
    })
  }

  /**
   * Given a function (A1, A2) => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
   *   * if one of the input parameters is null, None is returned.
   *     This will create a null value in the output Spark column.
   *   * if both input parameters are non-null, Some(f(input1, input2)) is returned, thus creating
   *     f(input1, input2) as value in the output column.
   * @param f function from (A1, A2) => RT
   * @tparam RT return type
   * @tparam A1 first input parameter type
   * @tparam A2 second input parameter type
   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour described above
   */
  def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
    udf[Option[RT], A1, A2]((i1: A1, i2: A2) => (i1, i2) match {
      case (null, _) => None
      case (_, null) => None
      case (s1, s2)  => Some(f(s1, s2))
    })
  }
}
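The null-propagation pattern used by NullableFunctions can be exercised without a Spark session by extracting the match logic into plain functions (an illustrative sketch; `nullSafe1` and `nullSafe2` are hypothetical names, not part of the code above):

```scala
// one-argument wrapper: null in => None out
def nullSafe1[A1 <: AnyRef, RT](f: A1 => RT): A1 => Option[RT] =
  i => Option(i).map(f)

// two-argument wrapper: any null in => None out
def nullSafe2[A1 <: AnyRef, A2 <: AnyRef, RT](f: (A1, A2) => RT): (A1, A2) => Option[RT] =
  (i1, i2) => (i1, i2) match {
    case (null, _) => None
    case (_, null) => None
    case (a, b)    => Some(f(a, b))
  }

val toIntDate = nullSafe1((s: String) => s.substring(0, 10).filterNot("-".toSet).toInt)
println(toIntDate("2015-09-12")) // Some(20150912)
println(toIntDate(null))         // None

val concat = nullSafe2((a: String, b: String) => a + b)
println(concat("foo", null))     // None
```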

