Computing rank of a row (Scala)
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/33729787/
Asked by shailesh gupta
I want to rank user IDs based on one field. For the same value of the field, the rank should be the same. The data is in a Hive table.
e.g.
user value
a 5
b 10
c 5
d 6
Expected rank:
a - 1
c - 1
d - 3
b - 4
How can I do that?
Answered by zero323
It is possible to use the rank window function, either with the DataFrame API:
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window
import sqlContext.implicits._  // for $"..." and toDF (already in scope in spark-shell)

val w = Window.orderBy($"value")
val df = sc.parallelize(Seq(
  ("a", 5), ("b", 10), ("c", 5), ("d", 6)
)).toDF("user", "value")
df.select($"user", rank.over(w).alias("rank")).show
// +----+----+
// |user|rank|
// +----+----+
// | a| 1|
// | c| 1|
// | d| 3|
// | b| 4|
// +----+----+
or with raw SQL:
df.registerTempTable("df")
sqlContext.sql("SELECT user, RANK() OVER (ORDER BY value) AS rank FROM df").show
// +----+----+
// |user|rank|
// +----+----+
// | a| 1|
// | c| 1|
// | d| 3|
// | b| 4|
// +----+----+
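Since the question says the data already lives in a Hive table, the same query can be run against that table directly. A minimal sketch, assuming a HiveContext (window functions in Spark 1.x require one) and a hypothetical table name users:

sqlContext.sql("SELECT user, RANK() OVER (ORDER BY value) AS rank FROM users").show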
but it is extremely inefficient: because the window has no PARTITION BY clause, all rows have to be moved to a single partition to compute the global ranking.
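If the data had a natural grouping column, the usual way around this bottleneck is to rank within each group, so ranks for different groups can be computed in parallel. A minimal sketch, assuming a hypothetical column group that does not exist in the example data:

// hypothetical: rank users separately within each group
val wByGroup = Window.partitionBy($"group").orderBy($"value")
df.select($"user", $"group", rank.over(wByGroup).alias("rank"))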
You can also try to use the RDD API, but it is not exactly straightforward. First let's convert the DataFrame to an RDD:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.RangePartitioner
val rdd: RDD[(Int, String)] = df.select($"value", $"user")
  .map{ case Row(value: Int, user: String) => (value, user) }
val partitioner = new RangePartitioner(rdd.partitions.size, rdd)
val sorted = rdd.repartitionAndSortWithinPartitions(partitioner)
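Because RangePartitioner splits the key range in ascending order, the partitions of sorted are globally ordered, which is what makes the per-partition ranking below valid. A quick sanity check is to look at the keys held by each partition:

// each inner array is one partition; keys never decrease from one partition to the next,
// e.g. Array(Array(5, 5), Array(6), Array(10)) — the exact split depends on the number of partitions
sorted.glom().map(_.map(_._1)).collect()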
Next we have to compute ranks per partition:
def rank(iter: Iterator[(Int,String)]) = {
  val zero = List((-1L, Integer.MIN_VALUE, "", 1L))
  def f(acc: List[(Long,Int,String,Long)], x: (Int, String)) =
    (acc.head, x) match {
      case (
        (prevRank: Long, prevValue: Int, _, offset: Long),
        (currValue: Int, label: String)) => {
          val newRank = if (prevValue == currValue) prevRank else prevRank + offset
          val newOffset = if (prevValue == currValue) offset + 1L else 1L
          (newRank, currValue, label, newOffset) :: acc
        }
    }
  iter.foldLeft(zero)(f).reverse.drop(1).map{ case (rank, _, label, _) =>
    (rank, label) }.toIterator
}
val partRanks = sorted.mapPartitions(rank)
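Because rank only needs a plain iterator, it can be sanity-checked locally; note that the ranks it emits are zero-based within a partition and still need the offset and +1 adjustments applied below:

// one sorted partition's worth of data: ties share a rank, ranks start at 0
// returns List((0,a), (0,c), (2,d))
rank(Iterator((5, "a"), (5, "c"), (6, "d"))).toList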
Then we compute an offset for each partition:
def getOffsets(sorted: RDD[(Int, String)]) = sorted
  .mapPartitionsWithIndex((i: Int, iter: Iterator[(Int, String)]) =>
    Iterator((i, iter.size)))
  .collect
  .foldLeft(List((-1, 0)))((acc: List[(Int, Int)], x: (Int, Int)) =>
    (x._1, x._2 + acc.head._2) :: acc)
  .toMap
val offsets = sc.broadcast(getOffsets(sorted))
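To make the bookkeeping concrete: with, for example, two partitions holding 3 and 1 rows, getOffsets returns Map(-1 -> 0, 0 -> 3, 1 -> 4), so partition i finds the number of rows in all earlier partitions under key i - 1:

// hypothetical partition sizes 3 and 1
offsets.value        // Map(-1 -> 0, 0 -> 3, 1 -> 4)
offsets.value(-1)    // 0 rows before partition 0
offsets.value(0)     // 3 rows before partition 1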
and the final ranks:
def adjust(i: Int, iter: Iterator[(Long, String)]) =
  iter.map{ case (rank, label) => (rank + offsets.value(i - 1).toLong, label) }

val ranks = partRanks
  .mapPartitionsWithIndex(adjust)
  .map{ case (i, label) => (1 + i, label) }
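Collecting the result should reproduce the ranking asked for in the question (the relative order of ties such as a and c may vary):

// e.g. Array((1,a), (1,c), (3,d), (4,b))
ranks.collect()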
