scala - How can I pass extra parameters to UDFs in Spark SQL?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must keep the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/35546576/
Asked by DarkZero
I want to parse the date columns in a DataFrame. For each date column, the resolution of the date may change (e.g. 2011/01/10 => 2011/01 if the resolution is set to "Month").
I wrote the following code:
def convertDataFrame(dataframe: DataFrame, schema: Array[FieldDataType], resolution: Array[DateResolutionType]): DataFrame = {
  import org.apache.spark.sql.functions._

  val convertDateFunc = udf { (x: String, resolution: DateResolutionType) =>
    SparkDateTimeConverter.convertDate(x, resolution)
  }
  val convertDateTimeFunc = udf { (x: String, resolution: DateResolutionType) =>
    SparkDateTimeConverter.convertDateTime(x, resolution)
  }

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
    for (i <- allCols.indices) yield {
      schema(i) match {
        case FieldDataType.Date     => convertDateFunc(allCols(i), resolution(i))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _                      => allCols(i)
      }
    }

  dataframe.select(mappedCols: _*)
}
However, it doesn't work. It seems that I can only pass Columns to UDFs. And I wonder whether it would be very slow if I converted the DataFrame to an RDD and applied the function to each row.
Does anyone know the correct solution? Thank you!
Answered by zero323
Just use a little bit of currying:
def convertDateFunc(resolution: DateResolutionType) = udf((x: String) =>
  SparkDateTimeConverter.convertDate(x, resolution))
and use it as follows:
case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
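The trick above can be illustrated without Spark: the outer call fixes the extra parameter in a closure, and the returned one-argument function is exactly the shape udf(...) expects. A minimal sketch, where DateSketch, its resolution values, and convertDate are hypothetical stand-ins for the asker's SparkDateTimeConverter, not real Spark API:

```scala
object DateSketch {
  sealed trait DateResolutionType
  case object Year  extends DateResolutionType
  case object Month extends DateResolutionType

  // Stand-in for SparkDateTimeConverter.convertDate: truncate a
  // "yyyy/MM/dd" string according to the requested resolution.
  def convertDate(x: String, resolution: DateResolutionType): String =
    resolution match {
      case Year  => x.take(4) // "2011/01/10" -> "2011"
      case Month => x.take(7) // "2011/01/10" -> "2011/01"
    }

  // Currying: fix the resolution first, get back a one-argument
  // function of the per-row value.
  def convertDateFunc(resolution: DateResolutionType): String => String =
    (x: String) => convertDate(x, resolution)
}
```

Calling `DateSketch.convertDateFunc(DateSketch.Month)` returns a plain `String => String`, which is why the curried version can be wrapped in `udf(...)` and applied to a single column.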
On a side note, you should take a look at sql.functions.trunc and sql.functions.date_format. These should handle at least part of the job without using UDFs at all.
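As a rough illustration of what month-level truncation achieves, the same idea can be expressed in plain java.time (TruncSketch and toMonth are made-up names for this sketch, not Spark functions):

```scala
object TruncSketch {
  import java.time.LocalDate
  import java.time.format.DateTimeFormatter

  private val inFmt  = DateTimeFormatter.ofPattern("yyyy/MM/dd")
  private val outFmt = DateTimeFormatter.ofPattern("yyyy/MM")

  // Rough analogue of date_format(trunc(col, "month"), "yyyy/MM"):
  // parse, snap to the first day of the month, then reformat.
  def toMonth(s: String): String =
    LocalDate.parse(s, inFmt).withDayOfMonth(1).format(outFmt)
}
```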
Note:
In Spark 2.2 or later you can use the typedLit function:
import org.apache.spark.sql.functions.typedLit
which supports a wider range of literals, such as Seq or Map.
Answered by Michael Armbrust
You can create a literal Column to pass to a udf using the lit(...) function defined in org.apache.spark.sql.functions.
For example:
val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))
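The closure handed to udf here is ordinary Scala and can be sanity-checked outside Spark; only the lit(1) wrapping is Spark-specific, because every argument applied to a udf must be a Column:

```scala
// The same function body used in the udf above, as a plain Scala value.
object TakeRightSketch {
  val takeRight: (String, Int) => String = (s, i) => s.takeRight(i)
}
```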

