scala 如何在 Spark 中找到分组数据的确切中位数
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 
原文地址: http://stackoverflow.com/questions/41431270/
Warning: these are provided under cc-by-sa 4.0 license.  You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to find exact median for grouped data in Spark
提问by Prabu Soundar Rajan
I have a requirement to calculate exact median on grouped data set of Double datatype in Spark using Scala.
我需要使用 Scala 计算 Spark 中 Double 数据类型分组数据集的精确中位数。
It is different from the similar query: Find median in spark SQL for multiple double datatype columns. This question is about the finding data for grouped data, whereas the other one is about finding median on a RDD level.
它与类似的查询不同:在 spark SQL 中为多个双数据类型列查找中值。这个问题是关于分组数据的查找数据,而另一个是关于在 RDD 级别上查找中位数。
Here is my sample data
这是我的示例数据
scala> sqlContext.sql("select * from test").show()
+---+---+
| id|num|
+---+---+
|  A|0.0|
|  A|1.0|
|  A|1.0|
|  A|1.0|
|  A|0.0|
|  A|1.0|
|  B|0.0|
|  B|1.0|
|  B|1.0|
+---+---+
Expected Answer:
预期答案:
+--------+
| Median |
+--------+
|   1    |
|   1    |
+--------+
I tried the following option, but no luck:
我尝试了以下选项,但没有运气:
1) Hive function percentile, it worked only for BigInt.
1) Hive 函数百分位,仅适用于 BigInt。
2) Hive function percentile_approx, but it does not work as expected (returns 0.25 vs 1).
2) Hive 函数percentile_approx,但它没有按预期工作(返回 0.25 vs 1)。
scala> sqlContext.sql("select percentile_approx(num, 0.5) from test group by id").show()
+----+
| _c0|
+----+
|0.25|
|0.25|
+----+
回答by Denny Lee
Simplest Approach (requires Spark 2.0.1+ and not exact median)
最简单的方法(需要 Spark 2.0.1+ 而不是精确的中位数)
As noted in the comments in reference to the first question Find median in Spark SQL for double datatype columns, we can use percentile_approxto calculate median for Spark 2.0.1+.  To apply this for grouped data in Apache Spark, the query would look like:
正如第一个问题在 Spark SQL 中为双数据类型列查找中值的注释中所指出的,我们可以使用percentile_approx来计算 Spark 2.0.1+ 的中值。要将其应用于 Apache Spark 中的分组数据,查询将如下所示:
val df = Seq(("A", 0.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("B", 0.0), ("B", 1.0), ("B", 1.0)).toDF("id", "num")
df.createOrReplaceTempView("df")
spark.sql("select id, percentile_approx(num, 0.5) as median from df group by id order by id").show()
with the output being:
输出为:
+---+------+
| id|median|
+---+------+
|  A|   1.0|
|  B|   1.0|
+---+------+
Saying this, this is an approximate value(as opposed to an exact median per the question).
这么说,这是一个近似值(而不是每个问题的确切中位数)。
Calculate exact median for grouped data
计算分组数据的准确中位数
There are multiple approaches so I'm sure others in SO can provide better or more efficient examples. But here's a code snippet calculate the median for grouped data in Spark (verified in Spark 1.6 and Spark 2.1):
有多种方法,所以我相信 SO 中的其他人可以提供更好或更有效的例子。但这里的代码片段计算 Spark 中分组数据的中值(在 Spark 1.6 和 Spark 2.1 中验证):
import org.apache.spark.SparkContext._
val rdd: RDD[(String, Double)] = sc.parallelize(Seq(("A", 1.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 0.0), ("A", 1.0), ("B", 0.0), ("B", 1.0), ("B", 1.0)))
// Scala median function
def median(inputList: List[Double]): Double = {
  val count = inputList.size
  if (count % 2 == 0) {
    val l = count / 2 - 1
    val r = l + 1
    (inputList(l) + inputList(r)).toDouble / 2
  } else
    inputList(count / 2).toDouble
}
// Sort the values
val setRDD = rdd.groupByKey()
val sortedListRDD = setRDD.mapValues(_.toList.sorted)
// Output DataFrame of id and median
sortedListRDD.map(m => {
  (m._1, median(m._2))
}).toDF("id", "median_of_num").show()
with the output being:
输出为:
+---+-------------+
| id|median_of_num|
+---+-------------+
|  A|          1.0|
|  B|          1.0|
+---+-------------+
There are some caveats that I should call out as this likely isn't the most efficient implementation:
我应该指出一些警告,因为这可能不是最有效的实现:
- It's currently using a 
groupByKeywhich is not very performant. You may want to change this into areduceByKeyinstead (more information at Avoid GroupByKey) - Using a Scala function to calculate the 
median. 
- 它目前使用的
groupByKey性能不是很好。您可能希望将其改为 areduceByKey(在避免 GroupByKey 中获得更多信息) - 使用 Scala 函数计算
median. 
This approach should work okay for smaller amounts of data but if you have millions of rows for each key, would advise utilizing Spark 2.0.1+ and using the percentile_approxapproach.
这种方法对于少量数据应该可以正常工作,但如果每个键有数百万行,建议使用 Spark 2.0.1+ 并使用该percentile_approx方法。
回答by Prabu Soundar Rajan
Here is my version of PERCENTILE_COUNT function in SPARK. This can be used to find the value of median for a grouped data in Dataframe. Hope it may help someone. Feel free to provide your suggestions to improve the solution.
这是我在 SPARK 中的 PERCENTILE_COUNT 函数版本。这可用于在 Dataframe 中查找分组数据的中值。希望它可以帮助某人。随时提供您的建议以改进解决方案。
val PERCENTILEFLOOR = udf((maxrank: Integer, percentile: Double) => scala.math.floor(1 + (percentile * (maxrank - 1))))
  val PERCENTILECEIL = udf((maxrank: Integer, percentile: Double) => scala.math.ceil(1 + (percentile * (maxrank - 1))))
  val PERCENTILECALC = udf((maxrank: Integer, percentile: Double, floorVal: Double, ceilVal: Double, floorNum: Double, ceilNum: Double)
=> {
    if (ceilNum == floorNum) {
      floorVal
    } else {
      val RN = (1 + (percentile * (maxrank - 1)))
      ((ceilNum - RN) * floorVal) + ((RN - floorNum) * ceilVal)
    }   })
/**    * The result of PERCENTILE_CONT is computed by linear interpolation between values after ordering them.    * Using the percentile value (P) and the number of rows (N) in the aggregation group,    * we compute the row number we are interested in after ordering the rows with respect to the sort specification.    * This row number (RN) is computed according to the formula RN = (1+ (P*(N-1)).    * The final result of the aggregate function is computed by linear interpolation between the values from rows at row numbers   
* CRN = CEILING(RN) and FRN = FLOOR(RN).    *    * The final result will be:    *    * If (CRN = FRN = RN) then the result is    * (value of expression from row at RN)    * Otherwise the result is    * (CRN - RN) * (value of expression for row at FRN) +    * (RN - FRN) * (value of expression for row at CRN)    *    * Parameter details    *    * @inputDF - Dataframe for computation    * @medianCol - Column for which percentile to be calculated    * @grouplist - Group list for dataframe before sorting    * @percentile - numeric value between 0 and 1 to express the percentile to be calculated    *    */
  def percentile_count(inputDF: DataFrame, medianCol: String, groupList: List[String], percentile: Double): DataFrame = {
    val orderList = List(medianCol)
    val wSpec3 = Window.partitionBy(groupList.head, groupList.tail: _*).orderBy(orderList.head, orderList.tail: _*)
    //   Group, sort and rank the DF
    val rankedDF = inputDF.withColumn("rank", row_number().over(wSpec3))
    // Find the maximum for each group 
    val groupedMaxDF = rankedDF.groupBy(groupList.head, groupList.tail: _*).agg(max("rank").as("maxval"))
    // CRN calculation
    val ceilNumDF = groupedMaxDF.withColumn("rank", PERCENTILECEIL(groupedMaxDF("maxval"), lit(percentile))).drop("maxval")
    // FRN calculation
    val floorNumDF = groupedMaxDF.withColumn("rank", PERCENTILEFLOOR(groupedMaxDF("maxval"), lit(percentile)))
    val ntileGroup = "rank" :: groupList
    //Get the values for the CRN and FRN 
    val floorDF = floorNumDF.join(rankedDF, ntileGroup).withColumnRenamed("rank", "floorNum").withColumnRenamed(medianCol, "floorVal")
    val ceilDF = ceilNumDF.join(rankedDF, ntileGroup).withColumnRenamed("rank", "ceilNum").withColumnRenamed(medianCol, "ceilVal")
    //Get both the values for CRN and FRN in same row
    val resultDF = floorDF.join(ceilDF, groupList)
    val finalList = "median_" + medianCol :: groupList
    // Calculate the median using the UDF PERCENTILECALC and returns the DF
    resultDF.withColumn("median_" + medianCol, PERCENTILECALC(resultDF("maxval"), lit(percentile), resultDF("floorVal"), resultDF("ceilVal"), resultDF("floorNum"), resultDF("ceilNum"))).select(finalList.head, finalList.tail: _*)
  }
回答by Daniar Achakeev
You can try this solution for exact median. I described spark sql solution here gist.github. To compute exact median I use row_number() and count() functions in conjuction with a window function.
您可以尝试使用此解决方案以获得精确的中位数。我在gist.github 中描述了 spark sql 解决方案。为了计算精确的中位数,我将 row_number() 和 count() 函数与窗口函数结合使用。
val data1 = Array( ("a", 0), ("a", 1), ("a", 1), ("a", 1), ("a", 0), ("a", 1))
val data2 = Array( ("b", 0), ("b", 1), ("b", 1))
val union = data1.union(data2)
val df = sc.parallelize(union).toDF("key", "val")
df.cache.createOrReplaceTempView("kvTable")
spark.sql("SET spark.sql.shuffle.partitions=2")
var ds = spark.sql("""
   SELECT key, avg(val) as median
FROM ( SELECT key, val, rN, (CASE WHEN cN % 2 = 0 then (cN DIV 2) ELSE (cN DIV 2) + 1 end) as m1, (cN DIV 2) + 1 as m2 
        FROM ( 
            SELECT key, val, row_number() OVER (PARTITION BY key ORDER BY val ) as rN, count(val) OVER (PARTITION BY key ) as cN
            FROM kvTable
         ) s
    ) r
WHERE rN BETWEEN m1 and m2
GROUP BY key 
""")
Spark executes and optimizes this query efficiently, since it reuses data partitioning.
Spark 可以高效地执行和优化此查询,因为它重用了数据分区。
scala> ds.show
+---+------+
|key|median|
+---+------+
|  a|   1.0|
|  b|   1.0|
+---+------+
回答by Rockie Yang
with high order function element_atadded in Spark 2.4. We could use with Window function, or groupBy then join back.
element_at在 Spark 2.4 中添加了高阶函数。我们可以使用 Window 函数,或者 groupBy 然后加入。
Sample Data
样本数据
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
case class Salary(depName: String, empNo: Long, salary: Long)
val empsalary = Seq(
  Salary("sales", 1, 5000),
  Salary("personnel", 2, 3900),
  Salary("sales", 3, 4800),
  Salary("sales", 4, 4800),
  Salary("personnel", 5, 3500),
  Salary("develop", 7, 4200),
  Salary("develop", 8, 6000),
  Salary("develop", 9, 4500),
  Salary("develop", 10, 5200),
  Salary("develop", 11, 5200)).toDS
with Window function
带窗口功能
val byDepName = Window.partitionBy('depName).orderBy('salary)
val df = empsalary.withColumn(
  "salaries", collect_list('salary) over byDepName).withColumn(
  "median_salary", element_at('salaries, (size('salaries)/2 + 1).cast("int")))
