Spark and SparkSQL: How to imitate window function?
Note: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me) on StackOverflow.
Original question: http://stackoverflow.com/questions/32407455/
Asked by Martin Senne
Description
Given a dataframe df
id | date
---------------
1 | 2015-09-01
2 | 2015-09-01
1 | 2015-09-03
1 | 2015-09-04
2 | 2015-09-04
I want to create a running counter or index,
- grouped by the same id and
- sorted by date in that group,
thus
id | date | counter
--------------------------
1 | 2015-09-01 | 1
1 | 2015-09-03 | 2
1 | 2015-09-04 | 3
2 | 2015-09-01 | 1
2 | 2015-09-04 | 2
This is something I can achieve with a window function, e.g.
val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )
Unfortunately, Spark 1.4.1 does not support window functions for regular dataframes:
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
Questions
- How can I achieve the above computation on current Spark 1.4.1 without using window functions?
- When will window functions for regular dataframes be supported in Spark?
Thanks!
Accepted answer by Kirk Broadhurst
You can do this with RDDs. Personally I find the API for RDDs makes a lot more sense - I don't always want my data to be 'flat' like a dataframe.
val df = sqlContext.sql("select 1, '2015-09-01'")
  .unionAll(sqlContext.sql("select 2, '2015-09-01'"))
  .unionAll(sqlContext.sql("select 1, '2015-09-03'"))
  .unionAll(sqlContext.sql("select 1, '2015-09-04'"))
  .unionAll(sqlContext.sql("select 2, '2015-09-04'"))
// dataframe as an RDD (of Row objects)
df.rdd
  // grouping by the first column of the row
  .groupBy(r => r(0))
  // map each group - an Iterable[Row] - to a list and sort by the second column
  .map(g => g._2.toList.sortBy(row => row(1).toString))
  .collect()
The above gives a result like the following:
Array[List[org.apache.spark.sql.Row]] =
Array(
List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]),
List([2,2015-09-01], [2,2015-09-04]))
If you want the position within the 'group' as well, you can use zipWithIndex.
df.rdd.groupBy(r => r(0)).map(g =>
  g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()
Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
List(([2,2015-09-01],0), ([2,2015-09-04],1)))
You could flatten this back to a simple List/Array of Row objects using flatMap, but if you need to perform anything on the 'group' that won't be a great idea.
The downside to using RDDs like this is that it's tedious to convert from a DataFrame to an RDD and back again.
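For completeness, here is a minimal sketch (my own addition, not part of the original answer) of doing exactly that: flattening the grouped, sorted, zipped RDD back into a DataFrame with a 1-based counter column. It reuses the sqlContext and df from above and assumes the first column is an Int and the second a String, as in that example.
import sqlContext.implicits._

val withCounter = df.rdd
  .groupBy(r => r(0))                    // group rows by id
  .flatMap { case (_, rows) =>
    rows.toList
      .sortBy(row => row(1).toString)    // sort each group by date
      .zipWithIndex                      // 0-based position within the group
      .map { case (row, idx) =>
        (row.getInt(0), row(1).toString, idx + 1)   // illustrative (id, date, counter) tuple
      }
  }
  .toDF("id", "date", "counter")

withCounter.show()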
Answered by zero323
You can use HiveContext for local DataFrames as well and, unless you have a very good reason not to, it is probably a good idea anyway. It is the default SQLContext available in the spark-shell and pyspark shells (as for now sparkR seems to use a plain SQLContext), and its parser is recommended by the Spark SQL and DataFrame Guide.
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

object HiveContextTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Hive Context")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(
      ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
    ).toDF("k", "v")

    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
  }
}
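For reference (not part of the original answer): in later Spark releases window functions work on a plain SparkSession without Hive support, and rowNumber was renamed to row_number. A minimal sketch of the same example, assuming Spark 2.x:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

object WindowTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Window Test").getOrCreate()
    import spark.implicits._

    // same toy data as the HiveContext example above
    val df = Seq(("foo", 1), ("foo", 2), ("bar", 1), ("bar", 2)).toDF("k", "v")

    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k", $"v", row_number().over(w).alias("rn")).show()
  }
}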
Answered by Sayon M
I totally agree that window functions for DataFrames are the way to go if you have Spark version (>=) 1.5. But if you are really stuck with an older version (e.g. 1.4.1), here is a hacky way to solve this:
import org.apache.spark.sql.functions.count

val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") ::
    (1, "2015-09-04") :: (2, "2015-09-04") :: Nil)
  .toDF("id", "date")

// self-join on id, keep only rows with an earlier-or-equal date,
// then count them per (id, date): that count is the running index
val dfDuplicate = df.selectExpr("id as idDup", "date as dateDup")
val dfWithCounter = df.join(dfDuplicate, $"id" === $"idDup")
  .where($"date" >= $"dateDup")
  .groupBy($"id", $"date")
  .agg(count($"idDup").as("counter"))
  .select($"id", $"date", $"counter")
Now if you do dfWithCounter.show, you will get:
+---+----------+-------+
| id| date|counter|
+---+----------+-------+
| 1|2015-09-01| 1|
| 1|2015-09-04| 3|
| 1|2015-09-03| 2|
| 2|2015-09-01| 1|
| 2|2015-09-04| 2|
+---+----------+-------+
Note that date is not sorted within each id, but the counter is correct. Also, you can reverse the ordering of the counter by changing the >= to <= in the where statement.
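For illustration (my own sketch, reusing df and dfDuplicate from above), the reversed variant counts the rows dated on or after each row, so the newest date in each id group gets counter 1:
// count rows with a later-or-equal date, so the counter runs backwards
val dfWithReverseCounter = df.join(dfDuplicate, $"id" === $"idDup")
  .where($"date" <= $"dateDup")
  .groupBy($"id", $"date")
  .agg(count($"idDup").as("counter"))
  .select($"id", $"date", $"counter")

dfWithReverseCounter.show()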

