scala Spark: Average of values instead of sum in reduceByKey using Scala
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/40087483/
Spark: Average of values instead of sum in reduceByKey using Scala
Asked by finman
When reduceByKey is called, it sums all values with the same key. Is there any way to calculate the average of the values for each key?
// I calculate the sum like this and don't know how to calculate the avg
reduceByKey((x, y) => x + y).collect
Array(((Type1,1),4.0), ((Type1,1),9.2), ((Type1,2),8), ((Type1,2),4.5), ((Type1,3),3.5),
((Type1,3),5.0), ((Type2,1),4.6), ((Type2,1),4), ((Type2,1),10), ((Type2,1),4.3))
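For context, here is a minimal sketch of how the data above might be constructed (the names sc and rdd are assumptions, not from the question):

val rdd = sc.parallelize(Seq(
  (("Type1", 1), 4.0), (("Type1", 1), 9.2),
  (("Type1", 2), 8.0), (("Type1", 2), 4.5),
  (("Type1", 3), 3.5), (("Type1", 3), 5.0),
  (("Type2", 1), 4.6), (("Type2", 1), 4.0),
  (("Type2", 1), 10.0), (("Type2", 1), 4.3)
))
// rdd.reduceByKey(_ + _) sums per key, e.g. ((Type2,1),22.9);
// the goal is the average instead, e.g. ((Type2,1),5.725).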
Answered by sina
One way is to use mapValues and reduceByKey, which is easier than aggregateByKey.
rdd // the keyed RDD from the question, e.g. RDD[((String, Int), Double)]
  .mapValues(value => (value, 1)) // pair each value with a count of 1
  .reduceByKey {
    // sum the values and the counts per key
    case ((sumL, countL), (sumR, countR)) =>
      (sumL + sumR, countL + countR)
  }
  .mapValues {
    // divide the sum by the count to get the average
    case (sum, count) => sum / count
  }
  .collect
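With the sample data from the question (and the hypothetical rdd sketched above), collect would return something like Array(((Type1,1),6.6), ((Type1,2),6.25), ((Type1,3),4.25), ((Type2,1),5.725)); the ordering of the pairs is not guaranteed.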
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
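The Learning Spark chapter linked above covers the same pattern with combineByKey; a rough sketch of that variant (assuming the same rdd as above, not verbatim from the book):

val averages = rdd.combineByKey(
  (v: Double) => (v, 1),                                             // createCombiner: first value seen for a key
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),       // mergeValue: fold another value into the accumulator
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge per-partition accumulators
).mapValues { case (sum, count) => sum / count }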
Answered by kmh
There are lots of ways... but a simple one is to use a class that keeps track of your total and count and computes the average at the end. Something like this would work.
class AvgCollector(val tot: Double, val cnt: Int = 1) {
  def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt) // merge two partial results
  def avg = tot / cnt
}
val rdd2 = {
  rdd
    .map { case (k, v) => (k, new AvgCollector(v)) } // wrap each value (cnt defaults to 1)
    .reduceByKey(_ combine _)
    .map { case (k, v) => (k, v.avg) }
}
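Because combine is associative and commutative, reduceByKey can apply it map-side before the shuffle, so only one running (tot, cnt) pair per key and partition crosses the network rather than every raw value.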
... or you could use aggregateByKey with a tweak to the class
class AvgCollector(val tot: Double, val cnt: Int = 1) {
  def ++(v: Double) = new AvgCollector(tot + v, cnt + 1) // fold in a single value
  def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt) // merge two partial results
  def avg = if (cnt > 0) tot / cnt else 0.0 // guard: the zero value starts with cnt = 0
}
val rdd2 = {
  rdd
    .aggregateByKey(new AvgCollector(0.0, 0))(_ ++ _, _ combine _)
    .map { case (k, v) => (k, v.avg) }
}
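Note that the zero value new AvgCollector(0.0, 0) must carry a count of 0: aggregateByKey folds the zero in once per partition, so a default count of 1 there would inflate the denominator. That is also why avg guards against cnt == 0.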

