Applying UDFs on GroupedData in PySpark (with functioning python example)
Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same CC BY-SA terms, include the original URL and author information, and attribute it to the original authors (not me): StackOverFlow
Original URL: http://stackoverflow.com/questions/40006395/
Asked by arosner09
I have this python code that runs locally in a pandas dataframe:
df_result = pd.DataFrame(df
                         .groupby('A')
                         .apply(lambda x: myFunction(zip(x.B, x.C), x.name)))
I would like to run this in PySpark, but I'm having trouble dealing with the pyspark.sql.group.GroupedData object.
I've tried the following:
sparkDF
.groupby('A')
.agg(myFunction(zip('B', 'C'), 'A'))
which returns
KeyError: 'A'
I presume this is because 'A' is no longer a column and I can't find the equivalent for x.name.
And then
sparkDF
.groupby('A')
.map(lambda row: Row(myFunction(zip('B', 'C'), 'A')))
.toDF()
but I get the following error:
AttributeError: 'GroupedData' object has no attribute 'map'
Any suggestions would be really appreciated!
Answered by Ryan Widmaier
What you are trying to do is write a UDAF (User Defined Aggregate Function) as opposed to a UDF (User Defined Function). UDAFs are functions that work on data grouped by a key. Specifically, they need to define how to merge multiple values in the group in a single partition, and then how to merge the results across partitions for each key. There is currently no way in Python to implement a UDAF; they can only be implemented in Scala.
But you can work around it in Python. You can use collect_set (or collect_list, as in the example below) to gather your grouped values and then use a regular UDF to do what you want with them. The only caveat is that collect_set only works on primitive values, so you will need to encode them down to a string.
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf

def myFunc(data_list):
    for val in data_list:
        b, c = val.split(',')
        # do something with b and c
    return <whatever>

myUdf = udf(myFunc, StringType())

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
  .groupBy('A').agg(collect_list('data').alias('data')) \
  .withColumn('data', myUdf('data'))
Use collect_set if you want deduping. Also, if you have lots of values for some of your keys, this will be slow, because all values for a key will need to be collected in a single partition somewhere on your cluster. If your end result is a value that you build by combining the values per key in some way (for example, summing them), it might be faster to implement it using the RDD aggregateByKey method, which lets you build an intermediate value for each key in a partition before shuffling data around.
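As a rough illustration of that aggregateByKey alternative (a minimal sketch that is not part of the original answer; it assumes B and C are numeric and that the goal is a per-key sum, which are assumptions made only for the example):

# Hypothetical sketch: summing B + C per key 'A' with aggregateByKey.
# The column names and the combine logic are assumptions for illustration.
rdd = df.select('A', 'B', 'C').rdd.map(lambda r: (r['A'], r['B'] + r['C']))
sums = rdd.aggregateByKey(
    0,                          # zero value for each key within a partition
    lambda acc, v: acc + v,     # merge one value into the partition-local accumulator
    lambda a, b: a + b          # merge accumulators across partitions
)
result = sums.toDF(['A', 'total'])

Because the per-partition accumulators are combined before the shuffle, only one small value per key and partition moves across the network, which is where the speedup over collect_list comes from.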
EDIT: 11/21/2018
Since this answer was written, PySpark added support for UDAFs using Pandas. There are some nice performance improvements when using Pandas UDFs and UDAFs over straight Python functions with RDDs. Under the hood it vectorizes the columns (it batches the values from multiple rows together to optimize processing and compression). Take a look here for a better explanation, or look at user6910411's answer below for an example.
Answered by zero323
Since Spark 2.3 you can use pandas_udf. GROUPED_MAP takes Callable[[pandas.DataFrame], pandas.DataFrame], or in other words a function which maps from a Pandas DataFrame of the same shape as the input to the output DataFrame.
For example, if the data looks like this:
df = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)
and you want to compute the average value of the pairwise min between value1 and value2, you have to define the output schema:
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_min", DoubleType())
])
and define the pandas_udf:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    result = pd.DataFrame(df.groupby(df.key).apply(
        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
    ))
    result.reset_index(inplace=True, drop=False)
    return result
and apply it:
df.groupby("key").apply(g).show()
+---+-------+
|key|avg_min|
+---+-------+
| b| -1.5|
| a| -0.5|
+---+-------+
Excluding schema definition and decorator, your current Pandas code can be applied as-is.
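To connect this back to the code in the question, a rough sketch of the same GROUPED_MAP pattern might look like the following; note that the output schema, the 'result' column name, and the use of StringType for the result are assumptions made for illustration, and myFunction is the function from the question:

# Hypothetical sketch: wrapping the question's myFunction in a GROUPED_MAP pandas_udf.
# The output schema and 'result' column name are assumptions for illustration.
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType

out_schema = StructType([
    StructField("A", StringType()),
    StructField("result", StringType())
])

@pandas_udf(out_schema, functionType=PandasUDFType.GROUPED_MAP)
def apply_my_function(pdf):
    key = pdf["A"].iloc[0]                      # equivalent of x.name in the pandas version
    value = myFunction(zip(pdf.B, pdf.C), key)  # myFunction comes from the question
    return pd.DataFrame([[key, str(value)]], columns=["A", "result"])

sparkDF.groupby("A").apply(apply_my_function).show()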
Since Spark 2.4.0 there is also a GROUPED_AGG variant, which takes Callable[[pandas.Series, ...], T], where T is a primitive scalar:
import numpy as np

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def f(x, y):
    return np.minimum(x, y).mean()
which can be used with the standard groupBy / agg construct:
df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()
+---+-------+
|key|avg_min|
+---+-------+
| b| -1.5|
| a| -0.5|
+---+-------+
Please note that neither GROUPED_MAP nor GROUPED_AGG pandas_udf behaves the same way as UserDefinedAggregateFunction or Aggregator; they are closer to groupByKey or window functions with an unbounded frame. Data is shuffled first, and only after that is the UDF applied.
For optimized execution you should implement a Scala UserDefinedAggregateFunction and add a Python wrapper.
See also User defined function to be applied to Window in PySpark?
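Related to that link, a grouped-aggregate pandas_udf such as f above can also be evaluated over a window, as long as the frame is unbounded. A sketch, assuming Spark 2.4+ and the df and f defined earlier in this answer:

# Sketch: using the GROUPED_AGG pandas_udf f as a window function (unbounded frame only).
from pyspark.sql.window import Window

w = Window.partitionBy("key").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)
df.withColumn("avg_min", f("value1", "value2").over(w)).show()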
Answered by Mayur Dangar
I am going to extend the above answer.
You can implement the same logic as pandas.groupby().apply in PySpark using @pandas_udf, which is a vectorized method and faster than a plain UDF.
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

df3 = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr] + [x] + [y] + [w] + [z]])
df3.groupby("key").apply(g).show()
You will get the result below:
+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
| b| 6.5| -1.5| 5.0| 8.0|
| a| 0.0| 21.0| 21.0| -21.0|
+---+----------+----------+-------+-------+
So you can do more calculations between other fields of the grouped data and add them to the dataframe in list format.