Java: How to flatten a struct in a Spark dataframe?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/38753898/


How to flatten a struct in a Spark dataframe?

java, apache-spark, apache-spark-sql

Asked by djWann

I have a dataframe with the following structure:


 |-- data: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- keyNote: struct (nullable = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- note: string (nullable = true)
 |    |-- details: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

How is it possible to flatten the structure and create a new dataframe:

     |-- id: long (nullable = true)
     |-- keyNote: struct (nullable = true)
     |    |-- key: string (nullable = true)
     |    |-- note: string (nullable = true)
     |-- details: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)

Is there something like explode, but for structs?


Accepted answer by djWann

This should work in Spark 1.6 or later:


df.select(df.col("data.*"))

or


df.select(df.col("data.id"), df.col("data.keyNote"), df.col("data.details"))

Answered by Thomas Decaux

An easy way is to use SQL: you can build a SQL query string that aliases the nested columns as flat ones.

  • Retrieve the dataframe schema (df.schema())
  • Transform the schema into a SQL select list (for (field : schema().fields()) ...)
  • Run the query:

    val newDF = sqlContext.sql("SELECT " + sqlGenerated + " FROM source")


Here is an example in Java.

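A minimal sketch of that idea follows; the SchemaToSql class, the buildSelect helper, and the source view name are illustrative (not from the original answer), and the nested struct column is assumed to be named data as in the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SchemaToSql {

    // Builds "SELECT data.id AS id, data.keyNote AS keyNote, ... FROM source"
    // by walking the fields of the nested struct column.
    static String buildSelect(Dataset<Row> df, String structCol) {
        StructType structType =
                (StructType) df.schema().apply(structCol).dataType();
        List<String> parts = new ArrayList<>();
        for (StructField field : structType.fields()) {
            parts.add(structCol + "." + field.name() + " AS " + field.name());
        }
        return "SELECT " + String.join(", ", parts) + " FROM source";
    }

    // Usage sketch:
    //   df.createOrReplaceTempView("source");
    //   Dataset<Row> newDF = spark.sql(buildSelect(df, "data"));
}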

(I prefer the SQL way, since you can easily test it in the Spark shell, and it's cross-language.)

Answered by steco

Here is a function that does what you want and can handle multiple nested columns containing columns with the same name:

from pyspark.sql import functions as F

def flatten_df(nested_df):
    # Split the columns into flat columns and struct columns.
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    # Promote each struct field to a top-level column named <struct>_<field>.
    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

Before:


root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)
 |-- bar: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)

After:


root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo_a: float (nullable = true)
 |-- foo_b: float (nullable = true)
 |-- foo_c: integer (nullable = true)
 |-- bar_a: float (nullable = true)
 |-- bar_b: float (nullable = true)
 |-- bar_c: integer (nullable = true)
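
Since the question itself is tagged java, the same one-level flattening can also be written explicitly with Java column expressions. A hedged sketch using the foo/bar schema above (the class and method names are illustrative):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class FlattenOneLevel {
    // Flattens the foo/bar example schema above by one level.
    static Dataset<Row> flattenOneLevel(Dataset<Row> df) {
        return df.select(
                col("x"), col("y"),
                col("foo.a").alias("foo_a"),
                col("foo.b").alias("foo_b"),
                col("foo.c").alias("foo_c"),
                col("bar.a").alias("bar_a"),
                col("bar.b").alias("bar_b"),
                col("bar.c").alias("bar_c"));
    }
}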

Answered by Aydin K.

I generalized steco's solution a bit more, so the flattening can be done more than two struct layers deep:

from pyspark.sql.functions import col

def flatten_df(nested_df, layers):
    flat_cols = []
    nested_cols = []
    flat_df = []

    # Layer 0: split the original dataframe's columns into flat and struct columns.
    flat_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'])
    nested_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'])

    flat_df.append(nested_df.select(flat_cols[0] +
                               [col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols[0]
                                for c in nested_df.select(nc+'.*').columns])
                  )
    # Each subsequent pass flattens the structs exposed by the previous one.
    for i in range(1, layers):
        flat_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] != 'struct'])
        nested_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] == 'struct'])

        flat_df.append(flat_df[i-1].select(flat_cols[i] +
                                [col(nc+'.'+c).alias(nc+'_'+c)
                                    for nc in nested_cols[i]
                                    for c in flat_df[i-1].select(nc+'.*').columns])
        )

    return flat_df[-1]

Just call it with:

my_flattened_df = flatten_df(my_df_having_nested_structs, 3)

(The second parameter is the number of layers to be flattened; in my case it's 3.)

Answered by federicojasson

This flatten_df version flattens the dataframe at every nesting level, using a stack to avoid recursive calls:

from pyspark.sql.functions import col


def flatten_df(nested_df):
    # Each stack entry is a (path-to-parent, dataframe-projected-at-that-path) pair.
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        # Non-struct columns are selected by full path and renamed parent_child.
        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        # Struct columns are projected and pushed for a later pass.
        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Example:


from pyspark.sql.types import StringType, StructField, StructType


schema = StructType([
    StructField("some", StringType()),

    StructField("nested", StructType([
        StructField("nestedchild1", StringType()),
        StructField("nestedchild2", StringType())
    ])),

    StructField("renested", StructType([
        StructField("nested", StructType([
            StructField("nestedchild1", StringType()),
            StructField("nestedchild2", StringType())
        ]))
    ]))
])

data = [
    {
        "some": "value1",
        "nested": {
            "nestedchild1": "value2",
            "nestedchild2": "value3",
        },
        "renested": {
            "nested": {
                "nestedchild1": "value4",
                "nestedchild2": "value5",
            }
        }
    }
]

df = spark.createDataFrame(data, schema)
flat_df = flatten_df(df)
print(flat_df.collect())

Prints:


[Row(some=u'value1', renested_nested_nestedchild1=u'value4', renested_nested_nestedchild2=u'value5', nested_nestedchild1=u'value2', nested_nestedchild2=u'value3')]

Answered by sri hari kali charan Tummala

The following worked for me in Spark SQL:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object StackOverFlowQuestion {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("FlattenTest")
      .config("spark.sql.warehouse.dir", "C:\\Temp\\hive")
      .master("local[2]")
      .getOrCreate()

    val stringTest =
      """{
        |  "total_count": 123,
        |  "page_size": 20,
        |  "another_id": "gdbfdbfdbd",
        |  "sen": [{
        |    "id": 123,
        |    "ses_id": 12424343,
        |    "columns": {
        |      "blah": "blah",
        |      "count": 1234
        |    },
        |    "class": {},
        |    "class_timestamps": {},
        |    "sentence": "spark is good"
        |  }]
        |}""".stripMargin

    val result = List(stringTest)
    val githubRdd = spark.sparkContext.makeRDD(result)
    val gitHubDF = spark.read.json(githubRdd)
    gitHubDF.show()
    gitHubDF.printSchema()

    gitHubDF.createOrReplaceTempView("JsonTable")

    // explode(sen) turns each element of the sen array into its own row;
    // senArray.columns.* then flattens the nested columns struct.
    // (ses_id is selected twice here, matching the output below.)
    spark.sql(
      """with cte as (
        |  select explode(sen) as senArray from JsonTable
        |), cte_2 as (
        |  select senArray.ses_id, senArray.ses_id, senArray.columns.* from cte
        |)
        |select * from cte_2""".stripMargin
    ).show()

    spark.stop()
  }
}

Output:

+----------+---------+--------------------+-----------+
|another_id|page_size|                 sen|total_count|
+----------+---------+--------------------+-----------+
|gdbfdbfdbd|       20|[[[blah, 1234], 1...|        123|
+----------+---------+--------------------+-----------+

root
 |-- another_id: string (nullable = true)
 |-- page_size: long (nullable = true)
 |-- sen: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- columns: struct (nullable = true)
 |    |    |    |-- blah: string (nullable = true)
 |    |    |    |-- count: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sentence: string (nullable = true)
 |    |    |-- ses_id: long (nullable = true)
 |-- total_count: long (nullable = true)

+--------+--------+----+-----+
|  ses_id|  ses_id|blah|count|
+--------+--------+----+-----+
|12424343|12424343|blah| 1234|
+--------+--------+----+-----+