scala reduceByKey: How does it work internally?
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use it, you must likewise follow the CC BY-SA license, cite the original source and author information, and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/30145329/
reduceByKey: How does it work internally?
Asked by user764186
I am new to Spark and Scala. I am confused about the way the reduceByKey function works in Spark. Suppose we have the following code:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
The map function is clear: s is the key and it points to the line from data.txt, and 1 is the value.
However, I don't get how reduceByKey works internally. Does "a" point to the key? Or does "a" point to "s"? Then what does a + b represent? How are they filled?
Answered by Justin Pihony
Let's break it down into discrete methods and types. That usually exposes the intricacies for new devs:
pairs.reduceByKey((a, b) => a + b)
becomes
pairs.reduceByKey((a: Int, b: Int) => a + b)
and renaming the variables makes it a little more explicit
pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)
So, we can now see that we are simply taking an accumulated value for the given key and summing it with the next value of that key. NOW, let's break it down further so we can understand the key part. So, let's visualize the method more like this:
pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
  // Turn the accumulated value into a true key->value mapping
  val accumAsMap = accumulatedValue.toMap
  // Try to get the key's current value if we've already encountered it
  accumAsMap.get(currentValue._1) match {
    // If we have encountered it, then add the new value to the existing value and overwrite the old
    case Some(value: Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
    // If we have NOT encountered it, then simply add it to the list
    case None => currentValue :: accumulatedValue
  }
})
So, you can see that reduceByKey takes care of the boilerplate of finding the key and tracking it, so that you don't have to worry about managing that part.
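To make that per-key accumulation concrete, here is a small non-Spark sketch using plain Scala collections (the sample data is made up for illustration); groupBy plays the role of "finding the key" and reduce plays the role of the (a, b) => a + b function:
// Made-up sample pairs, as produced by lines.map(s => (s, 1))
val samplePairs = List(("spark", 1), ("scala", 1), ("spark", 1), ("spark", 1))
val sampleCounts = samplePairs
  .groupBy { case (key, _) => key }          // gather all tuples for each key
  .map { case (key, kvs) =>
    // a is the accumulated value so far, b is the next value for this key
    key -> kvs.map(_._2).reduce((a, b) => a + b)
  }
// sampleCounts: Map(spark -> 3, scala -> 1)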
Deeper, truer if you want
All that being said, that is a simplified version of what happens, as there are some optimizations done here. This operation is associative, so the Spark engine will perform these reductions locally first (often termed map-side reduce) and then once again at the driver. This saves network traffic; instead of sending all the data and performing the operation, it can reduce it as small as it can and then send that reduction over the wire.
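As a rough sketch of why that matters (assuming an existing SparkContext named sc, as in the question), both of the following compute the same counts, but reduceByKey combines values inside each partition before the shuffle, while groupByKey ships every (key, 1) pair across the network first:
val linePairs = sc.textFile("data.txt").map(s => (s, 1))
val viaReduce = linePairs.reduceByKey(_ + _)              // map-side combine, less shuffle traffic
val viaGroup  = linePairs.groupByKey().mapValues(_.sum)   // no map-side combine, more shuffle traffic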
Answered by maasg
One requirement for the reduceByKey function is that it must be associative. To build some intuition on how reduceByKey works, let's first see how an associative function helps us in a parallel computation:


As we can see, we can break an original collection into pieces and, by applying the associative function, accumulate a total. The sequential case is trivial, we are used to it: 1+2+3+4+5+6+7+8+9+10.
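A quick plain-Scala sketch of that idea (no Spark involved): because + is associative, reducing the whole sequence left to right and reducing two halves independently and then combining them give the same answer:
val numbers = (1 to 10).toList
val sequential = numbers.reduce(_ + _)                     // ((((1 + 2) + 3) + ...) + 10) = 55
val (left, right) = numbers.splitAt(5)
val combined = left.reduce(_ + _) + right.reduce(_ + _)    // (1+...+5) + (6+...+10) = 55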
Associativity lets us use that same function in sequence and in parallel. reduceByKey uses that property to compute a result out of an RDD, which is a distributed collection consisting of partitions.
Consider the following example:
// collection of the form ("key",1),("key",2),...,("key",20) split among 4 partitions
val rdd = sparkContext.parallelize((1 to 20).map(x => ("key", x)), 4)
rdd.reduceByKey(_ + _).collect()
> Array[(String, Int)] = Array((key,210))
In Spark, data is distributed into partitions. For the next illustration, the (4) partitions are to the left, enclosed in thin lines. First, we apply the function locally to each partition, sequentially within the partition, but we run all 4 partitions in parallel. Then, the results of each local computation are aggregated by applying the same function again, finally arriving at a result.
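The same two-step process can be simulated with plain Scala collections (the partition boundaries below are made up; Spark decides them itself):
// the 20 ("key", x) pairs split into 4 "partitions" of 5 elements each
val partitions = (1 to 20).map(x => ("key", x)).grouped(5).toList
// step 1: reduce locally inside each partition (Spark runs these in parallel)
val perPartition = partitions.map(p => p.map(_._2).reduce(_ + _))   // List(15, 40, 65, 90)
// step 2: apply the same function again across the partial results
val total = perPartition.reduce(_ + _)                               // 210, matching Array((key,210))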


reduceByKey is a specialization of aggregateByKey. aggregateByKey takes 2 functions: one that is applied to each partition (sequentially) and one that is applied among the results of each partition (in parallel). reduceByKey uses the same associative function in both cases: to do a sequential computation on each partition and then combine those results into a final result, as we have illustrated here.
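As a sketch of that relationship, using the rdd from the example above, the following two expressions produce the same result; the aggregateByKey form just spells out the per-partition function and the cross-partition function separately:
val summedByReduce = rdd.reduceByKey(_ + _)
val summedByAggregate = rdd.aggregateByKey(0)(
  (acc, value) => acc + value,   // applied sequentially within each partition
  (acc1, acc2) => acc1 + acc2    // applied across the per-partition results
)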
Answered by Arne Claassen
In your example of
val counts = pairs.reduceByKey((a,b) => a+b)
a and b are both Int accumulators for the _2 of the tuples in pairs. reduceByKey will take two tuples with the same value s and use their _2 values as a and b, producing a new Tuple[String, Int]. This operation is repeated until there is only one tuple for each key s.
Unlike a non-Spark (or, really, non-parallel) reduceByKey where the first element is always the accumulator and the second a value, reduceByKey operates in a distributed fashion, i.e. each node will reduce its set of tuples into a collection of uniquely-keyed tuples and then reduce the tuples from multiple nodes until there is a final uniquely-keyed set of tuples. This means that as the results from nodes are reduced, a and b represent already-reduced accumulators.
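A minimal sketch of that idea in plain Scala (the "node" boundaries are invented for illustration): by the time the per-node results meet, both arguments of the function are already partial sums rather than raw 1s:
// tuples for the same key "s", as if they lived on two different nodes
val node1 = List(("s", 1), ("s", 1), ("s", 1))
val node2 = List(("s", 1), ("s", 1))
// each node reduces its own tuples first: here a and b are the raw 1s
val partial1 = node1.map(_._2).reduce((a, b) => a + b)        // 3
val partial2 = node2.map(_._2).reduce((a, b) => a + b)        // 2
// when the nodes' results are combined, a and b are already-reduced accumulators
val nodeTotal = List(partial1, partial2).reduce((a, b) => a + b)  // 5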
Answered by KayV
The Spark RDD reduceByKey function merges the values for each key using an associative reduce function.
The reduceByKey function works only on RDDs, and it is a transformation operation, which means it is lazily evaluated. An associative function is passed as a parameter, which is applied to the source RDD and creates a new RDD as a result.
So in your example, the RDD pairs has a set of multiple paired elements like (s1, 1), (s2, 1), etc. reduceByKey accepts a function (accumulator, n) => (accumulator + n), which starts the accumulator at the default value 0, adds up the elements for each key, and returns the resulting RDD counts with the total count paired with each key.
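Putting that together as a runnable sketch (assumes a SparkContext named sc; the sample lines stand in for data.txt, whose contents aren't shown in the question):
val lines  = sc.parallelize(Seq("to be", "or not", "to be"))  // stand-in for sc.textFile("data.txt")
val pairs  = lines.map(s => (s, 1))                           // ("to be",1), ("or not",1), ("to be",1)
val counts = pairs.reduceByKey((a, b) => a + b)               // sums the 1s for each distinct line
counts.collect()                                              // Array((or not,1), (to be,2)) in some order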

