pandas - How to zip two array columns in Spark SQL

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute the original authors (not this site). Original: http://stackoverflow.com/questions/54282706/

Date: 2020-09-14 06:16:52 | Source: igfitidea

How to zip two array columns in Spark SQL

python, pandas, apache-spark, pyspark, apache-spark-sql

Asked by Falconic

I have a Pandas dataframe. I tried to first split two columns containing string values into lists and then, using zip, join each pair of elements with '_'. My data set looks like this:


df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'

For each row of my dataframe, I wanted to combine these two columns into a third column, like below.


df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

I have successfully done this in Python using the code below, but the dataframe is quite large and running it over the whole dataframe takes a very long time. I want to do the same thing in PySpark for efficiency. I have read the data into a Spark dataframe successfully, but I'm having a hard time figuring out how to replicate the Pandas logic with equivalent PySpark functions. How can I get my desired result in PySpark?


df['column_3'] = df['column_2']
for index, row in df.iterrows():
  while index < 3:
    if isinstance(row['column_1'], str):      
      row['column_1'] = list(row['column_1'].split(','))
      row['column_2'] = list(row['column_2'].split(','))
      row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]
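
For reference, the same per-row result can also be produced in pandas without iterrows. The following is a minimal sketch, not part of the original question, assuming both columns hold comma-separated strings:

# Sketch only: split both comma-separated strings in each row and join
# corresponding elements with '_' (no iterrows)
df['column_3'] = [
    ['_'.join((a.strip(), b.strip())) for a, b in zip(c1.split(','), c2.split(','))]
    for c1, c2 in zip(df['column_1'], df['column_2'])
]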

I have converted the two columns to arrays in PySpark using the code below:


from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split

# withColumn returns a new DataFrame, so the result has to be assigned back;
# split already yields array<string>, so the cast is optional
crash = crash.withColumn(
    "column_1", split(col("column_1"), r",\s*").cast(ArrayType(StringType()))
).withColumn(
    "column_2", split(col("column_2"), r",\s*").cast(ArrayType(StringType()))
)
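
(As a sanity check, not part of the original question, both columns should now be of type array<string>; a quick sketch, assuming the crash DataFrame above:)

# Both columns should now be reported as array<string>
crash.printSchema()
crash.select("column_1", "column_2").show(truncate=False)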

Now all I need is to zip the elements of the two array columns and join each pair with '_'. How can I use zip for this? Any help is appreciated.


Answered by 10465355 says Reinstate Monica

The Spark SQL equivalent of Python's zip is pyspark.sql.functions.arrays_zip:


pyspark.sql.functions.arrays_zip(*cols)

Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.


So if you already have two arrays:


from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    .withColumn("column_1", split("column_1", "\s*,\s*"))
    .withColumn("column_2", split("column_2", "\s*,\s*")))

You can just apply it to the result:


from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+
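
Each element of zipped is a struct whose fields keep the original column names, which is what the transform expression below relies on. Pulling a field out of an array of structs returns an array of that field's values; a quick sanity check, not part of the original answer:

# Extracting a struct field from an array of structs yields an array of that field
df_zipped.select("zipped.column_1", "zipped.column_2").show(truncate=False)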

Now to combine the results you can use transform (see "How to use transform higher-order function?" and "TypeError: Column is not iterable - How to iterate over ArrayType()?"):


from pyspark.sql.functions import expr

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
    expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
)

df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+

Note:


The higher-order function transform and the collection function arrays_zip were introduced in Apache Spark 2.4; on earlier versions you would fall back to a UDF, as in the next answer.


Answered by Suresh

You can also use a UDF to zip the split array columns:


df = spark.createDataFrame([('abc,def,ghi', '1.0,2.0,3.0')], ['col1', 'col2'])
df.show(truncate=False)
+-----------+-----------+
|col1       |col2       |
+-----------+-----------+
|abc,def,ghi|1.0,2.0,3.0|
+-----------+-----------+ ## Hope this is how your dataframe is

from pyspark.sql import functions as F
from pyspark.sql.types import *

# plain Python function: zip the two lists element-wise and join each pair with '_'
def concat_udf(*args):
    return ['_'.join(x) for x in zip(*args)]

udf1 = F.udf(concat_udf, ArrayType(StringType()))
df = df.withColumn('col3', udf1(F.split(df.col1, ','), F.split(df.col2, ',')))
df.show(1, False)
+-----------+-----------+---------------------------+
|col1       |col2       |col3                       |
+-----------+-----------+---------------------------+
|abc,def,ghi|1.0,2.0,3.0|[abc_1.0, def_2.0, ghi_3.0]|
+-----------+-----------+---------------------------+
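
The same function can also be registered for use from SQL; a sketch, not part of the original answer, with the name zip_concat chosen here purely for illustration:

# Hypothetical: register the same Python function as a SQL UDF and call it from a query
spark.udf.register('zip_concat', concat_udf, ArrayType(StringType()))
df.createOrReplaceTempView('t')
spark.sql("SELECT col1, col2, zip_concat(split(col1, ','), split(col2, ',')) AS col3 FROM t").show(1, False)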

Answered by blackbishop

For Spark 2.4+, this can be done using only the zip_with function, which zips and concatenates in one step:


df.withColumn("column_3", expr("zip_with(column_1, column_2, (x, y) -> concat(x, '_', y))")) 

The higher-order function merges the two arrays element-wise using the lambda function (x, y) -> concat(x, '_', y).

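
For completeness, a minimal end-to-end sketch (assuming the same comma-separated input as in the question) that splits the strings and applies zip_with:

from pyspark.sql.functions import expr, split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')], ['column_1', 'column_2'])
    .withColumn("column_1", split("column_1", r"\s*,\s*"))
    .withColumn("column_2", split("column_2", r"\s*,\s*"))
    .withColumn("column_3", expr("zip_with(column_1, column_2, (x, y) -> concat(x, '_', y))")))

df.select("column_3").show(truncate=False)  # expected: [abc_1.0, def_2.0, ghi_3.0]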