
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/37580782/

Date: 2020-09-09 08:11:11  Source: igfitidea

pyspark collect_set or collect_list with groupby

Tags: list, group-by, set, pyspark, collect

Asked by Hanan Shteingart

How can I use collect_set or collect_list on a dataframe after groupby? For example: df.groupby('key').collect_set('values'). I get an error: AttributeError: 'GroupedData' object has no attribute 'collect_set'

Answered by Kamil Sindi

You need to use agg. Example:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")

# a HiveContext is required for collect_set/collect_list on older Spark versions
sqlContext = HiveContext(sc)

# example dataframe with nulls in the "code" and "name" columns
df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

df.show()

+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

Note that in the above you have to create a HiveContext. See https://stackoverflow.com/a/35529093/690430 for dealing with different Spark versions.

(df
  .groupby("id")
  .agg(F.collect_set("code"),
       F.collect_list("name"))
  .show())

+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+
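
As an aside, on Spark 2.x and later you would typically start from a SparkSession rather than a HiveContext. The following is only a minimal sketch of the same aggregation under that assumption (local master, column aliases chosen for illustration):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Spark 2.x+ entry point; replaces the SparkContext/HiveContext pair above
spark = SparkSession.builder.master("local").getOrCreate()

df = spark.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

# same aggregation, with aliased result columns
(df
  .groupby("id")
  .agg(F.collect_set("code").alias("codes"),
       F.collect_list("name").alias("names"))
  .show())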

Answered by Allen

If your dataframe is large, you can try using a pandas UDF (GROUPED_AGG) to avoid a memory error. It is also much faster.

Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. Grouped aggregate Pandas UDFs are used with groupBy().agg() and pyspark.sql.Window. It defines an aggregation from one or more pandas.Series to a scalar value, where each pandas.Series represents a column within the group or window. (from the PySpark pandas UDF documentation)

Example:

import pyspark.sql.functions as F

# grouped-aggregate pandas UDF: receives the "name" values of each group as a
# pandas.Series and returns a single scalar (here, the names joined into one string)
# note: None values in the column would break the join; drop them first if present
@F.pandas_udf('string', F.PandasUDFType.GROUPED_AGG)
def collect_list(name):
    return ', '.join(name)

grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))
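
Since the quoted documentation also mentions pyspark.sql.Window, here is a hedged sketch of applying the same grouped-aggregate UDF over a window instead of groupBy().agg(). It assumes Spark 2.4+, which requires an unbounded window frame for grouped-aggregate pandas UDFs, and (as above) that the column contains no nulls:

from pyspark.sql import Window

# unbounded window per "id"; required frame for grouped-aggregate pandas UDFs
w = (Window.partitionBy('id')
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

# adds a "names" column holding the joined names for each id
df_windowed = df.withColumn('names', collect_list(df['name']).over(w))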