Spark: Get top N by key (Scala)

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/30164865/

Date: 2020-10-22 07:08:41 · Source: igfitidea

Spark: Get top N by key

Tags: scala, apache-spark

Asked by michael_erasmus

Say I have a PairRDD as such (Obviously much more data in real life, assume millions of records):

val scores = sc.parallelize(Array(
      ("a", 1),  
      ("a", 2), 
      ("a", 3), 
      ("b", 3), 
      ("b", 1), 
      ("a", 4),  
      ("b", 4), 
      ("b", 2)
))

What is the most efficient way to generate an RDD with the top 2 scores per key?

val top2ByKey = ...
res3: Array[(String, Int)] = Array((a,4), (a,3), (b,4), (b,3))

Accepted answer by abalcerek

I think this should be quite efficient:

Edited according to OP comments:

// Seed each value as a pair (p, p), then merge pairs by keeping
// the two largest distinct values seen so far for each key.
scores.mapValues(p => (p, p)).reduceByKey((u, v) => {
  val values = List(u._1, u._2, v._1, v._2).sorted(Ordering[Int].reverse).distinct
  if (values.size > 1) (values(0), values(1))
  else (values(0), values(0))
}).collect().foreach(println)
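The merge function here is associative and commutative, which is what reduceByKey requires, so it can be sanity-checked locally before running on a cluster. A minimal sketch, with no Spark context, simulating reduceByKey with groupBy and reduce on a plain Scala collection:

```scala
// Local check of the reduceByKey logic above, without a Spark context.
val scores = Seq(
  ("a", 1), ("a", 2), ("a", 3), ("b", 3),
  ("b", 1), ("a", 4), ("b", 4), ("b", 2)
)

val top2 = scores
  .map { case (k, v) => k -> (v, v) }   // mapValues(p => (p, p))
  .groupBy(_._1)                        // stand-in for the shuffle
  .map { case (k, pairs) =>
    k -> pairs.map(_._2).reduce { (u, v) =>
      // Keep the two largest distinct values seen so far.
      val values = List(u._1, u._2, v._1, v._2).sorted(Ordering[Int].reverse).distinct
      if (values.size > 1) (values(0), values(1)) else (values(0), values(0))
    }
  }
// top2: Map(a -> (4,3), b -> (4,3))
```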

Answered by jbochi

Since version 1.4, there is a built-in way to do this using MLlib: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala

import org.apache.spark.mllib.rdd.MLPairRDDFunctions.fromPairRDD
scores.topByKey(2)
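topByKey(2) returns an RDD[(K, Array[V])], one entry per key with the values sorted in descending order. As a rough illustration of the semantics it computes (plain Scala, no Spark; the helper name is made up for this sketch):

```scala
// Plain-Scala sketch of the semantics of topByKey(n):
// group by key, sort each key's values in descending order, keep the first n.
def topByKeyLocal[K, V](pairs: Seq[(K, V)], n: Int)(implicit ord: Ordering[V]): Map[K, List[V]] =
  pairs.groupBy(_._1).map { case (k, kvs) =>
    k -> kvs.map(_._2).sorted(ord.reverse).take(n).toList
  }

val scores = Seq(
  ("a", 1), ("a", 2), ("a", 3), ("b", 3),
  ("b", 1), ("a", 4), ("b", 4), ("b", 2)
)
val top2 = topByKeyLocal(scores, 2)
// top2: Map(a -> List(4, 3), b -> List(4, 3))
```

On the real RDD, `scores.topByKey(2).flatMap { case (k, vs) => vs.map(k -> _) }` would flatten the result back into (key, score) pairs.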

Answered by Ramin Orujov

Slightly modified your input data.

val scores = sc.parallelize(Array(
      ("a", 1),  
      ("a", 2), 
      ("a", 3), 
      ("b", 3), 
      ("b", 1), 
      ("a", 4),  
      ("b", 4), 
      ("b", 2),
      ("a", 6),
      ("b", 8)
    ))

I will explain how to do it step by step:

1. Group by key to create an array

scores.groupByKey().foreach(println)  

Result:

(b,CompactBuffer(3, 1, 4, 2, 8))
(a,CompactBuffer(1, 2, 3, 4, 6))

As you can see, each value is itself an array of numbers; a CompactBuffer is just an optimised array.

2. For each key, sort its list of values in descending order

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse)} ).foreach(println)

Result:

(b,List(8, 4, 3, 2, 1))
(a,List(6, 4, 3, 2, 1))

3. Keep only the first 2 elements from step 2; these are the top 2 scores for each key

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)} ).foreach(println)

Result:

(a,List(6, 4))
(b,List(8, 4))

4. Flat-map to create a new pair RDD of (key, top score) entries

scores.groupByKey()
  .map { case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2) }
  .flatMap { case (k, numbers) => numbers.map(k -> _) }
  .foreach(println)

Result:

(b,8)
(b,4)
(a,6)
(a,4)

5. Optional: sort by key if you want

scores.groupByKey()
  .map { case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2) }
  .flatMap { case (k, numbers) => numbers.map(k -> _) }
  .sortByKey(false)
  .foreach(println)

Result:

(a,6)
(a,4)
(b,8)
(b,4)
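One caveat with the pipeline above: groupByKey materializes every value of a key before take(2) runs, which can be costly with millions of records. A bounded accumulator that never holds more than N values per key, of the kind one would pass to aggregateByKey, avoids this. A local sketch under that assumption (the names addValue and mergeAccs are illustrative; the foldLeft simulates Spark applying the sequence operation):

```scala
// Bounded top-N accumulation: keep at most n values per key at all times.
val n = 2

// Fold one new value into an accumulator (Spark's seqOp).
def addValue(acc: List[Int], v: Int): List[Int] =
  (v :: acc).sorted(Ordering[Int].reverse).take(n)

// Merge two partial accumulators (Spark's combOp).
def mergeAccs(a: List[Int], b: List[Int]): List[Int] =
  (a ++ b).sorted(Ordering[Int].reverse).take(n)

val scores = Seq(
  ("a", 1), ("a", 2), ("a", 3), ("b", 3), ("b", 1),
  ("a", 4), ("b", 4), ("b", 2), ("a", 6), ("b", 8)
)

// Local simulation of scores.aggregateByKey(List.empty[Int])(addValue, mergeAccs)
val topN = scores.groupBy(_._1).map { case (k, kvs) =>
  k -> kvs.map(_._2).foldLeft(List.empty[Int])(addValue)
}
// topN: Map(a -> List(6, 4), b -> List(8, 4))
```

On the RDD itself this would correspond to scores.aggregateByKey(List.empty[Int])(addValue, mergeAccs), followed by the same flatMap as in step 4.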

Hopefully this explanation helps clarify the logic.

Answered by Ning Guo

// Sums all scores per key, then takes the two keys with the highest totals.
// Note: this yields the global top 2 (key, total) pairs, not the top N values per key.
scores.reduceByKey(_ + _)
  .map { case (k, total) => total -> k }
  .sortByKey(false)
  .map { case (total, k) => k -> total }
  .take(2)
  .foreach(println)