Scala 聚合函数的示例

Disclaimer: this page is a translation of a popular StackOverflow question and its answers. It is provided under the CC BY-SA 4.0 license; if you reuse or share it, you must do so under the same license and attribute the original authors (not me). Original source: http://stackoverflow.com/questions/6928374/

Date: 2020-10-22 03:18:37  Source: igfitidea

Example of the Scala aggregate function

Tags: scala, aggregate-functions

Asked by christangrant

I have been looking and I cannot find an example or discussion of the aggregate function in Scala that I can understand. It seems pretty powerful.

Can this function be used to reduce the values of tuples to make a multimap-type collection? For example:

val list = Seq(("one", "i"), ("two", "2"), ("two", "ii"), ("one", "1"), ("four", "iv"))

After applying aggregate:

Seq(("one" -> Seq("i","1")), ("two" -> Seq("2", "ii")), ("four" -> Seq("iv")))

Also, can you give examples of the parameters z, seqop, and combop? I'm unclear on what these parameters do.

Answered by Daniel C. Sobral

Let's see if some ascii art doesn't help. Consider the type signature of aggregate:

def aggregate [B] (z: B)(seqop: (B, A) => B, combop: (B, B) => B): B

Also, note that A refers to the element type of the collection. So, let's say we have 4 elements in this collection; then aggregate might work like this:

z   A   z   A   z   A   z   A
 \ /     \ /seqop\ /     \ /    
  B       B       B       B
    \   /  combop   \   /
      B _           _ B
         \ combop  /
              B

Let's see a practical example of that. Say I have a GenSeq("This", "is", "an", "example"), and I want to know how many characters there are in it. I can write the following:

Note the use of par in the snippet of code below. The second function passed to aggregate is what is called after the individual sequences have been computed. Scala is only able to do this for collections that can be parallelized.

import scala.collection.GenSeq
val seq = GenSeq("This", "is", "an", "example")
val chars = seq.par.aggregate(0)(_ + _.length, _ + _)

So, first it would compute this:

0 + "This".length     // 4
0 + "is".length       // 2
0 + "an".length       // 2
0 + "example".length  // 7

What it does next cannot be predicted (there is more than one way of combining the results), but it might do this (like in the ascii art above):

4 + 2 // 6
2 + 7 // 9

At which point it concludes with

6 + 9 // 15

which gives the final result. Now, this is a bit similar in structure to foldLeft, but it has an additional function, (B, B) => B, which fold doesn't have. This function, however, is what enables it to work in parallel!

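To make the role of combop concrete, here is a hand-rolled sketch (the splitting point and variable names are mine) that imitates what a parallel aggregate might do: fold each half of the collection with seqop, then merge the two partial results with combop:

```scala
// Simulating aggregate's split/fold/combine scheme by hand.
val words = Seq("This", "is", "an", "example")

val seqop: (Int, String) => Int = _ + _.length // folds one chunk
val combop: (Int, Int) => Int = _ + _          // merges partial results

// Split the collection in two, fold each half, then combine.
val (left, right) = words.splitAt(2)
val combined = combop(left.foldLeft(0)(seqop), right.foldLeft(0)(seqop))

val sequential = words.foldLeft(0)(seqop)
// Both compute 15, the total number of characters.
```

Because + is associative, the split point does not matter, and that is precisely the freedom a parallel implementation exploits.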
Consider, for example, that each of the four initial computations is independent of the others, and they can be done in parallel. The next two (resulting in 6 and 9) can be started once the computations they depend on are finished, but these two can also run in parallel.

The 7 computations, parallelized as above, could take as little time as 3 serial computations.

Actually, with such a small collection the cost of synchronizing the computation would be big enough to wipe out any gains. Furthermore, if you folded this, it would only take 4 computations in total. Once your collections get larger, however, you start to see some real gains.

Consider, on the other hand, foldLeft. Because it doesn't have that additional function, it cannot parallelize any computation:

(((0 + "This".length) + "is".length) + "an".length) + "example".length

Each of the inner parentheses must be computed before the outer one can proceed.

Answered by Didier Dupont

The aggregate function does not do that (except that it is a very general function, and it could be used to do that). You want groupBy. Close to it, at least. As you start with a Seq[(String, String)], and you group by taking the first item in each tuple (which is (String, String) => String), it returns a Map[String, Seq[(String, String)]]. You then have to discard the first element in the Seq[(String, String)] values.

So

list.groupBy(_._1).mapValues(_.map(_._2))

There you get a Map[String, Seq[String]]. If you want a Seq instead of a Map, call toSeq on the result. I don't think you have a guarantee on the order in the resulting Seq, though.

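As a runnable sketch of that answer (using map rather than mapValues, so it also behaves the same on newer Scala versions):

```scala
val list = Seq(("one", "i"), ("two", "2"), ("two", "ii"), ("one", "1"), ("four", "iv"))

// Group by the first tuple element, then strip the keys from the grouped pairs.
val multimap = list.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
// Map(one -> Seq(i, 1), two -> Seq(2, ii), four -> Seq(iv))
```

groupBy preserves the encounter order of the elements within each group, so the value sequences come out in the order the pairs appeared in the list.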


Aggregate is a more difficult function.

Consider first reduceLeft and reduceRight. Let as be a non-empty sequence as = Seq(a1, ..., an) of elements of type A, and f: (A, A) => A be some way to combine two elements of type A into one. I will write it as a binary operator @, a1 @ a2 rather than f(a1, a2). as.reduceLeft(@) will compute (((a1 @ a2) @ a3) ... @ an). reduceRight will put the parentheses the other way, (a1 @ (a2 @ ... @ an)). If @ happens to be associative, one does not care about the parentheses. One could compute it as (a1 @ ... @ ap) @ (ap+1 @ ... @ an) (there would be parentheses inside the two big parentheses too, but let's not care about that). Then one could do the two parts in parallel, while the nested bracketing in reduceLeft or reduceRight forces a fully sequential computation. But parallel computation is only possible when @ is known to be associative, and the reduceLeft method cannot know that.

Still, there could be a method reduce, whose caller would be responsible for ensuring that the operation is associative. Then reduce would order the calls as it sees fit, possibly doing them in parallel. Indeed, there is such a method.

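A trivial sketch of that: string concatenation is associative, so reduce is free to regroup the calls however it likes:

```scala
val parts = Seq("This", "is", "an", "example")

// reduce requires (A, A) => A; the caller guarantees associativity.
val joined = parts.reduce(_ + _)

// Regrouping by hand gives the same result; this regrouping freedom
// is what makes a parallel evaluation legal.
val regrouped = (parts(0) + parts(1)) + (parts(2) + parts(3))
// Both are "Thisisanexample".
```
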
There is a limitation with the various reduce methods, however. The elements of the Seq can only be combined into a result of the same type: @ has to be (A, A) => A. But one could have the more general problem of combining them into a B. One starts with a value b of type B, and combines it with every element of the sequence. The operator @ is (B, A) => B, and one computes (((b @ a1) @ a2) ... @ an). foldLeft does that. foldRight does the same thing but starting with an. There, the @ operation has no chance of being associative. When one writes b @ a1 @ a2, it must mean (b @ a1) @ a2, as (a1 @ a2) would be ill-typed. So foldLeft and foldRight have to be sequential.

Suppose, however, that each A can be turned into a B; let's write it with !, so a! is of type B. Suppose moreover that there is a + operation (B, B) => B, and that @ is such that b @ a is in fact b + a!. Rather than combining elements with @, one could first transform all of them to B with !, then combine them with +. That would be as.map(!).reduceLeft(+). And if + is associative, then that can be done with reduce, and not be sequential: as.map(!).reduce(+). There could be a hypothetical method as.associativeFold(b, !, +).

Aggregate is very close to that. It may be, however, that there is a more efficient way to implement b @ a than b + a!. For instance, if type B is List[A], and b @ a is a :: b, then a! will be a :: Nil, and b1 + b2 will be b2 ::: b1. a :: b is way better than (a :: Nil) ::: b. To benefit from associativity, but still use @, one first splits b + a1! + ... + an! into (b + a1! + ... + ap!) + (ap+1! + ... + an!), then goes back to using @ with (b @ a1 @ ... @ ap) + (ap+1! @ ... @ an). One still needs the ! on ap+1, because one must start with some b. And the + is still necessary too, appearing between the parentheses. To do that, as.associativeFold(b, !, +) could be changed to as.optimizedAssociativeFold(b, !, @, +).

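Here is a sketch of that List example, splitting by hand to check that the cheap prepend plus a single concatenation agrees with a plain foldLeft (the names are mine):

```scala
val zero: List[Char] = Nil
// b @ a: cheap prepend of one element.
val prepend: (List[Char], Char) => List[Char] = (b, a) => a :: b
// b1 + b2: the later chunk's result goes in front.
val concat: (List[Char], List[Char]) => List[Char] = (b1, b2) => b2 ::: b1

val letters = "abcd".toList
val (firstHalf, secondHalf) = letters.splitAt(2)
val viaSplit = concat(firstHalf.foldLeft(zero)(prepend), secondHalf.foldLeft(zero)(prepend))
val viaFold = letters.foldLeft(zero)(prepend)
// Both build List('d', 'c', 'b', 'a').
```
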
Back to +. + is associative, or equivalently, (B, +) is a semigroup. In practice, most of the semigroups used in programming happen to be monoids too, i.e. they contain a neutral element z (for zero) in B, such that for each b, z + b = b + z = b. In that case, the ! operation that makes sense is likely to be a! = z @ a. Moreover, as z is a neutral element, b @ a1 @ ... @ an = (b + z) @ a1 @ ... @ an, which is b + (z @ a1 @ ... @ an). So it is always possible to start the aggregation with z. If b is wanted instead, you do b + result at the end. With all those hypotheses, we can do as.aggregate(z)(@, +). That is what aggregate does. @ is the seqop argument (applied in a sequence, z @ a1 @ a2 @ ... @ ap), and + is combop (applied to already partially combined results, as in (z @ a1 @ ... @ ap) + (z @ ap+1 @ ... @ an)).

To sum it up, as.aggregate(z)(seqop, combop) computes the same thing as as.foldLeft(z)(seqop), provided that:

  • (B, combop, z) is a monoid
  • seqop(b, a) = combop(b, seqop(z, a))

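These two conditions can be spot-checked for the character-count example above (a sketch, not a proof; checking a single b and a only illustrates the laws):

```scala
val z = 0
val seqop: (Int, String) => Int = _ + _.length
val combop: (Int, Int) => Int = _ + _

val b = 10
val a = "word"
// z is neutral for combop (the monoid condition).
val neutral = combop(z, b) == b && combop(b, z) == b
// seqop(b, a) == combop(b, seqop(z, a))
val coherent = seqop(b, a) == combop(b, seqop(z, a))
```
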
The aggregate implementation may use the associativity of combop to group the computations as it likes (it does not swap elements, however: + does not have to be commutative, and ::: is not). It may run them in parallel.

Finally, solving the initial problem using aggregate is left as an exercise to the reader. A hint: implement it using foldLeft, then find z and combop that satisfy the conditions stated above.

Answered by paradigmatic

The signature for a collection with elements of type A is:

def aggregate [B] (z: B)(seqop: (B, A) => B, combop: (B, B) => B): B
  • z is an object of type B acting as a neutral element. If you want to count something, you can use 0; if you want to build a list, start with an empty list, etc.
  • seqop is analogous to the function you pass to fold methods. It takes two arguments: the first is of the same type as the neutral element you passed and represents what has already been aggregated in previous iterations; the second is the next element of your collection. The result must also be of type B.
  • combop is a function combining two results into one.

In most collections, aggregate is implemented in TraversableOnce as:

  def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B 
    = foldLeft(z)(seqop)

Thus combop is ignored. However, it makes sense for parallel collections, because seqop will first be applied locally in parallel, and then combop is called to finish the aggregation.

So for your example, you can try with a fold first:

val seqOp = 
  (map:Map[String,Set[String]],tuple: (String,String)) => 
    map + ( tuple._1 -> ( map.getOrElse( tuple._1, Set[String]() ) + tuple._2 ) )


list.foldLeft( Map[String,Set[String]]() )( seqOp )
// returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))

Then you have to find a way of collapsing two multimaps:

val combOp = (map1: Map[String,Set[String]], map2: Map[String,Set[String]]) =>
       (map1.keySet ++ map2.keySet).foldLeft( Map[String,Set[String]]() ) { 
         (result,k) => 
           result + ( k -> ( map1.getOrElse(k,Set[String]() ) ++ map2.getOrElse(k,Set[String]() ) ) ) 
       } 

Now, you can use aggregate in parallel:

list.par.aggregate( Map[String,Set[String]]() )( seqOp, combOp )
//Returns: Map(one -> Set(i, 1), two -> Set(2, ii), four -> Set(iv))

Applying the method par to list makes it use the parallel collection (scala.collection.parallel.immutable.ParSeq) of the list, to really take advantage of multi-core processors. Without par there won't be any performance gain, since the aggregate is not done on a parallel collection.

Answered by Debilski

aggregate is like foldLeft but may be executed in parallel.

As missingfaktor says, the linear version of aggregate(z)(seqop, combop) is equivalent to foldLeft(z)(seqop). This is, however, impractical in the parallel case, where we need not only to combine each next element with the previous result (as in a normal fold), but also to split the iterable into sub-iterables on which we call aggregate, and then combine those results again. (In left-to-right order, but not associatively, as we might have combined the last parts before the first parts of the iterable.) This re-combining is in general non-trivial, and therefore one needs a method (S, S) => S to accomplish it.

The definition in ParIterableLike is:

def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
  executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
}

which indeed uses combop.

For reference, Aggregate is defined as:

protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, protected[this] val pit: IterableSplitter[T])
  extends Accessor[S, Aggregate[S]] {
    @volatile var result: S = null.asInstanceOf[S]
    def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
    protected[this] def newSubtask(p: IterableSplitter[T]) = new Aggregate(z, seqop, combop, p)
    override def merge(that: Aggregate[S]) = result = combop(result, that.result)
}

The important part is merge, where combop is applied to two sub-results.

Answered by Win Myo Htet

Here is a blog post, with benchmarks, on how aggregate enables better performance on multi-core processors: http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/

Here is a video of the "Scala parallel collections" talk from Scala Days 2011: http://days2011.scala-lang.org/node/138/272

The description of the video:

Scala Parallel Collections

Aleksandar Prokopec

Parallel programming abstractions become increasingly important as the number of processor cores grows. A high-level programming model enables the programmer to focus more on the program and less on low-level details such as synchronization and load-balancing. Scala parallel collections extend the programming model of the Scala collection framework, providing parallel operations on datasets. The talk will describe the architecture of the parallel collection framework, explaining their implementation and design decisions. Concrete collection implementations such as parallel hash maps and parallel hash tries will be described. Finally, several example applications will be shown, demonstrating the programming model in practice.

Answered by Micheal Kris

Just to clarify the explanations of those before me: in theory, the idea is that aggregate should work like this (I have changed the names of the parameters to make them clearer):

Seq(1,2,3,4).aggregate(0)(
     addToPrev = (prev, curr) => prev + curr,
     combineSums = (sumA, sumB) => sumA + sumB)

Should logically translate to

Seq(1,2,3,4)
    .grouped(2) // split into groups of 2 members each
    .map(prevAndCurrList => prevAndCurrList(0) + prevAndCurrList(1))
    .foldLeft(0)((sumA, sumB) => sumA + sumB)

Because the aggregation and mapping are separate, the original list could theoretically be split into different groups of different sizes and run in parallel, or even on different machines. In practice, Scala's current implementation does not support this feature by default, but you can do this in your own code.

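For instance, an uneven split by hand still yields the same total, which is exactly the freedom the combining function buys (a sketch):

```scala
val xs = Seq(1, 2, 3, 4)

// Uneven split: one chunk of 1 element, one chunk of 3.
val chunks = Seq(xs.take(1), xs.drop(1))

// Fold each chunk, then combine the partial sums.
val total = chunks.map(_.foldLeft(0)(_ + _)).foldLeft(0)(_ + _)
// total == 10
```
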
Answered by missingfaktor

The definition of aggregate in the TraversableOnce source is:

def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B = 
  foldLeft(z)(seqop)

which is no different from a simple foldLeft. combop doesn't seem to be used anywhere. I am myself confused as to what the purpose of this method is.