Original question: http://stackoverflow.com/questions/33864389/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
How can I create a Spark DataFrame from a nested array of struct element?
Asked by zapstar
I have read a JSON file into Spark. This file has the following structure:
scala> tweetBlob.printSchema
root
 |-- related: struct (nullable = true)
 |    |-- next: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |-- search: struct (nullable = true)
 |    |-- current: long (nullable = true)
 |    |-- results: long (nullable = true)
 |-- tweets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cde: struct (nullable = true)
...
...
 |    |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |    |-- message: struct (nullable = true)
...
...
Ideally, I want a DataFrame with the columns "cde", "cdeInternal", "message", ... as shown below:
root
 |-- cde: struct (nullable = true)
...
...
 |-- cdeInternal: struct (nullable = true)
...
...
 |-- message: struct (nullable = true)
...
...
I have managed to use "explode" to extract the elements of the "tweets" array into a column called "tweets":
scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets"))
tweets: org.apache.spark.sql.DataFrame = [tweets: struct<cde:struct<author:struct<gender:string,location:struct<city:string,country:string,state:string>,maritalStatus:struct<evidence:string,isMarried:string>,parenthood:struct<evidence:string,isParent:string>>,content:struct<sentiment:struct<evidence:array<struct<polarity:string,sentimentTerm:string>>,polarity:string>>>,cdeInternal:struct<compliance:struct<isActive:boolean,userProtected:boolean>,tracks:array<struct<id:string>>>,message:struct<actor:struct<displayName:string,favoritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:string,image:string,languages:array<string>,link:string,links:array<struct<href:string,rel:string>>,listedCount:bigint,location:struct<displayName:string,objectType:string>,objectType:string,postedTime...
scala> tweets.printSchema
root
 |-- tweets: struct (nullable = true)
 |    |-- cde: struct (nullable = true)
...
...
 |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |-- message: struct (nullable = true)
...
...
How can I select all columns inside the struct and create a DataFrame from them? If my understanding is correct, explode does not work on a struct.
Any help is appreciated.
Answered by zero323
One possible way to handle this is to extract the required information from the schema. Let's start with some dummy data:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col // used by the helper function
import org.apache.spark.sql.types._
case class Bar(x: Int, y: String)
case class Foo(bar: Bar)
val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF
df.printSchema
// root
//  |-- bar: struct (nullable = true)
//  |    |-- x: integer (nullable = false)
//  |    |-- y: string (nullable = true)
and a helper function:
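// Returns one Column per field of the struct column `colname`,
// or an empty array if `colname` is not a struct.
def children(colname: String, df: DataFrame) = {
  // Look up the top-level field; throws NoSuchElementException if it is missing
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  // Address each child by its full path, e.g. "bar.x"
  fields.map(x => col(s"$colname.${x.name}"))
}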
Finally, the result:
df.select(children("bar", df): _*).printSchema
// root
// |-- x: integer (nullable = true)
// |-- y: string (nullable = true)
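Applied to the layout in the question, the helper can be combined with explode. The following is only a minimal sketch, assuming Spark 2.x or later and a local SparkSession; the Message, Tweet and Blob case classes and their fields are made-up stand-ins for the real (much larger) tweet schema:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical stand-ins for the real tweet schema
case class Message(body: String)
case class Tweet(cde: String, message: Message)
case class Blob(tweets: Seq[Tweet])

// Assumption: a local session; in spark-shell getOrCreate returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("explode-children").getOrCreate()
import spark.implicits._

// Same helper as in the answer: one Column per field of the struct column `colname`
def children(colname: String, df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  fields.map(x => col(s"$colname.${x.name}"))
}

val tweetBlob = Seq(Blob(Seq(Tweet("a", Message("hi")), Tweet("b", Message("yo"))))).toDF

// Step 1: one row per array element, held in a single struct column
val exploded = tweetBlob.select(explode(col("tweets")).as("tweets"))
// Step 2: promote each field of that struct to a top-level column
val flat = exploded.select(children("tweets", exploded): _*)

flat.printSchema()
```

The helper has to be called on the exploded DataFrame, not on the original one, because only after explode is "tweets" a struct rather than an array.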
Answered by Fernando Lemos
You can use explode followed by a star expansion of the resulting struct column:
df
  .select(explode(col("path_to_collection")).as("collection"))
  .select(col("collection.*"))
Example:
scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""
scala> val inline = sqlContext.read.json(sc.parallelize(json :: Nil)).select(explode(col("schools")).as("collection")).select(col("collection.*"))
scala> inline.printSchema
root
|-- sname: string (nullable = true)
|-- year: long (nullable = true)
scala> inline.show
+--------+----+
| sname|year|
+--------+----+
|stanford|2010|
|berkeley|2012|
+--------+----+
Alternatively, you can use the SQL function inline:
scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""
scala> sqlContext.read.json(sc.parallelize(json :: Nil)).registerTempTable("tmp")
scala> val inline = sqlContext.sql("SELECT inline(schools) FROM tmp")
scala> inline.printSchema
root
|-- sname: string (nullable = true)
|-- year: long (nullable = true)
scala> inline.show
+--------+----+
| sname|year|
+--------+----+
|stanford|2010|
|berkeley|2012|
+--------+----+
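The same generator can also be reached from the DataFrame API through selectExpr, which avoids registering a temporary table. This is a sketch under assumptions rather than part of the original answer: it assumes Spark 2.2 or later (for reading JSON from a Dataset[String]) and a local SparkSession; the names people, schools and withName are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell getOrCreate returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("inline-sketch").getOrCreate()
import spark.implicits._

val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""

// Spark 2.2+ can read JSON records from a Dataset[String]
val people = spark.read.json(Seq(json).toDS)

// inline turns each struct in the array into a row and each struct field into a column
val schools = people.selectExpr("inline(schools)")

// Other columns can be kept alongside the generator
val withName = people.selectExpr("name", "inline(schools)")

schools.printSchema()
withName.printSchema()
```

Keeping "name" next to the generator repeats it on every generated row, which is useful when the exploded rows still need to be tied back to their parent record.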
Answered by John
scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> case class Bar(x: Int, y: String)
defined class Bar
scala> case class Foo(bar: Bar)
defined class Foo
scala> val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF
df: org.apache.spark.sql.DataFrame = [bar: struct<x: int, y: string>]
scala> df.printSchema
root
 |-- bar: struct (nullable = true)
 |    |-- x: integer (nullable = false)
 |    |-- y: string (nullable = true)
scala> df.select("bar.*").printSchema
root
|-- x: integer (nullable = true)
|-- y: string (nullable = true)
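Star expansion keeps the child names unchanged, so flattening several structs that share field names can leave duplicate column names behind. One possible workaround, sketched here under the assumption of Spark 2.x or later with a local SparkSession, is to read the child names from the schema and alias each one with its parent name; the "bar_" prefix is an arbitrary naming choice, not something the answers above prescribe.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

case class Bar(x: Int, y: String)
case class Foo(bar: Bar)

// Assumption: a local session; in spark-shell getOrCreate returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("prefix-sketch").getOrCreate()
import spark.implicits._

val df = Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second"))).toDF

// Star expansion: columns are named after the children only
val star = df.select("bar.*")

// Read the child names of the struct from the schema
val childNames = df.schema("bar").dataType match {
  case s: StructType => s.fieldNames.toSeq
  case _ => Seq.empty[String]
}

// Alias every child with its parent name as a prefix
val prefixed = df.select(childNames.map(n => col(s"bar.$n").as(s"bar_$n")): _*)

star.printSchema()
prefixed.printSchema()
```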