scala - Explode array in apache spark Data Frame

Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question and answer, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/39434736/


Explode array in apache spark Data Frame

scala apache-spark explode spark-dataframe

Asked by Artem

I am trying to flatten the schema of an existing dataframe with nested fields. The structure of my dataframe is something like this:


root  
|-- Id: long (nullable = true)  
|-- Type: string (nullable = true)  
|-- Uri: string (nullable = true)    
|-- Type: array (nullable = true)  
|    |-- element: string (containsNull = true)  
|-- Gender: array (nullable = true)  
|    |-- element: string (containsNull = true)

Type and Gender can contain an array of elements, a single element, or a null value. I tried to use the following code:


var resDf = df.withColumn("FlatType", explode(df("Type")))

But as a result, in the resulting data frame I lose the rows for which the Type column was null. For example, if I have 10 rows, and Type is null in 7 of them and non-null in 3, then after using explode the resulting data frame has only three rows.

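To make the behaviour concrete, here is a minimal sketch (the toy data, the demoDf name and the local SparkSession setup are made up for the example, assuming Spark 2.x):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

val spark = SparkSession.builder().master("local[*]").appName("explode-demo").getOrCreate()
import spark.implicits._

// Toy data: one row with a populated Type array, one row where Type is null
val demoDf = Seq(
  (1L, Seq("Person", "Employee")),
  (2L, null.asInstanceOf[Seq[String]])
).toDF("Id", "Type")

demoDf.withColumn("FlatType", explode(demoDf("Type"))).show()
// Only the row with Id = 1 appears in the output; the row with the null array is gone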

How can I keep the rows with null values but still explode the array of values?


I found a workaround of sorts, but I am still stuck at one point. For standard types we can do the following:


import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{explode, udf}

def customExplode(df: DataFrame, field: String, colType: String): Column = {
  colType.toLowerCase() match {
    case "string" =>
      // Replace a null array with Seq(null) so that explode still emits one row for it
      val avoidNull = udf((column: Seq[String]) =>
        if (column == null) Seq[String](null) else column)
      explode(avoidNull(df(field)))
    case "boolean" =>
      // A Seq[Boolean] cannot hold null, so a null array becomes an empty one
      // (explode of an empty array still drops the row)
      val avoidNull = udf((xs: Seq[Boolean]) =>
        if (xs == null) Seq[Boolean]() else xs)
      explode(avoidNull(df(field)))
    case _ =>
      explode(df(field))
  }
}

And after that just use it like this:


val explodedField = customExplode(resultDf, fieldName, fieldTypeMap(field))
resultDf = resultDf.withColumn(newName, explodedField)

However, I have a problem with struct types, for the following kind of structure:


 |-- Address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- AddressType: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true) 
 |    |    |-- DEA: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Number: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- ExpirationDate: array (nullable = true)
 |    |    |    |    |    |-- element: timestamp (containsNull = true)
 |    |    |    |    |-- Status: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)

How can we process that kind of schema when DEA is null?


Thank you in advance.


P.S. I tried to use lateral views, but the result is the same.

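For reference, Spark SQL's LATERAL VIEW syntax accepts an OUTER keyword that keeps rows whose array is null or empty; a plain LATERAL VIEW behaves like explode and drops them, which would explain the identical result. A minimal sketch against the toy demoDf from above, assuming Spark 2.x (the view name is made up):

// Register the toy dataframe so it can be queried with SQL
demoDf.createOrReplaceTempView("demo")

// LATERAL VIEW OUTER keeps the null-array row, emitting null for FlatType;
// without OUTER that row would be dropped, just like with explode
spark.sql(
  """SELECT Id, FlatType
    |FROM demo
    |LATERAL VIEW OUTER explode(Type) t AS FlatType""".stripMargin).show()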

Answered by Daniel de Paula

Maybe you can try using when:


val resDf = df.withColumn("FlatType", when(df("Type").isNotNull, explode(df("Type"))))

As shown in the when function's documentation, the value null is inserted for the values that do not match the conditions.
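
As a side note not from the original answer: on Spark 2.2 and later, the built-in explode_outer does this in one call, keeping rows whose array column is null and filling the exploded column with null. A minimal sketch against the hypothetical demoDf from the question above:

import org.apache.spark.sql.functions.explode_outer

// The Id = 2 row (null Type array) is kept, with null in FlatType
demoDf.withColumn("FlatType", explode_outer(demoDf("Type"))).show()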

when函数文档中所示,null为与条件不匹配的值插入值。