Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me) and cite the original address. StackOverflow original: http://stackoverflow.com/questions/33864389/

Time: 2020-10-22 07:48:35  Source: igfitidea

How can I create a Spark DataFrame from a nested array of struct elements?

scala apache-spark dataframe apache-spark-sql

Asked by zapstar

I have read a JSON file into Spark. This file has the following structure:

scala> tweetBlob.printSchema
root
 |-- related: struct (nullable = true)
 |    |-- next: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |-- search: struct (nullable = true)
 |    |-- current: long (nullable = true)
 |    |-- results: long (nullable = true)
 |-- tweets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cde: struct (nullable = true)
...
...
 |    |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |    |-- message: struct (nullable = true)
...
...

Ideally, I want a DataFrame with the columns "cde", "cdeInternal", "message", ... as shown below:

root
|-- cde: struct (nullable = true)
...
...
|-- cdeInternal: struct (nullable = true)
...
...
|-- message: struct (nullable = true)
...
...

I have managed to use "explode" to extract the elements of the "tweets" array into a column called "tweets":

scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets"))
tweets: org.apache.spark.sql.DataFrame = [tweets: struct<cde:struct<author:struct<gender:string,location:struct<city:string,country:string,state:string>,maritalStatus:struct<evidence:string,isMarried:string>,parenthood:struct<evidence:string,isParent:string>>,content:struct<sentiment:struct<evidence:array<struct<polarity:string,sentimentTerm:string>>,polarity:string>>>,cdeInternal:struct<compliance:struct<isActive:boolean,userProtected:boolean>,tracks:array<struct<id:string>>>,message:struct<actor:struct<displayName:string,favoritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:string,image:string,languages:array<string>,link:string,links:array<struct<href:string,rel:string>>,listedCount:bigint,location:struct<displayName:string,objectType:string>,objectType:string,postedTime...
scala> tweets.printSchema
root
 |-- tweets: struct (nullable = true)
 |    |-- cde: struct (nullable = true)
...
...
 |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |-- message: struct (nullable = true)
...
...

How can I select all columns inside the struct and create a DataFrame out of them? If my understanding is correct, explode does not work on a struct.

Any help is appreciated.

Answered by zero323

One possible way to handle this is to extract the required information from the schema. Let's start with some dummy data:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._


case class Bar(x: Int, y: String)
case class Foo(bar: Bar)

val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF

df.printSchema

// root
//  |-- bar: struct (nullable = true)
//  |    |-- x: integer (nullable = false)
//  |    |-- y: string (nullable = true)

And a helper function:

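def children(colname: String, df: DataFrame) = {
  // Look up the top-level field; .head throws if no such column exists
  val parent = df.schema.fields.filter(_.name == colname).head
  // Only struct columns have child fields; anything else yields none
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  // One Column per child, addressed as "parent.child"
  // (col is org.apache.spark.sql.functions.col)
  fields.map(x => col(s"$colname.${x.name}"))
}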

Finally, the results:

df.select(children("bar", df): _*).printSchema

// root
// |-- x: integer (nullable = true)
// |-- y: string (nullable = true)
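
The same idea can be applied to the exploded "tweets" column from the question. The following is only a sketch for spark-shell on Spark 2.2 or later: the JSON document, its field values, and the name childrenOrEmpty are made up for illustration, and this variant returns no columns instead of throwing when the column is missing.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.StructType

// Local session for illustration; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("children-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data that only mimics the shape of the question's file.
val blobJson = """{"search":{"current":2},"tweets":[{"cde":{"author":"a"},"message":{"body":"hello"}},{"cde":{"author":"b"},"message":{"body":"world"}}]}"""
val tweetBlob = spark.read.json(Seq(blobJson).toDS())

// Variant of the helper: an empty result for a missing or non-struct column.
def childrenOrEmpty(colname: String, df: DataFrame): Array[Column] =
  df.schema.fields.find(_.name == colname).map(_.dataType) match {
    case Some(s: StructType) => s.fields.map(f => col(s"$colname.${f.name}"))
    case _                   => Array.empty[Column]
  }

// One row per array element, then one column per struct field.
val tweets = tweetBlob.select(explode(col("tweets")).as("tweets"))
val flat = tweets.select(childrenOrEmpty("tweets", tweets): _*)

flat.printSchema()
```

Field names that contain dots or other special characters would need backtick quoting inside col(...); the sketch does not handle that case.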

Answered by Fernando Lemos

You can use:

df
  .select(explode(col("path_to_collection")).as("collection"))
  .select(col("collection.*"))

Example:

scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""

scala> val inline = sqlContext.read.json(sc.parallelize(json :: Nil)).select(explode(col("schools")).as("collection")).select(col("collection.*"))

scala> inline.printSchema
root
 |-- sname: string (nullable = true)
 |-- year: long (nullable = true)

scala> inline.show
+--------+----+
|   sname|year|
+--------+----+
|stanford|2010|
|berkeley|2012|
+--------+----+
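
The transcript above uses the Spark 1.x sqlContext entry point. On Spark 2.2 or later, the same pipeline can probably be written against SparkSession, as in this sketch. The local master setting and the names people and schools are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Local session for illustration; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("explode-star").getOrCreate()
import spark.implicits._

val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""

// spark.read.json accepts a Dataset[String] (Spark 2.2+).
val people = spark.read.json(Seq(json).toDS())

// explode yields one row per array element; "collection.*" then
// promotes each field of the struct to a top-level column.
val schools = people
  .select(explode(col("schools")).as("collection"))
  .select(col("collection.*"))

schools.show()
```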

Alternatively, you can use the SQL function inline:

scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""

scala> sqlContext.read.json(sc.parallelize(json :: Nil)).registerTempTable("tmp")

scala> val inline = sqlContext.sql("SELECT inline(schools) FROM tmp")

scala> inline.printSchema
root
 |-- sname: string (nullable = true)
 |-- year: long (nullable = true)

scala> inline.show
+--------+----+
|   sname|year|
+--------+----+
|stanford|2010|
|berkeley|2012|
+--------+----+
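
registerTempTable has been deprecated since Spark 2.0 in favour of createOrReplaceTempView, and inline can also be called through selectExpr without registering a view at all. A minimal sketch, assuming Spark 2.2 or later; the names viaSql and viaExpr are illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("inline-sketch").getOrCreate()
import spark.implicits._

val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""
val people = spark.read.json(Seq(json).toDS())

// Option 1: SQL against a temporary view.
people.createOrReplaceTempView("tmp")
val viaSql = spark.sql("SELECT inline(schools) FROM tmp")

// Option 2: the same generator expression, no view required.
val viaExpr = people.selectExpr("inline(schools)")

viaSql.show()
viaExpr.show()
```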

Answered by John

scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> case class Bar(x: Int, y: String)
defined class Bar

scala> case class Foo(bar: Bar)
defined class Foo

scala> val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF
df: org.apache.spark.sql.DataFrame = [bar: struct<x: int, y: string>]


scala> df.printSchema
root
 |-- bar: struct (nullable = true)
 |    |-- x: integer (nullable = false)
 |    |-- y: string (nullable = true)


scala> df.select("bar.*").printSchema
root
 |-- x: integer (nullable = true)
 |-- y: string (nullable = true)
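
Star expansion can also be mixed with ordinary columns in one select, which helps when sibling fields should be kept next to the flattened struct. This goes slightly beyond the transcript above and is only a sketch, assuming Spark 2.2 or later; the "id" field and the sample rows are invented for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for illustration; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("star-sketch").getOrCreate()
import spark.implicits._

// Hypothetical rows: a plain "id" column next to a "bar" struct.
val rows = Seq(
  """{"id":1,"bar":{"x":1,"y":"first"}}""",
  """{"id":2,"bar":{"x":2,"y":"second"}}"""
)
val df = spark.read.json(rows.toDS())

// Keep "id" and promote the fields of "bar" to top-level columns.
val flat = df.select(col("id"), col("bar.*"))

flat.printSchema()
```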

