Python: Add column sum as new column in PySpark dataframe

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/31955309/


Add column sum as new column in PySpark dataframe

python, apache-spark, pyspark, spark-dataframe

Asked by plam

I'm using PySpark and I have a Spark dataframe with a bunch of numeric columns. I want to add a column that is the sum of all the other columns.

Suppose my dataframe had columns "a", "b", and "c". I know I can do this:

df.withColumn('total_col', df.a + df.b + df.c)

The problem is that I don't want to type out each column individually and add them, especially if I have a lot of columns. I want to be able to do this automatically or by specifying a list of column names that I want to add. Is there another way to do this?

Accepted answer by Paul

This was not obvious. I see no row-based sum of the columns defined in the Spark DataFrames API.

Version 2

This can be done in a fairly simple way:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

df.columns is supplied by pyspark as a list of strings giving all of the column names in the Spark DataFrame. For a different sum, you can supply any other list of column names instead.

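For example, here is a minimal sketch that sums only a subset of the columns (the subset names below are hypothetical):

cols_to_add = ['a', 'b']   # any subset of df.columns
newdf = df.withColumn('partial_total', sum(df[c] for c in cols_to_add))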

I did not try this as my first solution because I wasn't certain how it would behave. But it works.

Version 1

This is overly complicated, but works as well.

You can do this:

  1. use df.columns to get a list of the names of the columns
  2. use that names list to make a list of the columns
  3. pass that list to something that will invoke the column's overloaded add function in a fold-type functional manner

With python's reduce, some knowledge of how operator overloading works, and the pyspark code for columns, that becomes:

from functools import reduce  # needed on Python 3; on Python 2, reduce is a builtin

def column_add(a, b):
    return a.__add__(b)

newdf = df.withColumn('total_col',
                      reduce(column_add, (df[col] for col in df.columns)))

Note this is a python reduce, not a spark RDD reduce, and the parenthesized term in the second argument to reduce needs the parentheses because it is a generator expression.

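As an aside, the same fold can be written with operator.add instead of calling __add__ directly; this is just an equivalent sketch, not part of the original answer:

from functools import reduce
import operator

# Equivalent: operator.add(a, b) evaluates a + b, which dispatches to Column.__add__
newdf = df.withColumn('total_col',
                      reduce(operator.add, (df[col] for col in df.columns)))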

Tested, Works!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]

Answered by Grant Shannon

My problem was similar to the above (a bit more complex), as I had to add consecutive column sums as new columns in a PySpark dataframe. This approach uses code from Paul's Version 1 above:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df = spark.createDataFrame(data=[(1, 2, 3), (4, 5, 6), (3, 2, 1),
                                 (6, 1, -4), (0, 2, -2), (6, 4, 1),
                                 (4, 5, 2), (5, -3, -5), (6, 4, -1)],
                           schema=['x1', 'x2', 'x3'])
df.show()

+---+---+---+
| x1| x2| x3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  3|  2|  1|
|  6|  1| -4|
|  0|  2| -2|
|  6|  4|  1|
|  4|  5|  2|
|  5| -3| -5|
|  6|  4| -1|
+---+---+---+

colnames=df.columns

Add new columns that are cumulative sums (consecutive):

for i in range(0, len(colnames)):
    colnameLst = colnames[0:i+1]
    colname = 'cm' + str(i+1)
    df = df.withColumn(colname, sum(df[col] for col in colnameLst))

df.show()

+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
|  1|  2|  3|  1|  3|  6|
|  4|  5|  6|  4|  9| 15|
|  3|  2|  1|  3|  5|  6|
|  6|  1| -4|  6|  7|  3|
|  0|  2| -2|  0|  2|  0|
|  6|  4|  1|  6| 10| 11|
|  4|  5|  2|  4|  9| 11|
|  5| -3| -5|  5|  2| -3|
|  6|  4| -1|  6| 10|  9|
+---+---+---+---+---+---+

The 'cumulative sum' columns added are as follows:

cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3

Answered by Francesco Boi

The solution

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

posted by @Paul works. Nevertheless, I was getting the error, as many others have seen:

TypeError: 'Column' object is not callable

After some time I found the problem (at least in my case). The problem is that I had previously imported some pyspark functions with the line

from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min

so that line imported the pyspark sum function, while df.withColumn('total', sum(df[col] for col in df.columns)) is supposed to use the normal Python sum built-in.

You can delete the reference to the pyspark function with del sum.

Otherwise in my case I changed the import to

import pyspark.sql.functions as F

and then referenced the functions as F.sum.

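A minimal sketch of that namespaced-import approach (the aggregate call at the end is only an illustration, assuming a dataframe of numeric columns):

import pyspark.sql.functions as F

# The Python builtin sum is no longer shadowed, so the row-wise trick still works
newdf = df.withColumn('total', sum(df[c] for c in df.columns))

# The pyspark aggregate is referenced explicitly as F.sum when needed
newdf.agg(F.sum('total')).show()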

Answered by Jonathan

The most straightforward way of doing it is to use the expr function:

from pyspark.sql.functions import *
data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))

Answered by Jones Cavalcanti Sarmento

df = spark.createDataFrame([("linha1", "valor1", 2), ("linha2", "valor2", 5)], ("Columna1", "Columna2", "Columna3"))

df.show()

+--------+--------+--------+
|Columna1|Columna2|Columna3|
+--------+--------+--------+
|  linha1|  valor1|       2|
|  linha2|  valor2|       5|
+--------+--------+--------+

df = df.withColumn('DivisaoPorDois', df[2]/2)
df.show()

+--------+--------+--------+--------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|
+--------+--------+--------+--------------+
|  linha1|  valor1|       2|           1.0|
|  linha2|  valor2|       5|           2.5|
+--------+--------+--------+--------------+

df = df.withColumn('Soma_Colunas', df[2]+df[3])
df.show()

+--------+--------+--------+--------------+------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|Soma_Colunas|
+--------+--------+--------+--------------+------------+
|  linha1|  valor1|       2|           1.0|         3.0|
|  linha2|  valor2|       5|           2.5|         7.5|
+--------+--------+--------+--------------+------------+

Answered by Sonam Rao

A very simple approach would be to just use select instead of withColumn, as below:

from pyspark.sql.functions import col

df = df.select('*', (col("a") + col("b") + col("c")).alias("total"))

This should give you the required sum, with minor changes based on your requirements.

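The same select-based idea can be generalized to a list of columns; a sketch under the assumption that the listed columns are numeric (this generalization and the column names are not part of the original answer):

from functools import reduce
import operator
from pyspark.sql.functions import col

cols_to_add = ['a', 'b', 'c']   # hypothetical list of numeric column names
df = df.select('*', reduce(operator.add, [col(c) for c in cols_to_add]).alias('total'))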

Answered by Vivek Payasi

Summing multiple columns from a list into one column

PySpark's sum function (pyspark.sql.functions.sum) is an aggregate that sums a single column over its rows, so it doesn't add columns together row-wise. That can be achieved using the expr function.

from pyspark.sql.functions import expr

cols_list = ['a', 'b', 'c']

# Creating an addition expression using `join`
expression = '+'.join(cols_list)

df = df.withColumn('sum_cols', expr(expression))

This gives us the desired sum of columns.

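If every column of the dataframe happens to be numeric, the list can come straight from df.columns; a minimal sketch under that assumption:

from pyspark.sql.functions import expr

# Assumes every column in df is numeric; otherwise filter df.columns first
df = df.withColumn('sum_cols', expr('+'.join(df.columns)))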