Java: aggregate function count() usage with groupBy in Spark

Disclaimer: this page reproduces a popular StackOverflow question and answer under the CC BY-SA 4.0 license. If you reuse or share it, you must keep the same license and attribute the original authors (not me). Original question: http://stackoverflow.com/questions/41890485/


aggregate function Count usage with groupBy in Spark

Tags: java, scala, apache-spark, pyspark, apache-spark-sql

Asked by Adiel

I'm trying to make multiple operations in one line of code in pySpark, and not sure if that's possible for my case.

My intention is not having to save the output as a new dataframe.

My current code is rather simple:

from pyspark.sql.functions import udf, col, mean, stddev
from pyspark.sql.types import StringType

encodeUDF = udf(encode_time, StringType())

new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME'))) \
  .groupBy('timePeriod') \
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  ) \
  .show(20, False)
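For reference, encode_time is the asker's own helper (not shown in the question) that maps a raw START_TIME value to a time-period label. A minimal, purely hypothetical sketch of what it might look like (the bucketing logic and the 'HH:MM:SS' input format are assumptions, not from the original post):

# Hypothetical stand-in for the asker's encode_time helper: buckets an
# 'HH:MM:SS'-style timestamp string into a coarse period label.
def encode_time(start_time):
    if start_time is None:
        return None
    hour = int(start_time.split(':')[0])
    if hour < 6:
        return 'night'
    elif hour < 12:
        return 'morning'
    elif hour < 18:
        return 'afternoon'
    return 'evening'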

And my intention is to add count() after using groupBy, to get the count of records matching each value of the timePeriod column, printed/shown as output.

When trying to use groupBy(..).count().agg(..) I get exceptions.

Is there any way to achieve both the count() and the agg().show() prints, without splitting the code into two lines of commands, e.g.:

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

Or better yet, to get a merged output in the agg().show() output: an extra column which states the number of records matching each row's value, e.g.:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315

Accepted answer by mrsrinivas

count() can be used inside agg(), since the groupBy expression is the same.

With Python

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) \
  .groupBy("timePeriod") \
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"),
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
   ) \
  .show(20, False)

pySpark SQL functions doc

With Scala

import org.apache.spark.sql.functions._ //for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

count(1) counts one record per row, which here gives the same result as count("timePeriod"), provided timePeriod is never null (counting a column skips null values).

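A quick way to see the distinction is a toy DataFrame with a null in the grouping column; the sketch below (the local session and sample rows are illustrative, not from the original post) shows count(lit(1)) counting every row in a group while count("timePeriod") skips nulls:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Toy data: one row has a null timePeriod.
df = spark.createDataFrame(
    [("X", 10), (None, 20), ("X", 30)],
    "timePeriod string, DOWNSTREAM_SIZE int",
)

df.groupBy("timePeriod").agg(
    func.count(func.lit(1)).alias("all_rows"),        # counts every row in the group
    func.count("timePeriod").alias("non_null_rows"),  # null timePeriod values are skipped
).show()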

With Java

import static org.apache.spark.sql.functions.*;

// Java note: assuming encodeUDF was registered with spark.udf().register("encodeUDF", ...),
// it can be invoked through callUDF from org.apache.spark.sql.functions.
new_log_df.cache().withColumn("timePeriod", callUDF("encodeUDF", col("START_TIME")))
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false);