Scala: Automatically and elegantly flatten a DataFrame in Spark SQL

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse it, you must follow the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/37471346/

Automatically and Elegantly flatten DataFrame in Spark SQL

scala, apache-spark, apache-spark-sql

Asked by echen

All,

Is there an elegant and accepted way to flatten a Spark SQL table (Parquet) with columns that are of nested StructType?

For example

If my schema is:

foo
 |_bar
 |_baz
x
y
z

How do I select it into a flattened tabular form without resorting to manually running

df.select("foo.bar","foo.baz","x","y","z")

In other words, how do I obtain the result of the above code programmatically, given just a StructType and a DataFrame?
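
(For reference, a minimal way to reproduce this schema for testing, assuming a spark-shell session where spark.implicits._ is in scope; this sketch and its string-typed fields are my own illustration, not part of the question:)

import spark.implicits._

// Hypothetical sample data matching the nested schema sketched above.
case class Foo(bar: String, baz: String)
case class Record(foo: Foo, x: String, y: String, z: String)

val df = Seq(Record(Foo("a", "b"), "1", "2", "3")).toDF()
df.printSchema()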

Answered by David Griffin

The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.

The recursive function should return an Array[Column]. Every time the function hits a StructType, it would call itself and append the returned Array[Column] to its own Array[Column].

Something like:

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}

You would then use it like this:

df.select(flattenSchema(df.schema):_*)
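
On the question's example schema, that call expands to the hand-written select (a sketch, assuming the schema shown above); note that the struct fields come out named by their leaf names, which is what the next answer addresses:

// What flattenSchema(df.schema) generates for the example schema;
// the resulting columns are named bar, baz, x, y, z (the parent "foo" is dropped).
df.select(col("foo.bar"), col("foo.baz"), col("x"), col("y"), col("z"))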

Answered by V. Samma

I am improving my previous answer and offering a solution to my own problem stated in the comments of the accepted answer.

The accepted solution creates an array of Column objects and uses it to select these columns. In Spark, if you have a nested DataFrame, you can select the child column like this: df.select("Parent.Child"), and this returns a DataFrame with the values of the child column, named Child. But if you have identical names for attributes of different parent structures, you lose the info about the parent, may end up with identical column names, and cannot access them by name anymore, as they are ambiguous.

This was my problem.

I found a solution to my problem, maybe it can help someone else as well. I called the flattenSchema separately:

val flattenedSchema = flattenSchema(df.schema)

and this returned an Array of Column objects. Instead of using this in the select(), which would return a DataFrame with columns named by the child of the last level, I mapped the original column names to themselves as strings; then, after selecting the Parent.Child column, it is renamed as Parent.Child instead of Child (I also replaced dots with underscores for my convenience):

val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))

And then you can use the select function as shown in the original answer:

var newDf = df.select(renamedCols:_*)

Answered by Evan V

Just wanted to share my solution for Pyspark - it's more or less a translation of @David Griffin's solution, so it supports any level of nested objects.

from pyspark.sql.types import StructType, ArrayType  

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields


df.select(flatten(df.schema)).show()

Answered by Averell

========== edit ====

There's some additional handling for more complex schemas here: https://medium.com/@lvhuyen/working-with-spark-dataframe-having-a-complex-schema-a3bce8c3f44

==================

PySpark, adding to @Evan V's answer, for when your field names have special characters, like a dot '.', a hyphen '-', ...:

from pyspark.sql.functions import col
from pyspark.sql.types import StructType, ArrayType

def normalise_field(raw):
    return raw.strip().lower() \
            .replace('`', '') \
            .replace('-', '_') \
            .replace(' ', '_') \
            .strip('_')

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = "%s.`%s`" % (prefix, field.name) if prefix else "`%s`" % field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(col(name).alias(normalise_field(name)))

    return fields

df.select(flatten(df.schema)).show()

Answered by Thomas Decaux

You could also use SQL to select the columns in flattened form:

  1. Get the original DataFrame schema
  2. Generate a SQL string by browsing the schema
  3. Query your original DataFrame

I did an implementation in Java: https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

(It uses a recursive method as well; I prefer the SQL way, so you can test it easily via spark-shell.)
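
A minimal sketch of what that could look like in Scala (my own illustration, not the linked Java code; the helper and view names are hypothetical, and df stands for your nested DataFrame): walk the schema to build the SELECT list, register a temp view, and run the generated statement.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical helper: build a flat SELECT statement from the schema and run it via spark.sql.
def flattenWithSql(df: DataFrame, viewName: String = "nested_view"): DataFrame = {
  def leafNames(schema: StructType, prefix: String = null): Seq[String] =
    schema.fields.toSeq.flatMap { f =>
      val name = if (prefix == null) f.name else s"$prefix.${f.name}"
      f.dataType match {
        case st: StructType => leafNames(st, name)
        case _              => Seq(name)
      }
    }

  df.createOrReplaceTempView(viewName)
  // Alias each nested column so the flat names stay unambiguous, e.g. foo.bar AS foo_bar.
  val selectList = leafNames(df.schema)
    .map(n => s"$n AS ${n.replace(".", "_")}")
    .mkString(", ")
  df.sparkSession.sql(s"SELECT $selectList FROM $viewName")
}

flattenWithSql(df).show()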

Answered by steco

Here is a function that does what you want and can deal with multiple nested columns containing columns with the same name, using a prefix:

from pyspark.sql import functions as F

def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

Before:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)
 |-- bar: struct (nullable = true)
 |    |-- a: float (nullable = true)
 |    |-- b: float (nullable = true)
 |    |-- c: integer (nullable = true)

After:

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- foo_a: float (nullable = true)
 |-- foo_b: float (nullable = true)
 |-- foo_c: integer (nullable = true)
 |-- bar_a: float (nullable = true)
 |-- bar_b: float (nullable = true)
 |-- bar_c: integer (nullable = true)

Answered by Powers

I added a DataFrame#flattenSchema method to the open source spark-daria project.

Here's how you can use the function with your code.

import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.flattenSchema().show()

+-------+-------+---------+----+---+
|foo.bar|foo.baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+

You can also specify different column name delimiters with the flattenSchema() method.

df.flattenSchema(delimiter = "_").show()
+-------+-------+---------+----+---+
|foo_bar|foo_baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+

This delimiter parameter is surprisingly important. If you're flattening your schema to load the table in Redshift, you won't be able to use periods as the delimiter.

Here's the full code snippet to generate this output.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val data = Seq(
  Row(Row("this", "is"), "something", "cool", ";)")
)

val schema = StructType(
  Seq(
    StructField(
      "foo",
      StructType(
        Seq(
          StructField("bar", StringType, true),
          StructField("baz", StringType, true)
        )
      ),
      true
    ),
    StructField("x", StringType, true),
    StructField("y", StringType, true),
    StructField("z", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)

df.flattenSchema().show()

The underlying code is similar to David Griffin's code (in case you don't want to add the spark-daria dependency to your project).

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

object StructTypeHelpers {

  def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
    schema.fields.flatMap(structField => {
      val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
      val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name

      structField.dataType match {
        case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
        case _ => Array(col(codeColName).alias(colName))
      }
    })
  }

}

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
      df.select(
        StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
      )
    }

  }

}

Answered by swdev

To combine David Griffin's and V. Samma's answers, you could just do this to flatten while avoiding duplicate column names:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName).as(colName.replace(".","_")))
    }
  })
}

def flattenDataFrame(df:DataFrame): DataFrame = {
    df.select(flattenSchema(df.schema):_*)
}

var my_flattened_json_table = flattenDataFrame(my_json_table)

Answered by Babatunde Adekunle

A little addition to the code above, if you are working with nested Structs and Arrays.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructField, StructType}

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f match {
      // Recurse into plain structs.
      case StructField(_, struct: StructType, _, _) => flattenSchema(struct, colName)
      // Recurse into structs nested inside arrays (the selected fields stay array-typed).
      case StructField(_, ArrayType(x: StructType, _), _, _) => flattenSchema(x, colName)
      // Arrays of primitives and everything else are selected as-is.
      case StructField(_, ArrayType(_, _), _, _) => Array(col(colName))
      case _ => Array(col(colName))
    }
  })
}
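
A short usage sketch (my addition, assuming a DataFrame df whose array-of-struct column is named "items"): fields reached through an array stay array-typed after flattening; if you want one row per array element instead, explode the array first and then flatten.

import org.apache.spark.sql.functions.{col, explode}

// Flatten as-is: items.a and items.b come back as array columns.
val flatDf = df.select(flattenSchema(df.schema): _*)

// Hypothetical alternative: explode the array so each element becomes its own row,
// then the struct column is flattened into scalar columns.
val explodedDf = df.withColumn("items", explode(col("items")))
val flatExplodedDf = explodedDf.select(flattenSchema(explodedDf.schema): _*)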

Answered by Ishan Kumar

import org.apache.spark.sql.types.StructType
import scala.collection.mutable.ListBuffer

// Collects the fully qualified names of all leaf columns into a mutable buffer.
val columns = new ListBuffer[String]()

def flattenSchema(schema: StructType, prefix: String = null): Unit = {
  for (field <- schema.fields) {
    field.dataType match {
      case st: StructType =>
        // Keep the outer prefix so fields nested more than one level deep stay fully qualified.
        val columnPrefix = if (prefix == null) field.name + "." else prefix + field.name + "."
        flattenSchema(st, columnPrefix)
      case _ =>
        if (prefix == null) columns += field.name
        else columns += prefix + field.name
    }
  }
}
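
A minimal usage sketch for the buffer-based version above (my addition; the alias step is optional and mirrors the other answers):

import org.apache.spark.sql.functions.col

// Walk the schema once to fill `columns`, then select the collected names,
// replacing dots with underscores so parent names stay visible.
flattenSchema(df.schema)
val flatDf = df.select(columns.map(name => col(name).alias(name.replace(".", "_"))): _*)
flatDf.printSchema()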