Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/39006349/
Defining a UDF that accepts an Array of objects in a Spark DataFrame?
Asked by ohruunuruus
When working with Spark's DataFrames, User Defined Functions (UDFs) are required for mapping data in columns. UDFs require that argument types are explicitly specified. In my case, I need to manipulate a column that is made up of arrays of objects, and I do not know what type to use. Here's an example:
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
It's relatively straightforward to use the built-in org.apache.spark.sql.functions to perform basic operations on the data in the columns:
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
and it's generally easy to write custom UDFs to perform arbitrary operations
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
| PETS| 2|
+----------+--------------+
But what if I want to use a UDF to manipulate the array of objects in the "subjects" column? What type do I use for the argument in the UDF? For example, if I want to reimplement the size function, instead of using the one provided by Spark:
val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
Clearly Array[Something] does not work... what type should I use!? Should I ditch Array[] altogether? Poking around tells me scala.collection.mutable.WrappedArray may have something to do with it, but still there's another type I need to provide.
Accepted answer by zero323
What you're looking for is Seq[o.a.s.sql.Row]:
import org.apache.spark.sql.Row
val my_size = udf { subjects: Seq[Row] => subjects.size }
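Applied to the example data from the question, this behaves just like the built-in size (a quick sanity check; UDF(subjects) is how older Spark versions label an unnamed UDF result column):

data.select($"topic", my_size($"subjects")).show
+-----+-------------+
|topic|UDF(subjects)|
+-----+-------------+
| pets|            2|
+-----+-------------+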
Explanation:
- The current representation of ArrayType is, as you already know, WrappedArray, so Array won't work and it is better to stay on the safe side.
- According to the official specification, the local (external) type for StructType is Row. Unfortunately it means that access to the individual fields is not type safe.
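Concretely, "not type safe" means fields are pulled out of each Row with runtime lookups like getAs. A minimal sketch, assuming the score field from the example JSON above (a wrong field name or type parameter here fails only when the query executes):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Sum every subject's "score"; getAs[Long] is a runtime cast,
// checked only at execution time, not at compile time.
val total_score = udf { subjects: Seq[Row] =>
  subjects.map(_.getAs[Long]("score")).sum
}

data.select($"topic", total_score($"subjects")).show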
Notes:
- To create struct in Spark < 2.3, the function passed to udf has to return a Product type (Tuple* or a case class), not Row. That's because the corresponding udf variants depend on Scala reflection:

      Defines a Scala closure of n arguments as user-defined function (UDF). The data types are automatically inferred based on the Scala closure's signature.

- In Spark >= 2.3 it is possible to return Row directly, as long as the schema is provided:

      def udf(f: AnyRef, dataType: DataType): UserDefinedFunction
      Defines a deterministic user-defined function (UDF) using a Scala closure. For this variant, the caller must specify the output data type, and there is no automatic input type coercion.

  See for example How to create a Spark UDF in Java / Kotlin which returns a complex type?.
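To make the two variants concrete, here is a hedged sketch (not from the original answer; the Subject case class and its kind/score field names are my own, while the "type" and "score" lookups come from the example JSON):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

// Spark < 2.3: return a Product (a case class); the output struct
// schema is inferred from the case class fields via reflection.
case class Subject(kind: String, score: Long)
val best_subject = udf { subjects: Seq[Row] =>
  val top = subjects.maxBy(_.getAs[Long]("score"))
  Subject(top.getAs[String]("type"), top.getAs[Long]("score"))
}

// Spark >= 2.3: return a Row directly, supplying the schema by hand.
val subjectSchema = StructType(Seq(
  StructField("kind", StringType),
  StructField("score", LongType)))
val best_subject_row = udf((subjects: Seq[Row]) => {
  val top = subjects.maxBy(_.getAs[Long]("score"))
  Row(top.getAs[String]("type"), top.getAs[Long]("score"))
}, subjectSchema)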

