How do I get a SQL row_number equivalent for a Spark RDD?

Note: this content is translated from a popular StackOverflow question and provided under the CC BY-SA 4.0 license. If you reuse or share it, you must keep the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/27050247/

Date: 2020-09-01 02:58:23 · Source: igfitidea

How do I get a SQL row_number equivalent for a Spark RDD?

Tags: sql, apache-spark, row-number, rdd

Asked by Glenn Strycker

I need to generate a full list of row_numbers for a data table with many columns.


In SQL, this would look like this:


select
   key_value,
   col1,
   col2,
   col3,
   row_number() over (partition by key_value order by col1, col2 desc, col3)
from
   temp
;
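For reference, the semantics of that SQL can be sketched in plain Python (no Spark); the sample rows below are hypothetical and mirror the RDD entries further down:

```python
from itertools import groupby

# Hypothetical rows: (key_value, col1, col2, col3)
rows = [("key1", 1, 2, 3), ("key1", 1, 4, 7), ("key1", 2, 2, 3),
        ("key2", 5, 5, 5), ("key2", 5, 5, 9), ("key2", 7, 5, 5)]

# partition by key_value, order by col1 asc, col2 desc, col3 asc
rows.sort(key=lambda r: (r[0], r[1], -r[2], r[3]))

ranked = []
for _, group in groupby(rows, key=lambda r: r[0]):
    # row_number() restarts at 1 for each partition
    for i, r in enumerate(group, start=1):
        ranked.append(r + (i,))
```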

Now, let's say in Spark I have an RDD of the form (K, V), where V=(col1, col2, col3), so my entries are like


(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.

I want to order these using commands like sortBy(), sortWith(), sortByKey(), zipWithIndex, etc. and have a new RDD with the correct row_number:


(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.

(I don't care about the parentheses, so the form can also be (K, (col1,col2,col3,rownum)) instead)


How do I do this?


Here's my first attempt:


val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

Also note that the Scala collection sortBy used below cannot be applied directly to the RDD; one must run collect() first, and then the output isn't an RDD either, but an array:


temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)

Here's a little more progress, but still not partitioned:


val temp2 = sc.parallelize(
  temp1.map(a => (a._1, (a._2, a._3, a._4)))
       .collect()
       .sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)
).zipWithIndex
 .map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)

// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)

Answered by dnlbrky

The row_number() over (partition by ... order by ...) functionality was added in Spark 1.4. This answer uses PySpark/DataFrames.


Create a test DataFrame:


from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()

Add the partitioned row number:


from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.row_number()  # was F.rowNumber() in Spark 1.4/1.5
         .over(Window
               .partitionBy("k")
               .orderBy("v")  # order within each partition; "k" is constant per partition
              )
         .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
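The rowNum column in that table can be double-checked without a cluster; this plain-Python sketch (not a Spark API) orders each key's values and numbers them from 1:

```python
from itertools import groupby

data = [("key1", (1, 2, 3)), ("key1", (1, 4, 7)), ("key1", (2, 2, 3)),
        ("key2", (5, 5, 5)), ("key2", (5, 5, 9)), ("key2", (7, 5, 5))]

# partition by k, order by v (tuple comparison), number from 1
data.sort()
numbered = [(k, v, i)
            for k, grp in groupby(data, key=lambda kv: kv[0])
            for i, (_, v) in enumerate(grp, start=1)]
```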

Answered by Guillaume

This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.


Here is how I would tackle it:


1- Simplify your data:


temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))

temp2 is now a "real" key-value pair RDD. It looks like this:


[
((3, 4), (5, 5, 5)),  
((3, 4), (5, 5, 9)),   
((3, 4), (7, 5, 5)),   
((1, 2), (1, 2, 3)),  
((1, 2), (1, 4, 7)),   
((1, 2), (2, 2, 3))

]


2- Then, use the group-by function to reproduce the effect of the PARTITION BY:


temp3 = temp2.groupByKey()

temp3 is now an RDD with 2 rows:


[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
 ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]

3- Now, you need to apply a ranking function to each group of the RDD. In Python, I would use the built-in sorted function (enumerate will create your row_number column):


temp4 = temp3.flatMap(lambda x: [(x[0], (i[1], i[0])) for i in enumerate(sorted(x[1]))])
temp4.take(10)

Note that to implement your particular order, you would need to feed the right "key" argument to sorted (in Python, I would just create a lambda function like this):


lambda t: (t[0], -t[1], t[2])

In the end (without the key argument), it looks like this:


[
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2))

]

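The three steps above, with the custom order key applied, can be condensed into one plain-Python sketch (the dict stands in for groupByKey; no Spark needed):

```python
from collections import defaultdict

temp1 = [((3, 4), 5, 5, 5), ((3, 4), 5, 5, 9), ((3, 4), 7, 5, 5),
         ((1, 2), 1, 2, 3), ((1, 2), 1, 4, 7), ((1, 2), 2, 2, 3)]

# 1- simplify to (key, value) pairs; 2- group by key
groups = defaultdict(list)
for k, c1, c2, c3 in temp1:
    groups[k].append((c1, c2, c3))

# 3- sort each group (col1 asc, col2 desc, col3 asc) and enumerate (0-based)
temp4 = [(k, (v, i))
         for k, vals in groups.items()
         for i, v in enumerate(sorted(vals, key=lambda t: (t[0], -t[1], t[2])))]
```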

Hope that helps!


Good luck.


Answered by Wallace Huang

val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))

test: Seq[(String, (Int, Int, Int))] = List((key1,(1,2,3)), (key1,(4,5,6)), (key2,(7,8,9)), (key2,(0,1,2)))


test.foreach(println)

(key1,(1,2,3))
(key1,(4,5,6))
(key2,(7,8,9))
(key2,(0,1,2))

val rdd = sc.parallelize(test, 2)

rdd: org.apache.spark.rdd.RDD[(String, (Int, Int, Int))] = ParallelCollectionRDD[41] at parallelize at :26


val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))

rdd1: org.apache.spark.rdd.RDD[(String, Array[((Int, Int, Int), Int)])] = MapPartitionsRDD[44] at map at :25


val rdd2 = rdd1.flatMap{ 
  elem =>
   val key = elem._1
   elem._2.map(row => (key, row._1, row._2))
 }

rdd2: org.apache.spark.rdd.RDD[(String, (Int, Int, Int), Int)] = MapPartitionsRDD[45] at flatMap at :25


rdd2.collect.foreach(println)

(key1,(1,2,3),0)
(key1,(4,5,6),1)
(key2,(0,1,2),0)
(key2,(7,8,9),1)
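The groupByKey → sortBy → zipWithIndex → flatMap pipeline above can be checked without a Spark shell; this plain-Python sketch reproduces the same 0-based indices:

```python
from collections import defaultdict

test = [("key1", (1, 2, 3)), ("key1", (4, 5, 6)),
        ("key2", (7, 8, 9)), ("key2", (0, 1, 2))]

grouped = defaultdict(list)          # stands in for rdd.groupByKey
for k, v in test:
    grouped[k].append(v)

# sortBy(_._1), zipWithIndex, then flatten back to (key, row, index)
rdd2 = [(k, row, i)
        for k, rows in sorted(grouped.items())
        for i, row in enumerate(sorted(rows, key=lambda r: r[0]))]
```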

Answered by Dakshin Rajavel

From Spark SQL, read the data files:

val df = spark.read.json("s3://s3bukcet/key/activity/year=2018/month=12/date=15/*");


The above file has fields user_id, pageviews and clicks


Generate the activity id (row_number) partitioned by user_id and ordered by clicks:

val output = df.withColumn("activity_id", functions.row_number().over(Window.partitionBy("user_id").orderBy("clicks")).cast(DataTypes.IntegerType));

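The window logic (partition by user_id, order by clicks) can be sketched the same way in plain Python; the sample records below are hypothetical, since the S3 data isn't available here:

```python
from itertools import groupby

records = [{"user_id": "u1", "pageviews": 10, "clicks": 3},
           {"user_id": "u1", "pageviews": 4,  "clicks": 1},
           {"user_id": "u2", "pageviews": 7,  "clicks": 2}]

# partition by user_id, order by clicks, then number each partition from 1
records.sort(key=lambda r: (r["user_id"], r["clicks"]))
output = []
for _, grp in groupby(records, key=lambda r: r["user_id"]):
    for i, r in enumerate(grp, start=1):
        output.append({**r, "activity_id": i})
```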