java Spark sql如何在不丢失空值的情况下爆炸

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/39739072/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 04:36:17  来源:igfitidea点击:

Spark sql how to explode without losing null values

javaapache-sparknullapache-spark-sql

提问by alexgbelov

I have a Dataframe that I am trying to flatten. As part of the process, I want to explode it, so if I have a column of arrays, each value of the array will be used to create a separate row. For instance,

我有一个数据框,我想把它弄平。作为过程的一部分,我想分解它,所以如果我有一列数组,数组的每个值都将用于创建一个单独的行。例如,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]

should become

应该成为

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

This is my code

这是我的代码

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}

The problem is that in my data, some of the array columns have nulls. In that case, the entire row is deleted. So this dataframe:

问题是在我的数据中,一些数组列有空值。在这种情况下,整行都将被删除。所以这个数据框:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null

becomes

变成

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

instead of

代替

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null

How can I explode my arrays so that I don't lose the null rows?

我怎样才能分解我的数组,这样我就不会丢失空行?

I am using Spark 1.5.2 and Java 8

我使用的是 Spark 1.5.2 和 Java 8

回答by zero323

Spark 2.2+

火花 2.2+

You can use explode_outerfunction:

您可以使用explode_outer功能:

import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+

Spark <= 2.1

火花 <= 2.1

In Scala but Java equivalent should be almost identical (to import individual functions use import static).

在 Scala 中,Java 等效项应该几乎相同(导入单个函数使用import static)。

import org.apache.spark.sql.functions.{array, col, explode, lit, when}

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null explode an array<string> with a single null
    .otherwise(array(lit(null).cast("string")))))

The idea here is basically to replace NULLwith an array(NULL)of a desired type. For complex type (a.k.a structs) you have to provide full schema:

这里的想法是基本取代NULLarray(NULL)希望的类型。对于复杂类型(又名structs),您必须提供完整的架构:

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st =  StructType(Seq(
  StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))

or

或者

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))

Note:

注意

If array Columnhas been created with containsNullset to falseyou should change this first (tested with Spark 2.1):

如果数组Column是用containsNullset创建的,false你应该先改变它(用 Spark 2.1 测试):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))

回答by TopGuys

You can use explode_outer()function.

您可以使用explode_outer()功能。

回答by nsanglar

Following up on the accepted answer, when the array elements are a complex type it can be difficult to define it by hand (e.g with large structs).

跟进已接受的答案,当数组元素是复杂类型时,可能很难手动定义它(例如,使用大型结构)。

To do it automatically I wrote the following helper method:

为了自动完成,我编写了以下辅助方法:

  def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
      val arrayFields = df.schema.fields
          .map(field => field.name -> field.dataType)
          .collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])}
          .toMap

      columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
      dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
        .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))    
 }

Edit: it seems that spark 2.2 and newer have this built in.

编辑:似乎 spark 2.2 和更新版本内置了这个。

回答by Mohana B C

To handle empty map type column: for Spark <= 2.1

处理空地图类型列:对于 Spark <= 2.1

 List((1, Array(2, 3, 4), Map(1 -> "a")),
(2, Array(5, 6, 7), Map(2 -> "b")),
(3, Array[Int](), Map[Int, String]())).toDF("col1", "col2", "col3").show()


 df.select('col1, explode(when(size(map_keys('col3)) === 0, map(lit("null"), lit("null"))).
otherwise('col3))).show()