Scala: Spark UDF with varargs
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/33151866/
Spark UDF with varargs
Asked by devopslife
Is listing all the arguments up to 22, as shown in the documentation, the only option?
https://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.UDFRegistration
Anyone figured out how to do something similar to this?
sc.udf.register("func", (s: String*) => s......
(writing a custom concat function that skips nulls; it had to take 2 arguments at the time)
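For context, a fixed-arity version of such a null-skipping concat has to be registered per argument count, roughly like this (a minimal sketch; the UDF name "concat2" and the "-" separator are assumptions):
// Hypothetical 2-argument version; a separate registration would be needed
// for every arity, which is what makes the 22-argument ceiling painful.
sqlContext.udf.register("concat2", (a: String, b: String) =>
  Seq(a, b).filter(_ != null).mkString("-"))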
Thanks
Answered by zero323
UDFs don't support varargs*, but you can pass an arbitrary number of columns wrapped using the array function:
import org.apache.spark.sql.functions.{udf, array, lit}
val myConcatFunc = (xs: Seq[Any], sep: String) =>
  xs.filter(_ != null).mkString(sep)
val myConcat = udf(myConcatFunc)
An example usage:
val df = sc.parallelize(Seq(
  (null, "a", "b", "c"), ("d", null, null, "e")
)).toDF("x1", "x2", "x3", "x4")
val cols = array($"x1", $"x2", $"x3", $"x4")
val sep = lit("-")
df.select(myConcat(cols, sep).alias("concatenated")).show
// +------------+
// |concatenated|
// +------------+
// |       a-b-c|
// |         d-e|
// +------------+
With raw SQL:
df.registerTempTable("df")
sqlContext.udf.register("myConcat", myConcatFunc)
sqlContext.sql(
  "SELECT myConcat(array(x1, x2, x4), '.') AS concatenated FROM df"
).show
// +------------+
// |concatenated|
// +------------+
// |         a.c|
// |         d.e|
// +------------+
A slightly more complicated approach is not to use a UDF at all, and instead compose SQL expressions with something roughly like this:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
def myConcatExpr(sep: String, cols: Column*) = regexp_replace(concat(
  cols.foldLeft(lit(""))(
    (acc, c) => when(c.isNotNull, concat(acc, c, lit(sep))).otherwise(acc)
  )
), s"($sep)?$$", "")
df.select(
  myConcatExpr("-", $"x1", $"x2", $"x3", $"x4").alias("concatenated")
).show
// +------------+
// |concatenated|
// +------------+
// |       a-b-c|
// |         d-e|
// +------------+
but I doubt it is worth the effort unless you work with PySpark.
* If you pass a function using varargs it will be stripped of all the syntactic sugar and the resulting UDF will expect an ArrayType. For example:
def f(s: String*) = s.mkString
udf(f _)
will be of type:
UserDefinedFunction(<function1>,StringType,List(ArrayType(StringType,true)))
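In practice this means such a UDF has to be invoked on columns wrapped with array, for example (a sketch reusing the df from above; note that f itself does not skip nulls):
// Wrap the columns in an array column to match the ArrayType input.
val arrayConcat = udf(f _)
df.select(arrayConcat(array($"x1", $"x2", $"x3", $"x4"))).show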

