How to flatten a struct in a Spark dataframe?
Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/38753898/
Asked by djWann
I have a dataframe with the following structure:
|-- data: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- keyNote: struct (nullable = true)
| | |-- key: string (nullable = true)
| | |-- note: string (nullable = true)
| |-- details: map (nullable = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
How is it possible to flatten the structure and create a new dataframe:
|-- id: long (nullable = true)
|-- keyNote: struct (nullable = true)
| |-- key: string (nullable = true)
| |-- note: string (nullable = true)
|-- details: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Is there something like explode, but for structs?
Accepted answer by djWann
This should work in Spark 1.6 or later:
df.select(df.col("data.*"))
or
df.select(df.col("data.id"), df.col("data.keyNote"), df.col("data.details"))
Answer by Thomas Decaux
An easy way is to use SQL: you can build a SQL query string that aliases the nested columns as flat ones.
- Retrieve the dataframe schema (df.schema())
- Transform the schema to SQL (for (field : schema().fields()) ...)
- Query:

val newDF = sqlContext.sql("SELECT " + sqlGenerated + " FROM source")
Here is an example in Java.
(I prefer the SQL way, so you can easily test it in the Spark shell, and it's cross-language.)
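The Java example referenced above is external and not reproduced on this page; as a minimal sketch of the same schema-to-SQL idea in PySpark (the sample rows and the view name source are made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("flatten-sql").getOrCreate()

# A stand-in for the question's nested dataframe (sample values made up).
df = spark.createDataFrame(
    [((1, ("k1", "n1"), {"d": "v"}),)],
    "data struct<id: bigint, keyNote: struct<key: string, note: string>, details: map<string, string>>",
)
df.createOrReplaceTempView("source")

# Walk the schema of the top-level struct and alias each field as a flat column.
sql_generated = ", ".join(
    "data.{0} AS {0}".format(f.name)
    for f in df.schema["data"].dataType.fields
)
new_df = spark.sql("SELECT " + sql_generated + " FROM source")
new_df.printSchema()  # id, keyNote, details at the top level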
Answer by steco
Here is a function that does what you want and that can deal with multiple nested struct columns containing fields with the same name:
from pyspark.sql import functions as F

def flatten_df(nested_df):
    # Split the columns into plain ones and struct ones.
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    # Keep the flat columns and promote each struct field to "<struct>_<field>".
    flat_df = nested_df.select(flat_cols +
                               [F.col(nc + '.' + c).alias(nc + '_' + c)
                                for nc in nested_cols
                                for c in nested_df.select(nc + '.*').columns])
    return flat_df
Before:
root
|-- x: string (nullable = true)
|-- y: string (nullable = true)
|-- foo: struct (nullable = true)
| |-- a: float (nullable = true)
| |-- b: float (nullable = true)
| |-- c: integer (nullable = true)
|-- bar: struct (nullable = true)
| |-- a: float (nullable = true)
| |-- b: float (nullable = true)
| |-- c: integer (nullable = true)
After:
root
|-- x: string (nullable = true)
|-- y: string (nullable = true)
|-- foo_a: float (nullable = true)
|-- foo_b: float (nullable = true)
|-- foo_c: integer (nullable = true)
|-- bar_a: float (nullable = true)
|-- bar_b: float (nullable = true)
|-- bar_c: integer (nullable = true)
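To try it out, here is a minimal sketch, assuming the flatten_df above has been defined in the session (the sample rows are made up to match the Before schema):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("flatten-demo").getOrCreate()

# Two struct columns whose fields share the same names, as in the Before schema.
nested = spark.createDataFrame(
    [("x1", "y1", (1.0, 2.0, 3), (4.0, 5.0, 6))],
    "x string, y string, foo struct<a: float, b: float, c: int>, bar struct<a: float, b: float, c: int>",
)
flatten_df(nested).printSchema()  # prints the After schema: foo_a ... bar_c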
Answer by Aydin K.
I generalized steco's solution a bit more, so the flattening can be done more than two struct layers deep:
from pyspark.sql.functions import col

def flatten_df(nested_df, layers):
    flat_cols = []
    nested_cols = []
    flat_df = []

    # First pass: flatten the top layer.
    flat_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'])
    nested_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'])

    flat_df.append(nested_df.select(flat_cols[0] +
                                    [col(nc + '.' + c).alias(nc + '_' + c)
                                     for nc in nested_cols[0]
                                     for c in nested_df.select(nc + '.*').columns])
                   )

    # Each further pass flattens one more layer, working on the previous result.
    for i in range(1, layers):
        print(flat_cols[i - 1])
        flat_cols.append([c[0] for c in flat_df[i - 1].dtypes if c[1][:6] != 'struct'])
        nested_cols.append([c[0] for c in flat_df[i - 1].dtypes if c[1][:6] == 'struct'])

        flat_df.append(flat_df[i - 1].select(flat_cols[i] +
                                             [col(nc + '.' + c).alias(nc + '_' + c)
                                              for nc in nested_cols[i]
                                              for c in flat_df[i - 1].select(nc + '.*').columns])
                       )

    return flat_df[-1]
Just call it with:
my_flattened_df = flatten_df(my_df_having_nested_structs, 3)
(The second parameter is the number of layers to be flattened; in my case it's 3.)
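If the number of layers isn't known up front, a minimal sketch of an alternative (assuming steco's single-layer flatten_df from the previous answer is in scope) loops until no struct columns remain:

def flatten_fully(nested_df):
    # Re-apply the single-layer flatten until no struct columns are left.
    df = nested_df
    while any(dtype.startswith('struct') for _, dtype in df.dtypes):
        df = flatten_df(df)  # steco's one-layer version from the previous answer
    return df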
Answer by federicojasson
This flatten_df version flattens the dataframe at every layer, using a stack to avoid recursive calls:
from pyspark.sql.functions import col

def flatten_df(nested_df):
    # Each stack entry is (path of parent struct names, projected dataframe).
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        # Non-struct columns are kept, aliased to the full "parent_child" path.
        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        # Struct columns are pushed back for another round of flattening.
        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)
Example:
from pyspark.sql.types import StringType, StructField, StructType

schema = StructType([
    StructField("some", StringType()),
    StructField("nested", StructType([
        StructField("nestedchild1", StringType()),
        StructField("nestedchild2", StringType())
    ])),
    StructField("renested", StructType([
        StructField("nested", StructType([
            StructField("nestedchild1", StringType()),
            StructField("nestedchild2", StringType())
        ]))
    ]))
])

data = [
    {
        "some": "value1",
        "nested": {
            "nestedchild1": "value2",
            "nestedchild2": "value3",
        },
        "renested": {
            "nested": {
                "nestedchild1": "value4",
                "nestedchild2": "value5",
            }
        }
    }
]

df = spark.createDataFrame(data, schema)
flat_df = flatten_df(df)
print(flat_df.collect())
Prints:
[Row(some=u'value1', renested_nested_nestedchild1=u'value4', renested_nested_nestedchild2=u'value5', nested_nestedchild1=u'value2', nested_nestedchild2=u'value3')]
Answer by sri hari kali charan Tummala
The following worked for me in Spark SQL:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object StackOverFlowQuestion {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("FlattenTest")
      .config("spark.sql.warehouse.dir", "C:\\Temp\\hive")  // backslashes must be escaped in Scala strings
      .master("local[2]")
      //.enableHiveSupport()
      .getOrCreate()

    // A sample JSON document with a nested array of structs to flatten.
    val stringTest =
      """{
    "total_count": 123,
    "page_size": 20,
    "another_id": "gdbfdbfdbd",
    "sen": [{
        "id": 123,
        "ses_id": 12424343,
        "columns": {
            "blah": "blah",
            "count": 1234
        },
        "class": {},
        "class_timestamps": {},
        "sentence": "spark is good"
    }]
}
"""
    val result = List(stringTest)
    val githubRdd = spark.sparkContext.makeRDD(result)
    val gitHubDF = spark.read.json(githubRdd)
    gitHubDF.show()
    gitHubDF.printSchema()
    gitHubDF.registerTempTable("JsonTable")

    // Explode the array first, then use "struct.*" to flatten the nested columns.
    spark.sql("with cte as" +
      "(" +
      "select explode(sen) as senArray from JsonTable" +
      "), cte_2 as" +
      "(" +
      "select senArray.ses_id,senArray.ses_id,senArray.columns.* from cte" +
      ")" +
      "select * from cte_2"
    ).show()

    spark.stop()
  }
}
Output:
+----------+---------+--------------------+-----------+
|another_id|page_size| sen|total_count|
+----------+---------+--------------------+-----------+
|gdbfdbfdbd| 20|[[[blah, 1234], 1...| 123|
+----------+---------+--------------------+-----------+
root
|-- another_id: string (nullable = true)
|-- page_size: long (nullable = true)
|-- sen: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- columns: struct (nullable = true)
| | | |-- blah: string (nullable = true)
| | | |-- count: long (nullable = true)
| | |-- id: long (nullable = true)
| | |-- sentence: string (nullable = true)
| | |-- ses_id: long (nullable = true)
|-- total_count: long (nullable = true)
+--------+--------+----+-----+
| ses_id| ses_id|blah|count|
+--------+--------+----+-----+
|12424343|12424343|blah| 1234|
+--------+--------+----+-----+