How do I get a SQL row_number equivalent for a Spark RDD?
Original question: http://stackoverflow.com/questions/27050247/
Note: this content comes from StackOverflow and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors: StackOverflow
Asked by Glenn Strycker
I need to generate a full list of row_numbers for a data table with many columns.
In SQL, this would look like this:
select
    key_value,
    col1,
    col2,
    col3,
    row_number() over (partition by key_value order by col1, col2 desc, col3)
from
    temp
;
Now, let's say in Spark I have an RDD of the form (K, V), where V=(col1, col2, col3), so my entries are like
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
I want to order these using commands like sortBy(), sortWith(), sortByKey(), zipWithIndex, etc. and get a new RDD with the correct row_number:
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
(I don't care about the parentheses, so the form can also be (K, (col1,col2,col3,rownum)) instead)
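As a reference point, the numbering that the SQL query above asks for can be sketched in plain Python, with no Spark involved. This is only an illustration of the intended semantics on the sample entries; the names `rows` and `row_numbers` are made up for this sketch, and negating col2 to get descending order assumes the columns are numeric.

```python
from itertools import groupby

# Sample (key, (col1, col2, col3)) entries from the question.
rows = [
    ("key1", (1, 2, 3)),
    ("key1", (1, 4, 7)),
    ("key1", (2, 2, 3)),
    ("key2", (5, 5, 5)),
    ("key2", (5, 5, 9)),
    ("key2", (7, 5, 5)),
]

def row_numbers(rows):
    """Mimic row_number() over (partition by key order by col1, col2 desc, col3)."""
    # Sort by key first so groupby sees each key as one contiguous run,
    # then by col1 ascending, col2 descending (negated), col3 ascending.
    ordered = sorted(rows, key=lambda kv: (kv[0], kv[1][0], -kv[1][1], kv[1][2]))
    result = []
    for key, group in groupby(ordered, key=lambda kv: kv[0]):
        # Numbering restarts at 1 for every key.
        for n, (_, cols) in enumerate(group, start=1):
            result.append((key, cols, n))
    return result

numbered = row_numbers(rows)
for entry in numbered:
    print(entry)
```

On the sample data this assigns 1 to (key1, (1,4,7)) and 2 to (key1, (1,2,3)), because col2 is ordered descending within equal col1 values.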
How do I do this?
Here's my first attempt:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)
temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!
val temp2 = temp1.???
Also note that the function sortBy cannot be applied directly to an RDD, but one must run collect() first, and then the output isn't an RDD, either, but an array
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
Here's a little more progress, but still not partitioned:
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
Answered by dnlbrky
The row_number() over (partition by ... order by ...) functionality was added in Spark 1.4. This answer uses PySpark/DataFrames.
Create a test DataFrame:
from pyspark.sql import Row, functions as F
testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()
Add the partitioned row number:
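from pyspark.sql.window import Window
# This function was named F.rowNumber() in Spark 1.4/1.5; it is F.row_number() from Spark 1.6 on.
# Ordering by "k" alone leaves the order within each partition unspecified;
# order by the value columns instead to match the question's SQL.
(testDF
    .select("k", "v",
        F.row_number()
        .over(Window
            .partitionBy("k")
            .orderBy("k")
        )
        .alias("rowNum")
    )
    .show()
)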
+----+-------+------+
| k| v|rowNum|
+----+-------+------+
|key1|[1,2,3]| 1|
|key1|[1,4,7]| 2|
|key1|[2,2,3]| 3|
|key2|[5,5,5]| 1|
|key2|[5,5,9]| 2|
|key2|[7,5,5]| 3|
+----+-------+------+
Answered by Guillaume
This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.
Here is how I would tackle it:
1- Simplify your data:
temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))
temp2 is now a "real" key-value pair RDD. It looks like this:
[
((3, 4), (5, 5, 5)),
((3, 4), (5, 5, 9)),
((3, 4), (7, 5, 5)),
((1, 2), (1, 2, 3)),
((1, 2), (1, 4, 7)),
((1, 2), (2, 2, 3))
]
2- Then, use the group-by function to reproduce the effect of the PARTITION BY:
temp3 = temp2.groupByKey()
temp3 is now an RDD with 2 rows:
[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),
((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
3- Now, you need to apply a rank function to each value of the RDD. In Python, I would use the built-in sorted function (enumerate will create your row_number column):
temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)
Note that to implement your particular order, you would need to pass the right "key" argument to sorted. In Python, I would just create a lambda function like this:
lambda tuple : (tuple[0],-tuple[1],tuple[2])
At the end (without the key argument), it looks like this:
[
((1, 2), ((1, 2, 3), 0)),
((1, 2), ((1, 4, 7), 1)),
((1, 2), ((2, 2, 3), 2)),
((3, 4), ((5, 5, 5), 0)),
((3, 4), ((5, 5, 9), 1)),
((3, 4), ((7, 5, 5), 2))
]
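The output above is 0-based and uses the default tuple ordering. A minimal sketch of the per-group step with the question's ordering and SQL-style 1-based numbering could look like the following. The helper name `number_group` is hypothetical, the sample group is a plain-Python stand-in for one element of temp3, and negating the second column assumes it is numeric.

```python
def number_group(pair):
    """Turn (key, iterable_of_values) into [(key, (value, row_number)), ...]."""
    key, values = pair
    # col1 ascending, col2 descending, col3 ascending, as in the question's SQL.
    ordered = sorted(values, key=lambda v: (v[0], -v[1], v[2]))
    # start=1 gives SQL-style 1-based row numbers instead of 0-based indices.
    return [(key, (v, n)) for n, v in enumerate(ordered, start=1)]

# Stand-in for one element of temp3 (the groupByKey output). With Spark,
# the function would be passed to flatMap, e.g. temp3.flatMap(number_group).
group = ((1, 2), [(1, 2, 3), (1, 4, 7), (2, 2, 3)])
numbered_group = number_group(group)
for entry in numbered_group:
    print(entry)
```

Keeping the per-group logic in a named function makes it possible to check the ordering locally before running it on a cluster.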
Hope that helps!
Good luck.
Answered by Wallace Huang
val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))
test: Seq[(String, (Int, Int, Int))] = List((key1,(1,2,3)), (key1,(4,5,6)), (key2,(7,8,9)), (key2,(0,1,2)))
test.foreach(println)
(key1,(1,2,3))
(key1,(4,5,6))
(key2,(7,8,9))
(key2,(0,1,2))
val rdd = sc.parallelize(test, 2)
rdd: org.apache.spark.rdd.RDD[(String, (Int, Int, Int))] = ParallelCollectionRDD[41] at parallelize at :26
val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))
rdd1: org.apache.spark.rdd.RDD[(String, Array[((Int, Int, Int), Int)])] = MapPartitionsRDD[44] at map at :25
val rdd2 = rdd1.flatMap {
  elem =>
    val key = elem._1
    elem._2.map(row => (key, row._1, row._2))
}
rdd2: org.apache.spark.rdd.RDD[(String, (Int, Int, Int), Int)] = MapPartitionsRDD[45] at flatMap at :25
rdd2.collect.foreach(println)
(key1,(1,2,3),0)
(key1,(4,5,6),1)
(key2,(0,1,2),0)
(key2,(7,8,9),1)
Answered by Dakshin Rajavel
From Spark SQL:
Read the data files...
val df = spark.read.json("s3://s3bukcet/key/activity/year=2018/month=12/date=15/*");
The above file has fields user_id, pageviews and clicks
Generate the activity ID (row_number) partitioned by user_id and ordered by clicks:
// Imports needed by the line below
import org.apache.spark.sql.functions
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.DataTypes
val output = df.withColumn("activity_id", functions.row_number().over(Window.partitionBy("user_id").orderBy("clicks")).cast(DataTypes.IntegerType));