
Disclaimer: this page is a Chinese-English translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/48829993/

Date: 2020-08-19 18:52:38 · Source: igfitidea

GroupBy column and filter rows with maximum value in Pyspark

python · apache-spark · pyspark · apache-spark-sql

Asked by Thomas

I am almost certain this has been asked before, but a search through Stack Overflow did not answer my question. It is not a duplicate of [2], since I want the maximum value, not the most frequent item. I am new to pyspark and trying to do something really simple: I want to groupBy column "A" and then only keep the row of each group that has the maximum value in column "B". Like this:


df_cleaned = df.groupBy("A").agg(F.max("B"))

Unfortunately, this throws away all other columns - df_cleaned only contains the column "A" and the max value of B. How do I instead keep the full rows? ("A", "B", "C"...)


Answered by pault

You can do this without a udf, using a Window.


Consider the following example:


import pyspark.sql.functions as f
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
    ('b', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])  # sqlCtx: an existing SQLContext (or a SparkSession in Spark 2+)
df.show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  5|
#|  a|  8|
#|  a|  7|
#|  b|  1|
#|  b|  3|
#+---+---+

Create a Window partitioned by column A and use it to compute the maximum of each group. Then filter the rows so that only those where the value in column B equals the maximum are kept.


from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB')\
    .show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  8|
#|  b|  3|
#+---+---+
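One caveat worth noting (an addition, not from the original answer): the `B == maxB` filter keeps every row that ties for the group maximum, so a group can contribute more than one row. The plain-Python sketch below (no Spark) contrasts the two behaviors; in Spark, keeping exactly one row per group would instead use something like `f.row_number().over(w.orderBy(f.col('B').desc())) == 1`:

```python
data = [('a', 8, 'd'), ('a', 8, 'e'), ('b', 3, 'g')]  # two 'a' rows tie at B = 8

# Max-filter semantics (like B == maxB): both tied 'a' rows survive.
group_max = {}
for k, v, _ in data:
    group_max[k] = max(group_max.get(k, v), v)
all_ties = [r for r in data if r[1] == group_max[r[0]]]
print(all_ties)  # [('a', 8, 'd'), ('a', 8, 'e'), ('b', 3, 'g')]

# Keep exactly one row per group (like row_number() == 1 in Spark).
first_per_group = {}
for r in all_ties:
    first_per_group.setdefault(r[0], r)
print(list(first_per_group.values()))  # [('a', 8, 'd'), ('b', 3, 'g')]
```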

Or equivalently using pyspark-sql:


df.registerTempTable('table')  # deprecated since Spark 2.0; use df.createOrReplaceTempView('table')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()
#+---+---+
#|  A|  B|
#+---+---+
#|  b|  3|
#|  a|  8|
#+---+---+

Answered by ndricca

Another possible approach is to join the dataframe with itself, specifying "leftsemi". This kind of join keeps all columns from the left-side dataframe and no columns from the right side.


For example:


import pyspark.sql.functions as f
data = [
    ('a', 5, 'c'),
    ('a', 8, 'd'),
    ('a', 7, 'e'),
    ('b', 1, 'f'),
    ('b', 3, 'g')
]
df = sqlContext.createDataFrame(data, ["A", "B", "C"])
df.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  5|  c|
|  a|  8|  d|
|  a|  7|  e|
|  b|  1|  f|
|  b|  3|  g|
+---+---+---+

The maximum value of column B per group in column A can be selected by doing:


df.groupBy('A').agg(f.max('B').alias('B')).show()
+---+---+
|  A|  B|
+---+---+
|  a|  8|
|  b|  3|
+---+---+

Using this expression as the right side of a left semi join, and renaming the obtained column max(B) back to its original name B, we can obtain the result needed:


# Join on both A and B, so a maximum from one group cannot match rows of another group.
df.join(df.groupBy('A').agg(f.max('B').alias('B')), on=['A', 'B'], how='leftsemi').show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  b|  3|  g|
|  a|  8|  d|
+---+---+---+

The physical plan behind this solution and the one from the accepted answer are different, and it is still not clear to me which one will perform better on large dataframes.


The same result can be obtained using Spark SQL syntax:


df.registerTempTable('table')
q = '''SELECT *
FROM table a
LEFT SEMI JOIN (
    SELECT
        A,
        max(B) AS max_B
    FROM table
    GROUP BY A
) t
ON a.A = t.A AND a.B = t.max_B
'''
sqlContext.sql(q).show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  b|  3|  g|
|  a|  8|  d|
+---+---+---+