Defining a UDF that accepts an Array of objects in a Spark DataFrame?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/39006349/



Tags: scala, apache-spark, dataframe, apache-spark-sql, user-defined-functions

Asked by ohruunuruus

When working with Spark's DataFrames, User Defined Functions (UDFs) are required for mapping data in columns. UDFs require that argument types are explicitly specified. In my case, I need to manipulate a column that is made up of arrays of objects, and I do not know what type to use. Here's an example:


import sqlContext.implicits._

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
  """
  |{
  |  "topic" : "pets",
  |  "subjects" : [
  |    {"type" : "cat", "score" : 10},
  |    {"type" : "dog", "score" : 1}
  |  ]
  |}
  """)))

It's relatively straightforward to use the built-in org.apache.spark.sql.functions to perform basic operations on the data in the columns:


import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show

+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets|             2|
+-----+--------------+

and it's generally easy to write custom UDFs to perform arbitrary operations


import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
|      PETS|             2|
+----------+--------------+

But what if I want to use a UDF to manipulate the array of objects in the "subjects" column? What type do I use for the argument in the UDF? For example, if I want to reimplement the size function, instead of using the one provided by spark:


val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show

Clearly Array[Something] does not work... what type should I use!? Should I ditch Array[] altogether? Poking around tells me scala.collection.mutable.WrappedArray may have something to do with it, but still there's another type I need to provide.


Accepted answer by zero323

What you're looking for is Seq[o.a.s.sql.Row]:


import org.apache.spark.sql.Row

val my_size = udf { subjects: Seq[Row] => subjects.size }
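
Applied to the example data from the question, this behaves just like the built-in size:

data.select($"topic", my_size($"subjects")).show
// expect the same result as before: pets | 2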

Explanation:


  • The current representation of ArrayType is, as you already know, WrappedArray, so Array won't work and it is better to stay on the safe side.
  • According to the official specification, the local (external) type for StructType is Row. Unfortunately this means that access to the individual fields is not type safe (see the sketch after this list).
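
Here is a minimal sketch of that kind of field access, assuming the example schema from the question (score is inferred as long; the field names come from the JSON above, and total_score is a hypothetical name). getAs is resolved at runtime, so a misspelled field name or a wrong type parameter only fails when the UDF executes:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Sum the "score" field of every subject in the array
val total_score = udf { subjects: Seq[Row] =>
  subjects.map(_.getAs[Long]("score")).sum
}

data.select($"topic", total_score($"subjects")).show
// expect: pets | 11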

Notes:


  • To create struct in Spark < 2.3, the function passed to udf has to return a Product type (Tuple* or a case class), not Row. That's because the corresponding udf variants depend on Scala reflection:

    Defines a Scala closure of n arguments as user-defined function (UDF). The data types are automatically inferred based on the Scala closure's signature.

  • In Spark >= 2.3 it is possible to return Row directly, as long as the schema is provided:

    def udf(f: AnyRef, dataType: DataType): UserDefinedFunction
    Defines a deterministic user-defined function (UDF) using a Scala closure. For this variant, the caller must specify the output data type, and there is no automatic input type coercion.

    See for example: How to create a Spark UDF in Java / Kotlin which returns a complex type?. Both approaches are sketched below.

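
A minimal sketch of both variants, using a hypothetical best_subject UDF that picks the highest-scoring subject (the case class Best and its field names are illustrative, not part of the original answer):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

// Spark < 2.3: return a Product (here a case class); the output
// struct schema is inferred from the case class via Scala reflection.
case class Best(kind: String, score: Long)

val best_subject = udf { subjects: Seq[Row] =>
  val top = subjects.maxBy(_.getAs[Long]("score"))
  Best(top.getAs[String]("type"), top.getAs[Long]("score"))
}

// Spark >= 2.3: return Row directly, but supply the output schema
// yourself via the udf(f: AnyRef, dataType: DataType) variant quoted above.
val bestSchema = StructType(Seq(
  StructField("kind", StringType),
  StructField("score", LongType)
))

val best_subject_row = udf((subjects: Seq[Row]) => {
  val top = subjects.maxBy(_.getAs[Long]("score"))
  Row(top.getAs[String]("type"), top.getAs[Long]("score"))
}, bestSchema)

data.select($"topic", best_subject($"subjects")).show
// expect: pets | [cat, 10]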