Scala:在 Spark 中按条件统计数据帧的行数
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/32073695/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me):
StackOverFlow
counting rows of a dataframe with condition in spark
提问by user1735076
I am trying this:
我正在尝试这个:
df=dfFromJson:
{"class":"name 1","stream":"science"}
{"class":"name 1","stream":"arts"}
{"class":"name 1","stream":"science"}
{"class":"name 1","stream":"law"}
{"class":"name 1","stream":"law"}
{"class":"name 2","stream":"science"}
{"class":"name 2","stream":"arts"}
{"class":"name 2","stream":"law"}
{"class":"name 2","stream":"science"}
{"class":"name 2","stream":"arts"}
{"class":"name 2","stream":"law"}
df.groupBy("class").agg(count(col("stream")==="science") as "stream_science", count(col("stream")==="arts") as "stream_arts", count(col("stream")==="law") as "stream_law")
This is not giving the expected output; how can I achieve it in the fastest way?
这没有给出预期的输出,我怎样才能以最快的方式实现它?
回答by zero323
It is not exactly clear what the expected output is, but I guess you want something like this:
目前还不清楚预期的输出是什么,但我想你想要这样的东西:
import org.apache.spark.sql.functions.{count, col, when}
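// the $"..." column syntax used below needs the SQL implicits in scope,
// e.g. import sqlContext.implicits._ (or spark.implicits._ on Spark 2.x)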
val streams = df.select($"stream").distinct.collect.map(_.getString(0))
val exprs = streams.map(s => count(when($"stream" === s, 1)).alias(s"stream_$s"))
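// agg takes one Column plus a varargs list, hence the head / tail split below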
df
.groupBy("class")
.agg(exprs.head, exprs.tail: _*)
// +------+--------------+----------+-----------+
// | class|stream_science|stream_law|stream_arts|
// +------+--------------+----------+-----------+
// |name 1| 2| 2| 1|
// |name 2| 2| 2| 2|
// +------+--------------+----------+-----------+
If you don't care about names and have only one group column you can simply use DataFrameStatFunctions.crosstab:
如果您不关心名称并且只有一个组列,则可以简单地使用DataFrameStatFunctions.crosstab:
df.stat.crosstab("class", "stream")
// +------------+---+----+-------+
// |class_stream|law|arts|science|
// +------------+---+----+-------+
// | name 1| 2| 1| 2|
// | name 2| 2| 2| 2|
// +------------+---+----+-------+
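As a side note: on Spark 1.6 and later the same wide layout can also be produced with a pivot, which creates one count column per distinct stream value. A minimal sketch, assuming that newer API is available:
// sketch assuming Spark 1.6+, where pivot is available on grouped data
df.groupBy("class")
  .pivot("stream")
  .count()
  .show()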
回答by quantum_random
You can just group by both columns instead of grouping by a single column and then filtering. Because I am not fluent enough in Scala, the snippet below is in Python. Note that I have renamed your columns from "stream" and "class" to "dept" and "name", mainly because class is a reserved word in Python and cannot be used as a keyword argument to Row.
您可以直接按两列分组,而不是先按单列分组再过滤。因为我对 Scala 不够熟练,下面给出 Python 代码片段。请注意,我把列名从 "stream" 和 "class" 改成了 "dept" 和 "name",主要是因为 class 是 Python 的保留字,不能作为 Row 的关键字参数。
from pyspark.sql import HiveContext, Row
hc = HiveContext(sc)  # sc is the already-created SparkContext
obj = [
{"class":"name 1","stream":"science"},
{"class":"name 1","stream":"arts"}
{"class":"name 1","stream":"science"},
{"class":"name 1","stream":"law"},
{"class":"name 1","stream":"law"},
{"class":"name 2","stream":"science"},
{"class":"name 2","stream":"arts"},
{"class":"name 2","stream":"law"},
{"class":"name 2","stream":"science"},
{"class":"name 2","stream":"arts"},
{"class":"name 2","stream":"law"}
]
# rename the columns: "stream" -> dept, "class" -> name (class is a Python keyword)
rdd = sc.parallelize(obj).map(lambda i: Row(dept=i['stream'], name=i['class']))
df = hc.createDataFrame(rdd)
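# count the rows in every (dept, name) combination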
df.groupby(df.dept, df.name).count().collect()
This results in the following output -
这导致以下输出 -
[
Row(dept='science', name='name 1', count=2),
Row(dept='science', name='name 2', count=2),
Row(dept='arts', name='name 1', count=1),
Row(dept='arts', name='name 2', count=2),
Row(dept='law', name='name 1', count=2),
Row(dept='law', name='name 2', count=2)
]
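For reference, the same two-column grouping is a one-liner in Scala as well. A minimal sketch against the DataFrame from the question, keeping its original class and stream column names:
// count the rows in every (class, stream) combination
df.groupBy("class", "stream")
  .count()
  .show()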

