Scala: counting rows of a DataFrame with a condition in Spark

Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same CC BY-SA terms and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/32073695/


counting rows of a dataframe with condition in spark

json, scala, apache-spark, dataframe, apache-spark-sql

Asked by user1735076

I am trying this:


df=dfFromJson:
{"class":"name 1","stream":"science"}
{"class":"name 1","stream":"arts"}
{"class":"name 1","stream":"science"}
{"class":"name 1","stream":"law"}
{"class":"name 1","stream":"law"}
{"class":"name 2","stream":"science"}
{"class":"name 2","stream":"arts"}
{"class":"name 2","stream":"law"}
{"class":"name 2","stream":"science"}
{"class":"name 2","stream":"arts"}
{"class":"name 2","stream":"law"}


df.groupBy("class").agg(
  count(col("stream") === "science") as "stream_science",
  count(col("stream") === "arts") as "stream_arts",
  count(col("stream") === "law") as "stream_law"
)

This is not giving the expected output; how can I achieve it in the fastest way?


Answered by zero323

It is not exactly clear what the expected output is, but I guess you want something like this:


import org.apache.spark.sql.functions.{count, when}
import spark.implicits._  // for the $"..." column syntax (use sqlContext.implicits._ on Spark 1.x)

// Collect the distinct stream values, then build one conditional count per value.
val streams = df.select($"stream").distinct.collect.map(_.getString(0))
val exprs = streams.map(s => count(when($"stream" === s, 1)).alias(s"stream_$s"))

df
  .groupBy("class")
  .agg(exprs.head, exprs.tail: _*)

// +------+--------------+----------+-----------+
// | class|stream_science|stream_law|stream_arts|
// +------+--------------+----------+-----------+
// |name 1|             2|         2|          1|
// |name 2|             2|         2|          2|
// +------+--------------+----------+-----------+
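
For context, the count(when(...)) trick works because when returns null whenever its condition is false and count ignores nulls, so each aggregate counts only the matching rows. A minimal hardcoded sketch of the same idiom for the three streams above (assuming a SparkSession named spark for the implicits):

import org.apache.spark.sql.functions.{count, when}
import spark.implicits._  // assumes a SparkSession named `spark`

// Each aggregate counts only the rows where its condition holds,
// because `when` yields null otherwise and `count` skips nulls.
df.groupBy("class").agg(
  count(when($"stream" === "science", 1)).as("stream_science"),
  count(when($"stream" === "arts", 1)).as("stream_arts"),
  count(when($"stream" === "law", 1)).as("stream_law")
).show()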

If you don't care about the column names and have only one grouping column, you can simply use DataFrameStatFunctions.crosstab:


df.stat.crosstab("class", "stream")

// +------------+---+----+-------+
// |class_stream|law|arts|science|
// +------------+---+----+-------+
// |      name 1|  2|   1|      2|
// |      name 2|  2|   2|      2|
// +------------+---+----+-------+
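
Note that crosstab names its key column by joining the two input column names with an underscore ("class_stream" above). A small follow-up sketch, if you prefer the plain "class" name:

// crosstab's key column is "<col1>_<col2>"; rename it back to "class".
df.stat.crosstab("class", "stream")
  .withColumnRenamed("class_stream", "class")
  .show()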

Answered by quantum_random

You can just group by both columns instead of grouping by a single column and then filtering. Because I am not fluent enough in Scala, below is the code snippet in Python. Note that I have changed your column names from "stream" and "class" to "dept" and "name" to avoid name conflicts with Spark's "stream" and "class" types.


from pyspark.sql import HiveContext, Row

hc = HiveContext(sc)  # sc is an existing SparkContext

obj = [
    {"class":"name 1","stream":"science"},
    {"class":"name 1","stream":"arts"}
    {"class":"name 1","stream":"science"},
    {"class":"name 1","stream":"law"},
    {"class":"name 1","stream":"law"},
    {"class":"name 2","stream":"science"},
    {"class":"name 2","stream":"arts"},
    {"class":"name 2","stream":"law"},
    {"class":"name 2","stream":"science"},
    {"class":"name 2","stream":"arts"},
    {"class":"name 2","stream":"law"}
]
rdd = sc.parallelize(obj).map(lambda i: Row(dept=i['stream'], name=i['class']))  # rename: stream -> dept, class -> name
df = hc.createDataFrame(rdd)
df.groupby(df.dept, df.name).count().collect()

This results in the following output:


[
    Row(dept='science', name='name 1', count=2), 
    Row(dept='science', name='name 2', count=2), 
    Row(dept='arts', name='name 1', count=1), 
    Row(dept='arts', name='name 2', count=2), 
    Row(dept='law', name='name 1', count=2), 
    Row(dept='law', name='name 2', count=2)
]
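
For readers staying in Scala, a minimal sketch of the same group-by-both-columns idea (assuming the original df with "class" and "stream" columns) would look like this; it produces the same long format, one row per (class, stream) pair, rather than the wide per-stream columns of the first answer:

import org.apache.spark.sql.functions.count

// One row per (class, stream) pair with its count (long format).
df.groupBy("class", "stream")
  .agg(count("*").as("count"))
  .show()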