Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.
Original question: http://stackoverflow.com/questions/24071560/
Using reduceByKey in Apache Spark (Scala)
Asked by blue-sky
I have a list of tuples of type (user id, name, count).
For example,
val x = sc.parallelize(List(
("a", "b", 1),
("a", "b", 1),
("c", "b", 1),
("a", "d", 1))
)
I'm attempting to reduce this collection to one where each name is counted per user id.
So the val x above is converted to:
(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))
Here is the code I am currently using:
val byKey = x.map({case (id,uri,count) => (id,uri)->count})                // key by the (id, name) pair
val grouped = byKey.groupByKey                                             // group the 1s per (id, name)
val count = grouped.map{case ((id,uri),counts) => (id,(uri,counts.sum))}   // sum per (id, name), re-key by id
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey
grouped2.foreach(println)
I'm attempting to use reduceByKey as it performs faster than groupByKey.
How can reduceByKey be used instead of the above code to provide the same mapping?
Answered by maasg
Following your code:
val byKey = x.map({case (id,uri,count) => (id,uri)->count})
You could do:
val reducedByKey = byKey.reduceByKey(_ + _)
scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)
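If you then want the (id, list of (name, count)) shape from the question, one way (a sketch building on reducedByKey above) is to re-key by id and group:

val regrouped = reducedByKey
  .map { case ((id, uri), total) => (id, (uri, total)) } // re-key by id
  .groupByKey()                                          // collect (name, total) pairs per id
regrouped.foreach(println) // e.g. (a,ArrayBuffer((d,1), (b,2))) -- the buffer type varies by Spark version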
PairRDDFunctions[K,V].reduceByKey takes an associative reduce function that can be applied to the type V of the RDD[(K,V)]. In other words, you need a function f[V](e1: V, e2: V): V. In this particular case, with sum on Ints, that is (x: Int, y: Int) => x + y, or _ + _ in short underscore notation.
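For illustration, the same reduce can be written with a named function instead of the underscore notation (sumCounts is just an illustrative name, not part of the API):

def sumCounts(e1: Int, e2: Int): Int = e1 + e2
val reducedExplicit = byKey.reduceByKey(sumCounts) // same result as byKey.reduceByKey(_ + _)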
For the record: reduceByKey performs better than groupByKey because it attempts to apply the reduce function locally before the shuffle/reduce phase, whereas groupByKey will force a shuffle of all elements before grouping.
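As a quick sketch of that difference using the byKey RDD above, both pipelines below compute the same per-(id, name) totals, but only the second can pre-aggregate before the shuffle:

val viaGroup = byKey.groupByKey.mapValues(_.sum) // shuffles every individual count, then sums
val viaReduce = byKey.reduceByKey(_ + _)         // combines partial sums within each partition first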
Answered by cloud
Your original data structure is RDD[(String, String, Int)], and reduceByKey can only be used if the data structure is RDD[(K, V)].
val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
val reduced = kv.reduceByKey(_ + _) // reduced is RDD[((String, String), Int)]
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
val grouped = kv2.groupByKey() // grouped is RDD[(String, Iterable[(String, Int)])]
grouped.foreach(println)
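For readability, the same four steps can be chained into a single expression (a sketch, assuming x is the RDD from the question):

val result = x
  .map { case (id, name, count) => ((id, name), count) }   // RDD[((String, String), Int)]
  .reduceByKey(_ + _)                                      // sum counts per (id, name)
  .map { case ((id, name), total) => (id, (name, total)) } // RDD[(String, (String, Int))]
  .groupByKey()                                            // RDD[(String, Iterable[(String, Int)])]
result.foreach(println)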
Answered by napster
The syntax is below:
reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V],
which says that, for the same key in an RDD, it takes the values (which will definitely be of the same type), performs the operation provided as part of the function, and returns a value of the same type as the parent RDD.
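(That is the Java API signature; in the Scala API the equivalent is reduceByKey(func: (V, V) => V): RDD[(K, V)].) A minimal self-contained sketch, assuming a local Spark setup and illustrative names:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("reduceByKeyDemo"))
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// func: (V, V) => V -- two values of the same type in, one value of that type out:
val totals = pairs.reduceByKey((v1, v2) => v1 + v2)
totals.collect.foreach(println) // prints (a,4) and (b,2), in no particular order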

