Scala - Computing rank of a row

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute the original authors (not me): StackOverflow. Original: http://stackoverflow.com/questions/33729787/


Computing rank of a row

Tags: scala, apache-spark, dataframe, hive, apache-spark-sql

Asked by shailesh gupta

I want to rank user ids based on one field. For equal values of that field the rank should be the same. The data is in a Hive table.

e.g.

user value
a       5
b       10
c       5
d       6

Rank
a - 1
c - 1
d - 3
b - 4

How can I do that?

Answered by zero323

It is possible to use the rank window function (ties share a rank and the next distinct value skips ahead, matching the expected output) either with the DataFrame API:

import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window

// No partitionBy: a single global window over the whole dataset
val w = Window.orderBy($"value")

val df = sc.parallelize(Seq(
  ("a", 5), ("b", 10), ("c", 5), ("d", 6)
)).toDF("user", "value")

df.select($"user", rank.over(w).alias("rank")).show

// +----+----+
// |user|rank|
// +----+----+
// |   a|   1|
// |   c|   1|
// |   d|   3|
// |   b|   4|
// +----+----+

or raw SQL:

df.registerTempTable("df")  // createOrReplaceTempView on Spark 2.x+
sqlContext.sql("SELECT user, RANK() OVER (ORDER BY value) AS rank FROM df").show

// +----+----+
// |user|rank|
// +----+----+
// |   a|   1|
// |   c|   1|
// |   d|   3|
// |   b|   4|
// +----+----+

but it is extremely inefficient: with no PARTITION BY clause, Spark has to move every row to a single partition to compute the global ordering.
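When the use case allows ranking within some key, a partitioned window avoids that bottleneck. A minimal sketch, assuming a hypothetical group column that the sample data does not actually have:

// Sketch only: "group" is a hypothetical key column; with partitionBy
// each group is ranked independently instead of on a single partition
val wPart = Window.partitionBy($"group").orderBy($"value")
df.select($"user", rank.over(wPart).alias("rank"))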

You can also use the RDD API, but it is not exactly straightforward. First let's convert the DataFrame to an RDD of (value, user) pairs:

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.RangePartitioner

// On Spark 2.x+ insert .rdd before .map
val rdd: RDD[(Int, String)] = df.select($"value", $"user")
  .map{ case Row(value: Int, user: String) => (value, user) }

// Range partitioning plus an in-partition sort gives globally sorted data:
// each partition holds a contiguous, non-overlapping range of values
val partitioner = new RangePartitioner(rdd.partitions.size, rdd)
val sorted = rdd.repartitionAndSortWithinPartitions(partitioner)
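On toy data you can sanity-check the layout by glomming each partition into a local collection (don't do this on a real table):

// Each printed collection is one sorted partition; the exact split
// depends on the range partitioner
sorted.glom().map(_.toList).collect().foreach(println)
// e.g. List((5,a), (5,c)) then List((6,d)) then List((10,b))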

Next we have to compute ranks per partition:

// Zero-based ranks within one sorted partition. The accumulator holds
// (rank, value, label, offset), where offset counts the current run of ties.
def rank(iter: Iterator[(Int, String)]) = {
  val zero = List((-1L, Integer.MIN_VALUE, "", 1L))

  def f(acc: List[(Long, Int, String, Long)], x: (Int, String)) =
    (acc.head, x) match {
      case ((prevRank: Long, prevValue: Int, _, offset: Long),
            (currValue: Int, label: String)) =>
        // A tie keeps the previous rank; a new value jumps ahead by the
        // number of rows that shared the previous rank
        val newRank = if (prevValue == currValue) prevRank else prevRank + offset
        val newOffset = if (prevValue == currValue) offset + 1L else 1L
        (newRank, currValue, label, newOffset) :: acc
    }

  iter.foldLeft(zero)(f).reverse.drop(1).map { case (rank, _, label, _) =>
    (rank, label)
  }.toIterator
}


val partRanks = sorted.mapPartitions(rank)
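The per-partition logic is easy to check locally; assuming the first partition holds the two ties and the 6, it yields zero-based ranks with gaps (the +1 shift happens at the very end):

// Local check of the fold, no cluster needed
rank(Iterator((5, "a"), (5, "c"), (6, "d"))).toList
// List((0,a), (0,c), (2,d))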

Next we need the cumulative offset for each partition:

// Cumulative row counts per partition: for partition i, offsets(i - 1) is
// the number of rows in all preceding partitions (the seed maps -1 to 0,
// so partition 0 gets a zero offset)
def getOffsets(sorted: RDD[(Int, String)]) = sorted
  .mapPartitionsWithIndex((i: Int, iter: Iterator[(Int, String)]) =>
    Iterator((i, iter.size)))
  .collect
  .foldLeft(List((-1, 0)))((acc: List[(Int, Int)], x: (Int, Int)) =>
    (x._1, x._2 + acc.head._2) :: acc)
  .toMap

val offsets = sc.broadcast(getOffsets(sorted))
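For example, if the four sorted rows were split into partitions of sizes 2, 1 and 1 (a hypothetical layout; the actual split depends on the range partitioner), the broadcast map would be:

offsets.value
// Map(-1 -> 0, 0 -> 2, 1 -> 3, 2 -> 4)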

and the final ranks:

// Shift each partition's local ranks by the total number of rows in the
// partitions that precede it
def adjust(i: Int, iter: Iterator[(Long, String)]) =
  iter.map{ case (rank, label) => (rank + offsets.value(i - 1).toLong, label) }

val ranks = partRanks
  .mapPartitionsWithIndex(adjust)
  .map{ case (i, label) => (1 + i, label) }  // back to one-based ranks
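Collecting should reproduce the window-function result, at a fraction of the shuffle cost (the order of ties within a partition is not guaranteed):

ranks.collect.foreach(println)
// (1,a)
// (1,c)
// (3,d)
// (4,b)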