Python: How to calculate the mean and standard deviation of a given PySpark DataFrame?

Disclaimer: This page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/47995188/

Published: 2020-08-19 18:28:14  Source: igfitidea

How to calculate mean and standard deviation given a PySpark DataFrame?

python apache-spark pyspark apache-spark-sql

Asked by Markus

I have a PySpark DataFrame (not pandas) called df that is too large to call collect() on. Therefore the code given below is not efficient. It was working with a smaller amount of data, but now it fails.

import numpy as np

# collecting the whole DataFrame to the driver is what fails once the data gets large
myList = df.collect()
total = []
for product, nb in myList:
    for p2, score in nb:
        total.append(score)
mean = np.mean(total)
std = np.std(total)

Is there any way to get mean and std as two variables by using pyspark.sql.functions or similar?

from pyspark.sql.functions import mean as mean_, std as std_

I could use withColumn; however, that approach applies the calculation row by row and does not return a single variable.

UPDATE:

Sample content of df:

+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|
+----------+------------------+

I need to calculate the mean and standard deviation of the score values, e.g. the value 1 in [691,1] is one of the scores.

Answer by pault

You can use the built-in functions to get aggregate statistics. Here's how to get the mean and standard deviation.

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

Note that there are three different standard deviation functions. From the docs the one I used (stddev) returns the following:

Aggregate function: returns the unbiased sample standard deviation of the expression in a group

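For reference, a minimal sketch (not part of the original answer) that computes all three variants side by side, assuming the same numeric column 'columnName' as above. stddev is an alias for stddev_samp, while stddev_pop divides by n instead of n - 1:

from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, col

df.select(
    stddev(col('columnName')).alias('stddev'),             # alias for stddev_samp
    stddev_samp(col('columnName')).alias('stddev_samp'),   # sample standard deviation (divides by n - 1)
    stddev_pop(col('columnName')).alias('stddev_pop')      # population standard deviation (divides by n)
).show()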

You could use the describe() method as well:

df.describe().show()
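
Note that describe() reports its summary as string-typed values, so if you want the numbers programmatically rather than just displayed, you have to collect and cast them. A minimal sketch (an assumption on my part, not from the answer), again using the placeholder column 'columnName':

# map each summary label (count, mean, stddev, min, max) to its string value, then cast
summary = {row['summary']: row['columnName'] for row in df.describe('columnName').collect()}
mean = float(summary['mean'])
std = float(summary['stddev'])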

Refer to this link for more info: pyspark.sql.functions

UPDATE: This is how you can work through the nested data.

Use explode to extract the values into separate rows, then call mean and stddev as shown above.

Here's a MWE:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev

# mock up sample dataframe
df = sqlCtx.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)

# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())

# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print([mean, std])

Which outputs:

[2.3333333333333335, 1.505545305418162]

You can verify that these values are correct using numpy:

import numpy as np

vals = [1, 5, 2, 2, 1, 3]
print([np.mean(vals), np.std(vals, ddof=1)])  # ddof=1 gives the sample standard deviation, matching stddev

Explanation: Your "products" column is a list of lists. Calling explode will make a new row for each element of the outer list. Then grab the "score" value from each of the exploded rows, which you have defined as the second element of a 2-element list. Finally, call the aggregate functions on this new column.

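As a side note, the same extraction can be done without a Python udf by calling getItem directly on the exploded column. A minimal sketch, assuming the same mocked-up df as in the MWE above (where each element of "products" is a 2-element array), which keeps the whole computation in native Spark expressions:

from pyspark.sql.functions import explode, col, mean as _mean, stddev as _stddev

df_stats = df.select(explode(col('products')).alias('exploded'))\
    .select(col('exploded').getItem(1).alias('score'))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()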

Answer by Mahdi

You can use mean and stddev from pyspark.sql.functions:

import pyspark.sql.functions as F

df = spark.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)

result_df = (
    df
    .withColumn(
        'val_list',
        # grab the score (index 1) from each of the two product entries (assumes exactly two per row)
        F.array(df.products.getItem(0).getItem(1), df.products.getItem(1).getItem(1))
    )
    .select(F.explode('val_list').alias('val'))
    .select(F.mean('val').alias('mean'), F.stddev('val').alias('stddev'))
)

print(result_df.collect())

which outputs:

[Row(mean=2.3333333333333335, stddev=1.505545305418162)]

You can read more about pyspark.sql.functions here.

Answer by BigData-Guru

For standard deviation, a better way of writing it is shown below. We can format the result (to 2 decimal places) and give the column an alias name.

from pyspark.sql import SparkSession
from pyspark.sql.functions import stddev, format_number

data_agg = SparkSession.builder.appName('Sales_fun').getOrCreate()
data = data_agg.read.csv('sales_info.csv', inferSchema=True, header=True)

data.select(format_number(stddev('Sales'), 2).alias('Sales_Stdev')).show()