Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must also follow CC BY-SA, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/26761087/
Explanation of the aggregate Scala function
Question by lantis
I still don't understand the aggregate function.
For example, given:
val x = List(1,2,3,4,5,6)
val y = x.par.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x,y) => (x._1 + y._1, x._2 + y._2))
The result will be: (21,6)
Well, I think that (x,y) => (x._1 + y._1, x._2 + y._2) is for getting the result in parallel; for example, it will be (1 + 2, 1 + 1) and so on.
But exactly this part that leaves me confused:
(x, y) => (x._1 + y, x._2 + 1)
Why x._1 + y? And is x._2 0 here?
Thanks in advance.
Answer by Rashmit Rathod
First of all, thanks to Diego's reply, which helped me connect the dots in understanding the aggregate() function.
I have to confess that I couldn't sleep properly last night because I couldn't figure out how aggregate() works internally. I'll definitely get a good sleep tonight :-)
Let's start understanding it:
val result = List(1,2,3,4,5,6,7,8,9,10).par.aggregate((0, 0))
(
(x, y) => (x._1 + y, x._2 + 1),
(x,y) =>(x._1 + y._1, x._2 + y._2)
)
result: (Int, Int) = (55,10)
The aggregate function has 3 parts:
- initial value of the accumulators: the tuple (0,0) here
- seqop: works like foldLeft, starting from that initial value (0,0)
- combop: combines the results generated through parallelization (this part was difficult for me to understand)
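A minimal sketch of the three parts as named, explicitly typed Scala values. The names zero, seqop and combop are only illustrative; the API does not require them. A single sequential pass needs only zero and seqop, which is why it behaves like foldLeft:

```scala
object AggregateParts {
  // Part 1: initial accumulator, (running sum, element count)
  val zero: (Int, Int) = (0, 0)

  // Part 2: seqop folds ONE element into an accumulator
  val seqop: ((Int, Int), Int) => (Int, Int) =
    (acc, elem) => (acc._1 + elem, acc._2 + 1)

  // Part 3: combop merges two accumulators produced by different partitions
  val combop: ((Int, Int), (Int, Int)) => (Int, Int) =
    (a, b) => (a._1 + b._1, a._2 + b._2)

  val numbers: List[Int] = (1 to 10).toList

  // A single sequential pass uses only zero and seqop, exactly like foldLeft
  val sequential: (Int, Int) = numbers.foldLeft(zero)(seqop)

  def main(args: Array[String]): Unit =
    println(sequential) // (55,10)
}
```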
Let's look at each of the 3 parts separately:
Part 1: initial tuple (0,0)
aggregate() starts with the initial value of the accumulator x, which is (0,0) here. The first tuple element, x._1, which is initially 0, is used to compute the sum; the second element, x._2, is used to count the elements in the list.
Part 2: (x, y) => (x._1 + y, x._2 + 1)
If you know how foldLeft works in Scala, this part should be easy to understand. The function above works just like foldLeft on our List(1,2,3,4...10).
Iteration# (x._1 + y, x._2 + 1)
1 (0+1, 0+1)
2 (1+2, 1+1)
3 (3+3, 2+1)
4 (6+4, 3+1)
. ....
. ....
10 (45+10, 9+1)
Thus, after all 10 iterations you'll get the result (55,10). If you understand this part, the rest is very easy. For me, though, it was the hardest part: if all the required computation is already finished, what is the use of the second function, combop? Stay tuned :-)
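The iteration table above can be reproduced with scanLeft, which returns every intermediate accumulator of a left fold instead of only the last one. A minimal sketch, assuming a plain sequential List (no par):

```scala
object SeqopTrace {
  val seqop: ((Int, Int), Int) => (Int, Int) =
    (x, y) => (x._1 + y, x._2 + 1)

  // scanLeft keeps every intermediate accumulator, starting with the seed (0,0)
  val steps: List[(Int, Int)] = (1 to 10).toList.scanLeft((0, 0))(seqop)

  def main(args: Array[String]): Unit =
    // drop(1) skips the seed so that line i shows the state after iteration i
    steps.drop(1).zipWithIndex.foreach { case (acc, i) =>
      println(s"${i + 1} $acc")
    }
}
```

The printed lines go from "1 (1,1)" to "10 (55,10)", matching the table row by row.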
Part 3: (x,y) => (x._1 + y._1, x._2 + y._2)
This 3rd part is combop, which combines the results generated by different threads during parallelization. Remember that we used 'par' in our code to enable parallel computation over the list:
List(1,2,3,4,5,6,7,8,9,10).par.aggregate(....)
Apache Spark uses the same idea: RDD.aggregate performs this kind of parallel computation over an RDD.
Let's assume that our List(1,2,3,4,5,6,7,8,9,10) is being processed by 3 threads in parallel. Each thread works on part of the list, and then aggregate() uses combop to combine the results of the threads' computations, using the code below:
(x,y) =>(x._1 + y._1, x._2 + y._2)
Original list: List(1,2,3,4,5,6,7,8,9,10)
Say Thread1 computes the partial list (1,2,3,4), Thread2 computes (5,6,7,8), and Thread3 computes (9,10).
At the end of the computation, Thread1's result will be (10,4), Thread2's result will be (26,4), and Thread3's result will be (19,2).
At the end of the parallel computation, we'll have ((10,4),(26,4),(19,2)).
Iteration# (x._1 + y._1, x._2 + y._2)
1 (0+10, 0+4)
2 (10+26, 4+4)
3 (36+19, 8+2)
which is (55,10).
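The three-thread scenario can be simulated without real threads: fold each chunk with seqop, then merge the partial results with combop. In this sketch the split into (1,2,3,4), (5,6,7,8) and (9,10) is fixed by hand to match the example; the real parallel collection chooses its own splits at runtime, and the names chunks and partials are just for illustration.

```scala
object PartitionedAggregate {
  val seqop: ((Int, Int), Int) => (Int, Int) = (x, y) => (x._1 + y, x._2 + 1)
  val combop: ((Int, Int), (Int, Int)) => (Int, Int) = (x, y) => (x._1 + y._1, x._2 + y._2)

  // Hand-picked chunks standing in for the work of Thread1, Thread2 and Thread3
  val chunks: List[List[Int]] = List(List(1, 2, 3, 4), List(5, 6, 7, 8), List(9, 10))

  // Each "thread" folds its own chunk starting from the seed (0,0)
  val partials: List[(Int, Int)] = chunks.map(_.foldLeft((0, 0))(seqop))

  // combop merges the partial results, as in the (x._1 + y._1, x._2 + y._2) table
  val combined: (Int, Int) = partials.foldLeft((0, 0))(combop)

  def main(args: Array[String]): Unit = {
    println(partials) // List((10,4), (26,4), (19,2))
    println(combined) // (55,10)
  }
}
```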
Finally, let me reiterate that seqop's job is to compute the sum of the list elements and the number of elements (within each thread's part of the list), whereas combop's job is to combine the different partial results generated during parallelization.
I hope the above explanation helps you understand aggregate().
Answer by Nate
From the documentation:
def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B
Aggregates the results of applying an operator to subsequent elements.
This is a more general form of fold and reduce. It has similar semantics, but does not require the result to be a supertype of the element type. It traverses the elements in different partitions sequentially, using seqop to update the result, and then applies combop to results from different partitions. The implementation of this operation may operate on an arbitrary number of collection partitions, so combop may be invoked an arbitrary number of times.
For example, one might want to process some elements and then produce a Set. In this case, seqop would process an element and append it to the list, while combop would concatenate two lists from different partitions together. The initial value z would be an empty set.
pc.aggregate(Set[Int]())(_ += process(_), _ ++ _)
Another example is calculating geometric mean from a collection of doubles (one would typically require big doubles for this).
B: the type of accumulated results
z: the initial value for the accumulated result of the partition - this will typically be the neutral element for the seqop operator (e.g. Nil for list concatenation or 0 for summation) and may be evaluated more than once
seqop: an operator used to accumulate results within a partition
combop: an associative operator used to combine results from different partitions
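The Set example from the quoted documentation can be sketched with an immutable Set. With an immutable Set, seqop has to return the updated set, so this sketch writes it as (acc, x) => acc + process(x) and uses ++ as combop. Here process is a hypothetical stand-in that just doubles its input, and the two partitions are hand-picked to simulate what aggregate does; neither detail comes from the documentation.

```scala
object SetAggregate {
  // Hypothetical element processing step (not from the docs): doubles the input
  def process(n: Int): Int = n * 2

  // seqop adds one processed element to the partition's set
  val seqop: (Set[Int], Int) => Set[Int] = (acc, x) => acc + process(x)

  // combop unions the sets built by two partitions
  val combop: (Set[Int], Set[Int]) => Set[Int] = _ ++ _

  // Two hand-picked partitions, each starting from the neutral element Set.empty
  val partitions: List[List[Int]] = List(List(1, 2, 3), List(3, 4))
  val result: Set[Int] =
    partitions.map(_.foldLeft(Set.empty[Int])(seqop)).reduce(combop)

  def main(args: Array[String]): Unit =
    println(result.toList.sorted) // List(2, 4, 6, 8)
}
```

Because the empty set is neutral for ++, the result does not depend on how the elements were split.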
In your example, B is a Tuple2[Int, Int]. The method seqop then takes a single element from the list, scoped as y, and updates the aggregate B to (x._1 + y, x._2 + 1). So it adds the element to the first element of the tuple and increments the second element. This effectively puts the sum of the elements into the first element of the tuple and the number of elements into the second element.
The method combop then takes the results from each parallel execution thread and combines them. Combining by addition gives the same result as running over the list sequentially.
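The claim that combining by addition matches a sequential run can be checked directly. This sketch simulates two partitions by hand: it splits the question's list at every possible position, folds both halves with seqop, merges them with combop, and compares the result with a single sequential fold.

```scala
object SplitInvariance {
  val seqop: ((Int, Int), Int) => (Int, Int) = (x, y) => (x._1 + y, x._2 + 1)
  val combop: ((Int, Int), (Int, Int)) => (Int, Int) = (x, y) => (x._1 + y._1, x._2 + y._2)

  val xs: List[Int] = List(1, 2, 3, 4, 5, 6)
  val sequential: (Int, Int) = xs.foldLeft((0, 0))(seqop)

  // For every split point, fold each half separately and merge with combop
  val splitResults: List[(Int, Int)] = (0 to xs.length).toList.map { i =>
    val (left, right) = xs.splitAt(i)
    combop(left.foldLeft((0, 0))(seqop), right.foldLeft((0, 0))(seqop))
  }

  def main(args: Array[String]): Unit = {
    println(sequential)                           // (21,6)
    println(splitResults.forall(_ == sequential)) // true
  }
}
```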
Using B as a tuple is likely the confusing part of this. You can break the problem down into two subproblems to get a better idea of what it is doing. res0 is the first element of the result tuple, and res1 is the second element.
// Sums all elements in parallel.
scala> x.par.aggregate(0)((x, y) => x + y, (x, y) => x + y)
res0: Int = 21
// Counts all elements in parallel.
scala> x.par.aggregate(0)((x, y) => x + 1, (x, y) => x + y)
res1: Int = 6
Answer by Diego Martinoia
aggregate takes 3 parameters: a seed value, a computation function and a combination function.
What it does is basically split the collection across a number of threads, compute partial results using the computation function, and then combine all these partial results using the combination function.
From what I can tell, your example function will return a pair (a, b), where a is the sum of the values in the list and b is the number of values in the list. Indeed, you get (21, 6).
How does this work? The seed value is the (0,0) pair. For an empty list, we have a sum of 0 and a number of items 0, so this is correct.
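The empty-list argument can be checked in a few lines. Folding an empty list returns the seed untouched, and merging any partial result with the seed leaves it unchanged, which is what makes (0,0) safe as the starting value of every partition. A minimal sequential sketch:

```scala
object SeedCheck {
  val seqop: ((Int, Int), Int) => (Int, Int) = (x, y) => (x._1 + y, x._2 + 1)
  val combop: ((Int, Int), (Int, Int)) => (Int, Int) = (x, y) => (x._1 + y._1, x._2 + y._2)
  val seed: (Int, Int) = (0, 0)

  // No elements: seqop is never called, so the seed is the answer (sum 0, count 0)
  val emptyResult: (Int, Int) = List.empty[Int].foldLeft(seed)(seqop)

  // The seed is neutral for combop: merging it with a partial result changes nothing
  val merged: (Int, Int) = combop(seed, (21, 6))

  def main(args: Array[String]): Unit = {
    println(emptyResult) // (0,0)
    println(merged)      // (21,6)
  }
}
```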
Your computation function takes an (Int, Int) pair x, which is your partial result, and an Int y, which is the next value in the list. This is your:
(x, y) => (x._1 + y, x._2 + 1)
Indeed, the result that we want is to increase the left element of x (the accumulator) by y, and the right element of x (the counter) by 1 for each y.
Your combination function takes an (Int, Int) pair x and an (Int, Int) pair y, which are your two partial results from different parallel computations, and combines them together as:
(x,y) => (x._1 + y._1, x._2 + y._2)
Indeed, we sum the left parts of the pairs and the right parts of the pairs independently.
Your confusion comes from the fact that x and y in the first function ARE NOT the same x and y as in the second function. In the first function, x has the type of the seed value and y has the type of the collection elements, and you return a result of the type of x. In the second function, both parameters have the same type as your seed value.
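One way to see that x and y mean different things in the two functions is to reuse the seqop shape as combop by mistake. In this sketch, wrongCombop (a hypothetical, deliberately broken function) treats the second partial result as if it were a single element and adds 1 to the count. The three partitions are hand-picked to simulate a parallel run; with a single partition the mistake would never surface.

```scala
object CombopTypes {
  // seqop: x is the accumulator (sum, count), y is ONE list element (Int)
  val seqop: ((Int, Int), Int) => (Int, Int) = (x, y) => (x._1 + y, x._2 + 1)

  // combop: x and y are BOTH partial results (sum, count) from two partitions
  val combop: ((Int, Int), (Int, Int)) => (Int, Int) = (x, y) => (x._1 + y._1, x._2 + y._2)

  // Mistake: treats the partial result y like a single element and counts it as 1
  val wrongCombop: ((Int, Int), (Int, Int)) => (Int, Int) = (x, y) => (x._1 + y._1, x._2 + 1)

  // Three hand-picked partitions of List(1, 2, 3, 4, 5, 6)
  val partials: List[(Int, Int)] =
    List(List(1, 2), List(3, 4), List(5, 6)).map(_.foldLeft((0, 0))(seqop))

  val correct: (Int, Int) = partials.reduce(combop)
  val broken: (Int, Int) = partials.reduce(wrongCombop)

  def main(args: Array[String]): Unit = {
    println(correct) // (21,6)
    println(broken)  // (21,4)
  }
}
```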
Hope it's clearer now!
Answer by Shiva Garg
Adding to Rashmit's answer:
- CombOp is called only if the collection is processed in parallel mode.
See the example below:
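import scala.collection.parallel.ParSeq
// On Scala 2.13+, .par also needs the scala-parallel-collections module and
// import scala.collection.parallel.CollectionConverters._
val listP: ParSeq[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).par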
val aggregateOp1 = listP.aggregate[String]("Aggregate-")((a, b) => a + b, (s1, s2) => {
println("Combiner called , if collections is processed parallel mode")
s1 + "," + s2
})
println(aggregateOp1)
Output: Aggregate-1,Aggregate-2,Aggregate-3,Aggregate-45,Aggregate-6,Aggregate-7,Aggregate-8,Aggregate-910
val list: Seq[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val aggregateOp2 = list.aggregate[String]("Aggregate-")((a, b) => a + b, (s1, s2) => {
println("Combiner called , if collections is processed parallel mode")
s1 + "," + s2
})
println(aggregateOp2)
Output: Aggregate-12345678910
In the above example, the combiner operation is called only if the collection is processed in parallel.
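The "Aggregate-45" and "Aggregate-910" pieces in the parallel output come from the seed being used once per partition: each partition starts again from "Aggregate-", seqop appends its elements, and combop joins the partitions with commas. The sketch below reproduces that output by hand. The partition layout is an assumption chosen to match the run shown above; a real run may split the list differently, and since "Aggregate-" is not a neutral element the output can change with the split.

```scala
object StringAggregate {
  val seed: String = "Aggregate-"
  val seqop: (String, Int) => String = (a, b) => a + b
  val combop: (String, String) => String = (s1, s2) => s1 + "," + s2

  // Assumed partition layout, chosen to match the parallel output shown above
  val partitions: List[List[Int]] =
    List(List(1), List(2), List(3), List(4, 5), List(6), List(7), List(8), List(9, 10))

  // Every partition restarts from the seed, so "Aggregate-" appears once per partition
  val simulatedParallel: String = partitions.map(_.foldLeft(seed)(seqop)).reduce(combop)

  // A sequential run is a single partition, so combop is never needed
  val sequential: String = (1 to 10).foldLeft(seed)(seqop)

  def main(args: Array[String]): Unit = {
    // Aggregate-1,Aggregate-2,Aggregate-3,Aggregate-45,Aggregate-6,Aggregate-7,Aggregate-8,Aggregate-910
    println(simulatedParallel)
    println(sequential) // Aggregate-12345678910
  }
}
```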

