Warning: this page reproduces a StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me). Original question: http://stackoverflow.com/questions/32196207/

Derive multiple columns from a single column in a Spark DataFrame

scala, apache-spark, dataframe, apache-spark-sql, user-defined-functions

Asked by sshroff

I have a DF with huge parseable metadata as a single string column in a Dataframe; let's call it DFA, with column ColmnA.

I would like to break this column, ColmnA, into multiple columns through a function, ClassXYZ = Func1(ColmnA). This function returns a class ClassXYZ with multiple variables, and each of these variables now has to be mapped to a new column, such as ColmnA1, ColmnA2, etc.

How would I do such a transformation from one Dataframe to another with these additional columns by calling this Func1 just once, without having to repeat it to create all the columns?

It would be easy to solve if I were to call this huge function every time to add a new column, but that is what I wish to avoid.

Kindly advise with working or pseudo code.

Thanks

Sanjay

Answered by zero323

Generally speaking, what you want is not directly possible: a UDF can return only a single column at a time. There are two different ways you can overcome this limitation:

  1. Return a column of complex type. The most general solution is a StructType, but you can consider ArrayType or MapType as well.

    import org.apache.spark.sql.functions.udf
    
    val df = Seq(
      (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
    ).toDF("x", "y", "z")
    
    case class Foobar(foo: Double, bar: Double)
    
    val foobarUdf = udf((x: Long, y: Double, z: String) => 
      Foobar(x * y, z.head.toInt * y))
    
    val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
    df1.show
    // +---+----+---+------------+
    // |  x|   y|  z|      foobar|
    // +---+----+---+------------+
    // |  1| 3.0|  a| [3.0,291.0]|
    // |  2|-1.0|  b|[-2.0,-98.0]|
    // |  3| 0.0|  c|   [0.0,0.0]|
    // +---+----+---+------------+
    
    df1.printSchema
    // root
    //  |-- x: long (nullable = false)
    //  |-- y: double (nullable = false)
    //  |-- z: string (nullable = true)
    //  |-- foobar: struct (nullable = true)
    //  |    |-- foo: double (nullable = false)
    //  |    |-- bar: double (nullable = false)
    

    This can be easily flattened later but usually there is no need for that.
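
    A minimal sketch of such flattening (column names follow the example above); nested struct fields can be selected with dot syntax and aliased:

    df1.select(
      $"x", $"y", $"z",
      $"foobar.foo".alias("foo"),
      $"foobar.bar".alias("bar")
    ).show
    // +---+----+---+----+-----+
    // |  x|   y|  z| foo|  bar|
    // +---+----+---+----+-----+
    // |  1| 3.0|  a| 3.0|291.0|
    // |  2|-1.0|  b|-2.0|-98.0|
    // |  3| 0.0|  c| 0.0|  0.0|
    // +---+----+---+----+-----+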

  2. Switch to RDD, reshape and rebuild DF:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = 
      Seq(x * y, z.head.toInt * y)
    
    val schema = StructType(df.schema.fields ++
      Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
    
    val rows = df.rdd.map(r => Row.fromSeq(
      r.toSeq ++
      foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
    
    val df2 = sqlContext.createDataFrame(rows, schema)
    
    df2.show
    // +---+----+---+----+-----+
    // |  x|   y|  z| foo|  bar|
    // +---+----+---+----+-----+
    // |  1| 3.0|  a| 3.0|291.0|
    // |  2|-1.0|  b|-2.0|-98.0|
    // |  3| 0.0|  c| 0.0|  0.0|
    // +---+----+---+----+-----+
    

Answered by EdwinGuo

Assume that after your function runs you end up with a sequence of elements; for example:

val df = sc.parallelize(List(
  ("Mike,1986,Toronto", 30),
  ("Andre,1980,Ottawa", 36),
  ("jill,1989,London", 27)
)).toDF("infoComb", "age")
df.show
+------------------+---+
|          infoComb|age|
+------------------+---+
| Mike,1986,Toronto| 30|
| Andre,1980,Ottawa| 36|
|  jill,1989,London| 27|
+------------------+---+

Now what you can do with this infoComb is start splitting the string and get more columns:

import org.apache.spark.sql.functions.expr

df.select(
  expr("(split(infoComb, ','))[0]").cast("string").as("name"),
  expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"),
  expr("(split(infoComb, ','))[2]").cast("string").as("city"),
  $"age"
).show
+-----+----------+-------+---+
| name|yearOfBorn|   city|age|
+-----+----------+-------+---+
| Mike|      1986|Toronto| 30|
|Andre|      1980| Ottawa| 36|
| jill|      1989| London| 27|
+-----+----------+-------+---+
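
Equivalently, here is a sketch using the split function from org.apache.spark.sql.functions rather than a SQL expression string (same columns as above):

import org.apache.spark.sql.functions.split

val parts = split($"infoComb", ",")
df.select(
  parts.getItem(0).as("name"),
  parts.getItem(1).cast("integer").as("yearOfBorn"),
  parts.getItem(2).as("city"),
  $"age"
).show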

Hope this helps.

Answered by Niemand

If your resulting columns will be of the same length as the original one, you can create brand new columns with the withColumn function and by applying a udf. After this you can drop your original column, e.g.:

val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
  .withColumn("newCol2", myFun2(myDf("originalColumn")))
  .drop(myDf("originalColumn"))

where myFun is a udf defined like this:

import org.apache.spark.sql.functions.udf

def myFun = udf(
  (originalColumnContent: String) => {
    // do something with your original column content and return a new one
    // (placeholder so the sketch compiles: return it unchanged)
    originalColumnContent
  }
)
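
A self-contained sketch of this pattern (myDf, myFun, and myFun2 below are hypothetical stand-ins; assumes the toDF implicits are in scope, as elsewhere on this page):

import org.apache.spark.sql.functions.udf

val myDf = Seq("Mike,1986", "Andre,1980").toDF("originalColumn")

// hypothetical udfs: derive a name and a year from the combined string
val myFun = udf((s: String) => s.split(",")(0))
val myFun2 = udf((s: String) => s.split(",")(1).toInt)

val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
  .withColumn("newCol2", myFun2(myDf("originalColumn")))
  .drop("originalColumn")

newDf.show
// +-------+-------+
// |newCol1|newCol2|
// +-------+-------+
// |   Mike|   1986|
// |  Andre|   1980|
// +-------+-------+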

Answered by Pekka

I opted to create a function to flatten one column and then just call it together with the udf.

First define this:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.types.StructType

implicit class DfOperations(df: DataFrame) {

  def flattenColumn(col: String) = {
    def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
      if (cols.isEmpty) df
      else addColumns(
        df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
        cols.tail
      )
    }

    val field = df.select(col).schema.fields(0)
    val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)

    addColumns(df, newCols).drop(col)
  }

  def withColumnMany(colName: String, col: Column) = {
    df.withColumn(colName, col).flattenColumn(colName)
  }

}

Then usage is very simple:

case class MyClass(a: Int, b: Int)

val df = sc.parallelize(Seq(
  (0),
  (1)
)).toDF("x")

val f = udf((x: Int) => MyClass(x*2,x*3))

df.withColumnMany("test", f($"x")).show()

//  +---+------+------+
//  |  x|test_a|test_b|
//  +---+------+------+
//  |  0|     0|     0|
//  |  1|     2|     3|
//  +---+------+------+

Answered by Abhishek Kgsk

This can be easily achieved by using the pivot function:

df4.groupBy("year").pivot("course").sum("earnings").collect()
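
For context, a minimal self-contained sketch (df4 and its course/year/earnings data are hypothetical, mirroring the pivot example in the Spark documentation):

val df4 = Seq(
  ("dotNET", 2012, 10000),
  ("Java", 2012, 20000),
  ("dotNET", 2013, 48000),
  ("Java", 2013, 30000)
).toDF("course", "year", "earnings")

df4.groupBy("year").pivot("course").sum("earnings").show()
// +----+-----+------+
// |year| Java|dotNET|
// +----+-----+------+
// |2012|20000| 10000|
// |2013|30000| 48000|
// +----+-----+------+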