Scala: Derive multiple columns from a single column in a Spark DataFrame
Disclaimer: this page is a Chinese-English translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/32196207/
Derive multiple columns from a single column in a Spark DataFrame
Asked by sshroff
I have a DataFrame with huge parseable metadata stored as a single string column; let's call the DataFrame DFA and the column ColmnA.
I would like to break this column, ColmnA, into multiple columns through a function, ClassXYZ = Func1(ColmnA). This function returns an object of class ClassXYZ with multiple variables, and each of these variables now has to be mapped to a new column, such as ColmnA1, ColmnA2, etc.
How would I do such a transformation from one DataFrame to another with these additional columns by calling Func1 just once, without having to repeat it to create all the columns?
It would be easy to solve if I called this huge function every time to add a new column, but that is what I wish to avoid.
Kindly advise with working or pseudo code.
Thanks
Sanjay
Answered by zero323
Generally speaking, what you want is not directly possible. A UDF can return only a single column at a time. There are two different ways you can overcome this limitation:
Return a column of complex type. The most general solution is a StructType, but you can consider ArrayType or MapType as well.

import org.apache.spark.sql.functions.udf

val df = Seq(
  (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
).toDF("x", "y", "z")

case class Foobar(foo: Double, bar: Double)

val foobarUdf = udf((x: Long, y: Double, z: String) =>
  Foobar(x * y, z.head.toInt * y))

val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
df1.show
// +---+----+---+------------+
// |  x|   y|  z|      foobar|
// +---+----+---+------------+
// |  1| 3.0|  a| [3.0,291.0]|
// |  2|-1.0|  b|[-2.0,-98.0]|
// |  3| 0.0|  c|   [0.0,0.0]|
// +---+----+---+------------+

df1.printSchema
// root
//  |-- x: long (nullable = false)
//  |-- y: double (nullable = false)
//  |-- z: string (nullable = true)
//  |-- foobar: struct (nullable = true)
//  |    |-- foo: double (nullable = false)
//  |    |-- bar: double (nullable = false)

This can be easily flattened later but usually there is no need for that.
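As a minimal sketch of that flattening, assuming the df1 built above, the struct fields can be pulled out into top-level columns with getField (the names here simply mirror the example):

// Flatten the struct column into plain top-level columns (sketch based on the example above).
val flat = df1.select(
  $"x", $"y", $"z",
  $"foobar".getField("foo").as("foo"),
  $"foobar".getField("bar").as("bar"))
flat.printSchema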
Switch to RDD, reshape and rebuild DF:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

def foobarFunc(x: Long, y: Double, z: String): Seq[Any] =
  Seq(x * y, z.head.toInt * y)

val schema = StructType(df.schema.fields ++
  Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))

val rows = df.rdd.map(r => Row.fromSeq(
  r.toSeq ++
  foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))

val df2 = sqlContext.createDataFrame(rows, schema)
df2.show
// +---+----+---+----+-----+
// |  x|   y|  z| foo|  bar|
// +---+----+---+----+-----+
// |  1| 3.0|  a| 3.0|291.0|
// |  2|-1.0|  b|-2.0|-98.0|
// |  3| 0.0|  c| 0.0|  0.0|
// +---+----+---+----+-----+
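As a side note, on Spark 2.x and later the DataFrame would typically be rebuilt from a SparkSession rather than a SQLContext; a one-line sketch, assuming a session named spark is in scope:

// Assumes a SparkSession `spark` (Spark 2.x+); equivalent to the sqlContext call above.
val df2 = spark.createDataFrame(rows, schema)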
Answered by EdwinGuo
Assume that after your function there will be a sequence of elements; here is an example:
val df = sc.parallelize(List(
  ("Mike,1986,Toronto", 30),
  ("Andre,1980,Ottawa", 36),
  ("jill,1989,London", 27)
)).toDF("infoComb", "age")
df.show
+------------------+---+
|          infoComb|age|
+------------------+---+
| Mike,1986,Toronto| 30|
| Andre,1980,Ottawa| 36|
|  jill,1989,London| 27|
+------------------+---+
Now what you can do with this infoComb column is start splitting the string and get more columns with:
import org.apache.spark.sql.functions.expr

df.select(
  expr("(split(infoComb, ','))[0]").cast("string").as("name"),
  expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"),
  expr("(split(infoComb, ','))[2]").cast("string").as("city"),
  $"age").show
+-----+----------+-------+---+
| name|yearOfBorn|   city|age|
+-----+----------+-------+---+
| Mike|      1986|Toronto| 30|
|Andre|      1980| Ottawa| 36|
| jill|      1989| London| 27|
+-----+----------+-------+---+
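The same projection can also be written with the split function from org.apache.spark.sql.functions instead of SQL expression strings; a sketch, not part of the original answer:

import org.apache.spark.sql.functions.split

// Split infoComb on commas and pick the pieces out by index.
val parts = split(df("infoComb"), ",")
df.select(
  parts.getItem(0).as("name"),
  parts.getItem(1).cast("integer").as("yearOfBorn"),
  parts.getItem(2).as("city"),
  df("age")).show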
Hope this helps.
Answered by Niemand
If your resulting columns will be of the same length as the original one, you can create brand-new columns with the withColumn function and by applying a UDF. After this you can drop your original column, e.g.:
val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
  .withColumn("newCol2", myFun2(myDf("originalColumn")))
  .drop(myDf("originalColumn"))
where myFun is a UDF defined like this:
def myFun = udf(
  (originalColumnContent: String) => {
    // do something with your original column content and return a new one
  }
)
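For instance, a hypothetical myFun that simply upper-cases the original content could look like the following; the transformation is a placeholder, not something prescribed by the answer:

import org.apache.spark.sql.functions.udf

// Placeholder transformation: derive the new column value from the original string content.
val myFun = udf((originalColumnContent: String) => originalColumnContent.toUpperCase)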
Answered by Pekka
I opted to create a function to flatten one column and then just call it together with the udf.
First define this:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.types.StructType

implicit class DfOperations(df: DataFrame) {

  def flattenColumn(col: String) = {
    def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
      if (cols.isEmpty) df
      else addColumns(
        df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
        cols.tail
      )
    }
    val field = df.select(col).schema.fields(0)
    val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)
    addColumns(df, newCols).drop(col)
  }

  def withColumnMany(colName: String, col: Column) = {
    df.withColumn(colName, col).flattenColumn(colName)
  }

}
Then usage is very simple:
case class MyClass(a: Int, b: Int)

val df = sc.parallelize(Seq(
  (0),
  (1)
)).toDF("x")

val f = udf((x: Int) => MyClass(x * 2, x * 3))
df.withColumnMany("test", f($"x")).show()
// +---+------+------+
// |  x|test_a|test_b|
// +---+------+------+
// |  0|     0|     0|
// |  1|     2|     3|
// +---+------+------+
Answered by Abhishek Kgsk
This can be easily achieved by using the pivot function:
df4.groupBy("year").pivot("course").sum("earnings").collect()
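For context, the answer does not define df4; a minimal sketch with hypothetical data shows what this pivot produces (the course/year/earnings columns are made up for illustration):

// Hypothetical df4; the original answer leaves it undefined.
val df4 = Seq(
  ("dotNET", 2012, 10000), ("Java", 2012, 20000),
  ("dotNET", 2013, 5000),  ("Java", 2013, 30000)
).toDF("course", "year", "earnings")

// Produces one column per distinct course, with earnings summed per year.
df4.groupBy("year").pivot("course").sum("earnings").show()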

