Python Pyspark:将多个数组列拆分为行

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/41027315/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-20 00:22:02  来源:igfitidea点击:

Pyspark: Split multiple array columns into rows

pythonapache-sparkdataframepysparkapache-spark-sql

提问by Steve

I have a dataframe which has one row, and several columns. Some of the columns are single values, and others are lists. All list columns are the same length. I want to split each list column into a separate row, while keeping any non-list column as is.

我有一个数据框,它有一行和几列。一些列是单个值,而其他列是列表。所有列表列的长度相同。我想将每个列表列拆分为单独的行,同时保持任何非列表列不变。

Sample DF:

示例 DF:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

What I want:

我想要的是:

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+

If I only had one list column, this would be easy by just doing an explode:

如果我只有一个列表列,只需执行以下操作就很容易explode

df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+

However, if I try to also explodethe ccolumn, I end up with a dataframe with a length the square of what I want:

但是,如果我也尝试使用explodec列,我最终会得到一个长度为我想要的平方的数据框:

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+

What I want is - for each column, take the nth element of the array in that column and add that to a new row. I've tried mapping an explode accross all columns in the dataframe, but that doesn't seem to work either:

我想要的是 - 对于每一列,取该列中数组的第 n 个元素并将其添加到新行。我试过在数据框中的所有列中映射一个爆炸,但这似乎也不起作用:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()

回答by zero323

Spark >= 2.4

火花 >= 2.4

You can replace zip_udfwith arrays_zipfunction

你可以zip_udfarrays_zip函数替换

from pyspark.sql.functions import arrays_zip, col

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))

Spark < 2.4

火花 < 2.4

With DataFramesand UDF:

随着DataFrames和UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("first", IntegerType()),
      StructField("second", IntegerType())
  ]))
)

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))

With RDDs:

RDDs

(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))

Both solutions are inefficient due to Python communication overhead. If data size is fixed you can do something like this:

由于 Python 通信开销,这两种解决方案都效率低下。如果数据大小是固定的,您可以执行以下操作:

from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")

or even:

甚至:

from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

This should be significantly faster compared to UDF or RDD. Generalized to support an arbitrary number of columns:

与 UDF 或 RDD 相比,这应该快得多。推广以支持任意数量的列:

# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))

回答by David

You'd need to use flatMap, not mapas you want to make multiple output rows out of each input row.

您需要使用flatMap, 而不是map因为您想从每个输入行中生成多个输出行。

from pyspark.sql import Row
def dualExplode(r):
    rowDict = r.asDict()
    bList = rowDict.pop('b')
    cList = rowDict.pop('c')
    for b,c in zip(bList, cList):
        newDict = dict(rowDict)
        newDict['b'] = b
        newDict['c'] = c
        yield Row(**newDict)

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))

回答by Ani Menon

One liner (for Spark>=2.4.0):

一个班轮(对于Spark>=2.4.0):

df.withColumn("bc", arrays_zip("b","c"))
  .select("a", explode("bc").alias("tbc"))
  .select("a", col"tbc.b", "tbc.c").show()

Import required:

需要导入:

from pyspark.sql.functions import arrays_zip

from pyspark.sql.functions import arrays_zip



Steps -

脚步 -

  1. Create a column bc which is an array_zipof columns band c
  2. Explode bcto get a struct tbc
  3. Select the required columns a, band c(all exploded as required).
  1. 创建一个列 bc,它是一个array_zipbc
  2. 爆炸bc得到一个结构体tbc
  3. 选择所需的列a,bc(全部按要求分解)。

Output:

输出:

> df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  7|
|  1|  2|  8|
|  1|  3|  9|
+---+---+---+