Original question: http://stackoverflow.com/questions/39275816/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
Exploding nested Struct in Spark dataframe
Asked by Feynman27
I'm working through the Databricks example. The schema for the dataframe looks like:
> parquetDF.printSchema
root
 |-- department: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- employees: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lastName: string (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- salary: integer (nullable = true)
In the example, they show how to explode the employees column into 4 additional columns:
val explodeDF = parquetDF.explode($"employees") {
  case Row(employee: Seq[Row]) => employee.map { employee =>
    val firstName = employee(0).asInstanceOf[String]
    val lastName = employee(1).asInstanceOf[String]
    val email = employee(2).asInstanceOf[String]
    val salary = employee(3).asInstanceOf[Int]
    Employee(firstName, lastName, email, salary)
  }
}.cache()
display(explodeDF)
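As a side note, the DataFrame.explode method used above has been deprecated since Spark 2.0 in favor of select (or flatMap) with functions.explode. The following is only a sketch of that newer style, not the Databricks example itself. It assumes Spark 2.0 or later; the case classes, the sample rows, and the names df and explodedDF are made up for illustration and only mirror the schema printed earlier.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Hypothetical case classes that mirror the printed schema.
case class Department(id: String, name: String)
case class Employee(firstName: String, lastName: String, email: String, salary: Int)
case class DepartmentWithEmployees(department: Department, employees: Seq[Employee])

// Reuses the existing session in spark-shell or a notebook, otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()

// Made-up sample data: one department with two employees.
val df = spark.createDataFrame(Seq(
  DepartmentWithEmployees(
    Department("d1", "Engineering"),
    Seq(
      Employee("Ada", "Lovelace", "ada@example.com", 100),
      Employee("Alan", "Turing", "alan@example.com", 120)
    )
  )
))

// One output row per array element, then promote the element's fields to columns.
val explodedDF = df
  .select(col("department"), explode(col("employees")).as("employee"))
  .select(col("department"), col("employee.*"))

explodedDF.printSchema()
```

The department struct is carried along unchanged, so the result has the same shape as the explodeDF in the question: one row per employee, with department still nested.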
How would I do something similar with the department column (i.e. add two additional columns to the dataframe called "id" and "name")? The methods aren't exactly the same, and I can only figure out how to create a brand new data frame using:
val explodeDF = parquetDF.select("department.id","department.name")
display(explodeDF)
If I try:
val explodeDF = parquetDF.explode($"department") {
  case Row(dept: Seq[String]) => dept.map { dept =>
    val id = dept(0)
    val name = dept(1)
  }
}.cache()
display(explodeDF)
I get the warning and error:
<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure
case Row(dept: Seq[String]) => dept.map{dept =>
^
<console>:37: error: inferred type arguments [Unit] do not conform to method explode's type parameter bounds [A <: Product]
val explodeDF = parquetDF.explode($"department") {
^
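The error message itself points at the cause: the closure passed to explode ends in a val definition, so it evaluates to Unit, while the type parameter is bounded by Product (a tuple or case class). The following plain-Scala sketch reproduces that constraint without Spark. The helper collectRows is hypothetical and only imitates the bound [A <: Product] from the error message; it is not part of the Spark API.

```scala
// Hypothetical helper with the same kind of bound as in the error message:
// A must be a Product, for example a tuple or a case class.
def collectRows[A <: Product](input: Seq[String])(f: String => Seq[A]): Seq[A] =
  input.flatMap(f)

// A block whose last statement is a `val` evaluates to Unit, so the compiler
// would reject the following call because Unit is not a Product:
// collectRows(Seq("1:Sales")) { s => Seq(s).map { t => val id = t } }

// Returning a tuple satisfies the bound.
val rows = collectRows(Seq("1:Sales", "2:Ops")) { s =>
  val parts = s.split(":")
  Seq((parts(0), parts(1)))
}
```

Separately, a struct column such as department reaches the closure as a single Row rather than a Seq, so there is nothing to map over; selecting its fields directly is the more natural route.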
Accepted answer by gsamaras
You could use something like this:
// Start from the existing explodeDF and add one top-level column per struct field.
var explodeDeptDF = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDeptDF = explodeDeptDF.withColumn("name", explodeDeptDF("department.name"))
which I arrived at with your help and with the help of some related questions.
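The withColumn approach above can be tried on a small DataFrame. This is a minimal sketch, assuming Spark 2.0 or later; the case classes, the sample row, and the names df and withDeptCols are invented for illustration. It uses col("department.id") from functions instead of looking the column up on the DataFrame, which should behave the same way here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical case classes: a nested department struct plus one flat column.
case class Department(id: String, name: String)
case class Record(department: Department, firstName: String)

val spark = SparkSession.builder().master("local[*]").appName("withColumn-sketch").getOrCreate()

// Made-up sample row.
val df = spark.createDataFrame(Seq(Record(Department("d1", "Engineering"), "Ada")))

// Copy each nested field into a new top-level column; the struct itself stays in place.
val withDeptCols = df
  .withColumn("id", col("department.id"))
  .withColumn("name", col("department.name"))

withDeptCols.printSchema()
```

Chaining the two calls avoids the var reassignment, and the new columns are appended after the existing ones.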
Answer by DHARIN PAREKH
In my opinion, the most elegant solution is to star-expand the struct in a select, as shown below:
var explodedDf2 = explodedDf.select("department.*","*")
https://docs.databricks.com/spark/latest/spark-sql/complex-types.html
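A small self-contained sketch of the star expansion, assuming Spark 2.0 or later. The case classes, sample row, and the names df, expanded, and flattened are made up for illustration; dropping the original struct afterwards is an optional extra step that the answer does not include.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case classes: a nested department struct plus one flat column.
case class Department(id: String, name: String)
case class Record(department: Department, firstName: String)

val spark = SparkSession.builder().master("local[*]").appName("star-expand-sketch").getOrCreate()

// Made-up sample row.
val df = spark.createDataFrame(Seq(Record(Department("d1", "Engineering"), "Ada")))

// "department.*" expands every field of the struct; "*" keeps all original columns.
val expanded = df.select("department.*", "*")

// Optional: drop the struct once its fields are available at the top level.
val flattened = expanded.drop("department")

flattened.printSchema()
```

One thing to watch for: if the DataFrame already has a top-level column named id or name, the expansion produces duplicate column names, so aliasing the fields explicitly may be the safer choice in that case.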
Answer by Feynman27
This seems to work (though maybe not the most elegant solution).
var explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDF2 = explodeDF2.withColumn("name", explodeDF2("department.name"))

