Disclaimer: this page is a translated copy of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/42931796/

Spark UDF for StructType / Row

Tags: scala, apache-spark, udf

Asked by Danil Kirsanov

I have a "StructType" column in spark Dataframe that has an array and a string as sub-fields. I'd like to modify the array and return the new column of the same type. Can I process it with UDF? Or what are the alternatives?

我在 spark Dataframe 中有一个“StructType”列,它有一个数组和一个字符串作为子字段。我想修改数组并返回相同类型的新列。我可以用UDF处理吗?或者有哪些替代方案?

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val sub_schema = StructType(StructField("col1", ArrayType(IntegerType, false), true) :: StructField("col2", StringType, true) :: Nil)
val schema = StructType(StructField("subtable", sub_schema, true) :: Nil)
val data = Seq(Row(Row(Array(1, 2), "eb")), Row(Row(Array(3, 2, 1), "dsf")))
val rd = sc.parallelize(data)
val df = spark.createDataFrame(rd, schema)
df.printSchema

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)

It seems that I need a UDF of type Row, something like

val u = udf((x: Row) => x)
     >> Schema for type org.apache.spark.sql.Row is not supported

This makes sense, since Spark does not know the schema for the return type. Unfortunately, udf.register fails too:

spark.udf.register("foo", (x: Row) => Row, sub_schema)
     <console>:30: error: overloaded method value register with alternatives: ...

Answered by Danil Kirsanov

It turns out you can pass the result schema as a second UDF parameter:

val u =  udf((x:Row) => x, sub_schema)
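
For example, applied to the DataFrame from the question, a minimal sketch (the column name subtable_copy is just for illustration; note that this untyped udf(f, dataType) overload was deprecated in Spark 3.0 in favor of typed UDFs):

// Spark trusts the declared sub_schema for the result, so the new
// column keeps the struct<col1: array<int>, col2: string> shape
df.withColumn("subtable_copy", u(df("subtable"))).printSchema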

Answered by Tawkir

You are on the right track. In this scenario a UDF will make your life easy. As you have already encountered, a UDF cannot return types which Spark does not know about, so basically you will need to return something which Spark can easily serialize. It may be a case class, or you can return a tuple like (Seq[Int], String). So here is a modified version of your code:

def main(args: Array[String]): Unit = {
  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._
  // create a session (in spark-shell, `spark` is already provided)
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val sub_schema = StructType(StructField("col1", ArrayType(IntegerType, false), true) :: StructField("col2", StringType, true) :: Nil)
  val schema = StructType(StructField("subtable", sub_schema, true) :: Nil)
  val data = Seq(Row(Row(Array(1, 2), "eb")), Row(Row(Array(3, 2, 1), "dsf")))
  val rd = spark.sparkContext.parallelize(data)
  val df = spark.createDataFrame(rd, schema)

  df.printSchema()
  df.show(false)

  val mapArray = (subRows: Row) => {
    // I prefer reading values from the Row by column name; you may use indices as well
    val col1 = subRows.getAs[Seq[Int]]("col1")
    val mappedCol1 = col1.map(x => x * x) // transform the array as your requirements dictate
    (mappedCol1, subRows.getAs[String]("col2")) // col2 is passed through unchanged
  }
  val mapUdf = udf(mapArray)

  val newDf = df.withColumn("col1_mapped", mapUdf(df("subtable")))
  newDf.show(false)
  newDf.printSchema()
}
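
Since the UDF returns a (Seq[Int], String) tuple rather than the original struct, Spark infers the new column's schema from the tuple, so col1_mapped comes out with the default tuple field names, roughly:

 |-- col1_mapped: struct (nullable = true)
 |    |-- _1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- _2: string (nullable = true)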

Please take a look at these links; they may give you more insight:

  1. Most comprehensive answer on working with complex schemas: https://stackoverflow.com/a/33850490/4046067
  2. Spark supported data types: https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

Answered by L. CWI

Yes, you can do this with a UDF. For simplicity, I took your example with case classes, and I changed the array by adding 2 to every value:

case class Root(subtable: Subtable)
case class Subtable(col1: Seq[Int], col2: String)

val df = spark.createDataFrame(Seq(
  Root(Subtable(Seq(1, 2, 3), "toto")),
  Root(Subtable(Seq(10, 20, 30), "tata"))
))

val myUdf = udf((subtable: Row) =>
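  // returning a case class lets Spark infer the result schema automatically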
  Subtable(subtable.getSeq[Int](0).map(_ + 2), subtable.getString(1))
)
val result = df.withColumn("subtable_new", myUdf(df("subtable")))
result.printSchema()
result.show(false)

will print:

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)
 |-- subtable_new: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)

+-------------------------------+-------------------------------+
|subtable                       |subtable_new                   |
+-------------------------------+-------------------------------+
|[WrappedArray(1, 2, 3),toto]   |[WrappedArray(3, 4, 5),toto]   |
|[WrappedArray(10, 20, 30),tata]|[WrappedArray(12, 22, 32),tata]|
+-------------------------------+-------------------------------+