Python: Comparing columns in PySpark
Disclaimer: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/37673414/
Comparing columns in PySpark
Asked by Hemant
I am working on a PySpark DataFrame with n columns. I have a set of m columns (m < n), and my task is to take, for each row, the maximum value across those m columns.
For example:
Input: a PySpark DataFrame containing:
col_1 = [1,2,3], col_2 = [2,1,4], col_3 = [3,2,5]
Output:
col_4 = max(col_1, col_2, col_3) = [3,2,5]
There is something similar in pandas, as explained in this question.
Is there any way of doing this in PySpark, or should I convert my PySpark df to a pandas df and then perform the operations?
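For reference, the pandas approach the question alludes to looks roughly like this (a sketch; the frame below is illustrative and not from the original post):

import pandas as pd

# Toy data matching the example above
pdf = pd.DataFrame({"col_1": [1, 2, 3], "col_2": [2, 1, 4], "col_3": [3, 2, 5]})

# Row-wise maximum across the chosen columns
pdf["col_4"] = pdf[["col_1", "col_2", "col_3"]].max(axis=1)
print(pdf)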
Answered by zero323
You can reduce over a list of columns using SQL expressions:
from functools import reduce
from pyspark.sql.functions import col, when

def row_max(*cols):
    # Fold the columns pairwise, keeping the larger value at each step
    return reduce(
        lambda x, y: when(x > y, x).otherwise(y),
        [col(c) if isinstance(c, str) else c for c in cols]
    )

df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)])
      .toDF(["a", "b", "c"]))

df.select(row_max("a", "b", "c").alias("max"))
Spark 1.5+ also provides least and greatest:
from pyspark.sql.functions import greatest
df.select(greatest("a", "b", "c"))
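A minimal usage sketch showing both functions side by side (the aliases and the show() call are my additions, not part of the original answer):

from pyspark.sql.functions import greatest, least

df.select(
    greatest("a", "b", "c").alias("row_max"),
    least("a", "b", "c").alias("row_min")
).show()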
If you want to keep the name of the column that holds the max, you can use structs:
from pyspark.sql.functions import struct, lit

def row_max_with_name(*cols):
    # Wrap each value in a struct together with its column name,
    # so the winning value carries its column name along
    cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols]
    return greatest(*cols_).alias("greatest({0})".format(",".join(cols)))

maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))
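To pull the winning value and its column name back out of the struct, something along these lines should work (the aliases and the show() call here are illustrative additions, not part of the original answer):

maxs.select(
    col("maxs")["value"].alias("max_value"),
    col("maxs")["col"].alias("max_col")
).show()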
Finally, you can use the above to select the overall "top" column, i.e. the one that is most often the row maximum:
from pyspark.sql.functions import max

((_, c), ) = (maxs
    .groupBy(col("maxs")["col"].alias("col"))
    .count()
    .agg(max(struct(col("count"), col("col"))))
    .first())

df.select(c)
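On the toy DataFrame above, every row's maximum comes from column c (the tie in the second row is broken by the struct's column-name field), so the last line resolves as sketched below; this walk-through is mine, not part of the original answer:

# groupBy/count over the struct's "col" field yields only ("c", 3) here,
# so c == "c" and the final select is equivalent to:
df.select("c")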
Answered by ansev
We can use greatest
Creating DataFrame
df = spark.createDataFrame(
    [[1, 2, 3], [2, 1, 2], [3, 4, 5]],
    ['col_1', 'col_2', 'col_3']
)
df.show()
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    1|    2|    3|
|    2|    1|    2|
|    3|    4|    5|
+-----+-----+-----+
Solution
from pyspark.sql.functions import greatest
df2 = df.withColumn('max_by_rows', greatest('col_1','col_2','col_3'))
#Only if you need col
#from pyspark.sql.functions import col
#df2 = df.withColumn('max',greatest(col('col_1'),col('col_2'),col('col_3')))
df2.show()
+-----+-----+-----+-----------+
|col_1|col_2|col_3|max_by_rows|
+-----+-----+-----+-----------+
|    1|    2|    3|          3|
|    2|    1|    2|          2|
|    3|    4|    5|          5|
+-----+-----+-----+-----------+
Answered by Hareesh Adukkadukkam
Scala solution:
val df = sc.parallelize(Seq((10, 10, 1), (200, 2, 20), (3, 30, 300), (400, 40, 4))).toDF("c1", "c2", "c3")

// Collect the row's values into a List and take .min for the row-wise minimum
df.rdd.map(row => List(row.getInt(0), row.getInt(1), row.getInt(2)))
  .map(x => (x(0), x(1), x(2), x.min))
  .toDF("c1", "c2", "c3", "min")
  .show
+---+---+---+---+
| c1| c2| c3|min|
+---+---+---+---+
| 10| 10|  1|  1|
|200|  2| 20|  2|
|  3| 30|300|  3|
|400| 40|  4|  4|
+---+---+---+---+
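For comparison, a rough PySpark equivalent of this row-wise minimum using least rather than an RDD map (a sketch of mine, not part of the original answer):

from pyspark.sql.functions import least

df = spark.createDataFrame(
    [(10, 10, 1), (200, 2, 20), (3, 30, 300), (400, 40, 4)],
    ["c1", "c2", "c3"]
)

# Row-wise minimum across the three columns
df.withColumn("min", least("c1", "c2", "c3")).show()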
Answered by Rags
Another simple way of doing it. Let us say that the df below is your dataframe:
df = sc.parallelize([(10, 10, 1 ), (200, 2, 20), (3, 30, 300), (400, 40, 4)]).toDF(["c1", "c2", "c3"])
df.show()
+---+---+---+
| c1| c2| c3|
+---+---+---+
| 10| 10|  1|
|200|  2| 20|
|  3| 30|300|
|400| 40|  4|
+---+---+---+
You can process the above df as below to get the desired results:
from pyspark.sql.functions import lit, min

df.select(lit('c1').alias('cn1'), min(df.c1).alias('c1'),
          lit('c2').alias('cn2'), min(df.c2).alias('c2'),
          lit('c3').alias('cn3'), min(df.c3).alias('c3')
         )\
  .rdd.flatMap(lambda r: [(r.cn1, r.c1), (r.cn2, r.c2), (r.cn3, r.c3)])\
  .toDF(['Columnn', 'Min']).show()
+-------+---+
|Columnn|Min|
+-------+---+
|     c1|  3|
|     c2|  2|
|     c3|  1|
+-------+---+