df.show(false)
with groupBy then join back
与 groupBy 然后加入
val dfMedian = empsalary.groupBy("depName").agg(
  sort_array(collect_list('salary)).as("salaries")).select(
  'depName, 
  element_at('salaries, (size('salaries)/2 + 1).cast("int")).as("median_salary"))
empsalary.join(dfMedian, "depName").show(false)
回答by Elior Malul
If you don't wanna use spark-sql (as I do) you can use the cume_distfunction.
如果您不想使用 spark-sql(就像我一样),您可以使用该cume_dist函数。
See example below:
请参阅下面的示例:
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
val df = (1 to 10).toSeq.toDF
val win = Window.
    partitionBy(F.col("value")).
    orderBy(F.col("value")).
    rangeBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("c", F.cume_dist().over(win)).show
Results:
结果:
+-----+---+
|value|  c|
+-----+---+
|    1|0.1|
|    2|0.2|
|    3|0.3|
|    4|0.4|
|    5|0.5|
|    6|0.6|
|    7|0.7|
|    8|0.8|
|    9|0.9|
|   10|1.0|
+-----+---+
The median is the value for which df("c")equals 0.5.
I hope it helps, Elior.
中位数是df("c")等于 0.5的值。我希望它有所帮助,埃利奥。
回答by Shane318
Just to add to Elior's answer and responding to Erkan, the reason the output is 1.0 for each column is that the partitionBy(F.col("value")) partitions the data as a single row per partition such that when the window calculates cume_distit does it for a single value and results with 1.0. 
只是为了添加 Elior 的答案并响应 Erkan,每列输出为 1.0 的原因是 partitionBy(F.col("value")) 将数据分区为每个分区的单行,以便当窗口计算cume_dist它时为单个值执行此操作,结果为 1.0。
Removing the partitionBy(F.col("value")) from the window operation results in the expected quantiles.
从窗口操作中删除 partitionBy(F.col("value")) 会产生预期的分位数。
Start of Elior's Answer
Elior 的回答开头
If you don't wanna use spark-sql (as I do) you can use the cume_distfunction.
See example below:
如果您不想使用 spark-sql(就像我一样),您可以使用该cume_dist函数。请参阅下面的示例:
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
val df = (1 to 10).toSeq.toDF
val win = Window.
    partitionBy(F.col("value")).    //Remove this line
    orderBy(F.col("value")).
    rangeBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("c", F.cume_dist().over(win)).show
Results:
结果:
+-----+---+
|value|  c|
+-----+---+
|    1|0.1|
|    2|0.2|
|    3|0.3|
|    4|0.4|
|    5|0.5|
|    6|0.6|
|    7|0.7|
|    8|0.8|
|    9|0.9|
|   10|1.0|
+-----+---+
The median is the value for which df("c")equals 0.5. I hope it helps, Elior.
中位数是df("c")等于 0.5的值。我希望它有所帮助,埃利奥。
End of Elior's Answer
Elior 的回答结束
Window defined without partitionBy:
没有 partitionBy 定义的窗口:
val win = Window.
    orderBy(F.col("value")).
    rangeBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("c", F.cume_dist().over(win)).show

