Explain the aggregate functionality in Spark (Python)

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must attribute it to the original authors (not the translator). Original question: http://stackoverflow.com/questions/28240706/


Explain the aggregate functionality in Spark

Tags: python, apache-spark, lambda, aggregate, rdd

Asked by ab_tech_sp

I am looking for some better explanation of the aggregate functionality that is available via spark in python.


The example I have is as follows (using pyspark from Spark 1.2.0 version)


sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Output:


(10, 4)

I get the expected result (10,4), which is the sum of 1+2+3+4 and the count of 4 elements. If I change the initial value passed to the aggregate function from (0,0) to (1,0), I get the following result:

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Output:


(19, 4)

The value increases by 9. If I change it to (2,0), the value goes to (28,4), and so on.

Can someone explain to me how this value is calculated? I expected the value to go up by 1, not by 9; I expected to see (11,4), but instead I am seeing (19,4).

Accepted answer by John Knight

I don't have enough reputation points to comment on the previous answer by Maasg. Actually, the zero value should be 'neutral' towards the seqOp, meaning it wouldn't interfere with the seqOp result, like 0 for addition, or 1 for multiplication.

You should NEVER use non-neutral values, because the zero value may be applied an arbitrary number of times. This behavior is not tied only to the number of partitions.

I tried the same experiment described in the question. With 1 partition, the zero value was applied 3 times; with 2 partitions, 6 times; with 3 partitions, 9 times; and so on.
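A quick way to see this for yourself is to rerun the question's example with different explicit partition counts. This is a minimal PySpark sketch, assuming an existing SparkContext named sc (as in the question); the exact number of times the zero value is applied can depend on the Spark version and API, so the sketch only shows that the result grows with the partition count:

seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

# With the non-neutral zero value (1, 0), the extra amount folded into the sum
# grows as the data is split across more partitions.
for num_partitions in (1, 2, 4):
    result = sc.parallelize([1, 2, 3, 4], num_partitions).aggregate((1, 0), seq_op, comb_op)
    print(num_partitions, result)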

Answer by maasg

Aggregate lets you transform and combine the values of the RDD at will.


It uses two functions:


The first one transforms and adds the elements of the original collection [T] into a local aggregate [U] and takes the form (U,T) => U. You can see it as a fold, and therefore it also requires a zero for that operation. This operation is applied locally to each partition in parallel.

Here is where the key of the question lies: The only value that should be used here is the ZERO value for the reduction operation. This operation is executed locally on each partition, therefore, adding anything to that zero value will add to the result multiplied by the number of partitions of the RDD.


The second operation takes two values of the result type of the previous operation [U] and combines them into one value. This operation reduces the partial results of each partition and produces the actual total.

For example: Given an RDD of Strings:


val rdd:RDD[String] = ???

Let's say you want to aggregate the lengths of the strings in that RDD, so you would do:

1) The first operation will transform strings into size (int) and accumulate the values for size.


val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.length

2) provide the ZERO for the addition operation (0)


val ZERO = 0

3) an operation to add two integers together:


val add: (Int, Int) => Int = _ + _

Putting it all together:


rdd.aggregate(ZERO)(stringSizeCummulator, add)

So, why is the ZERO needed? When the cummulator function is applied to the first element of a partition, there's no running total. ZERO is used here.


E.g. my RDD is:
- Partition 1: ["Jump", "over"]
- Partition 2: ["the", "wall"]

This will result:


P1:


  1. stringSizeCummulator(ZERO, "Jump") = 4
  2. stringSizeCummulator(4, "over") = 8

P2:


  1. stringSizeCummulator(ZERO, "the") = 3
  2. stringSizeCummulator(3, "wall") = 7

Reduce: add(P1, P2) = 15

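Since the question is about PySpark, here is a rough Python sketch of the string-length example above, assuming an existing SparkContext named sc (the identifier names are mine, not from the original answer):

rdd = sc.parallelize(["Jump", "over", "the", "wall"], 2)

string_size_cumulator = lambda total, s: total + len(s)  # (U, T) => U: fold one string into the running total
add = lambda x, y: x + y                                  # (U, U) => U: merge two partition totals

print(rdd.aggregate(0, string_size_cumulator, add))  # 15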

Answer by gsamaras

I wasn't fully convinced by the accepted answer, and JohnKnight's answer helped, so here's my point of view:

First, let's explain aggregate() in my own words:

Prototype:


aggregate(zeroValue, seqOp, combOp)


Description:


aggregate() lets you take an RDD and generate a single value that is of a different type than what was stored in the original RDD.

Parameters:


  1. zeroValue: The initialization value, for your result, in the desired format.
  2. seqOp: The operation you want to apply to RDD records. Runs once for every record in a partition.
  3. combOp: Defines how the resulted objects (one for every partition), gets combined.

Example:


Compute the sum of a list and the length of that list. Return the result in a pair of (sum, length).

计算列表的总和和该列表的长度。以一对(sum, length).

In a Spark shell, I first created a list with 4 elements, with 2 partitions:


listRDD = sc.parallelize([1,2,3,4], 2)

then I defined my seqOp:


seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

and my combOp:


combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

and then I aggregated:


listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

As you can see, I gave descriptive names to my variables, but let me explain it further:


The first partition has the sublist [1, 2]. We will apply the seqOp to each element of that list and this will produce a local result, a pair of (sum, length), that will reflect the result locally, only in that first partition.


So, let's start: local_result gets initialized to the zeroValue parameter we provided to aggregate(), i.e. (0, 0), and list_element is the first element of the list, i.e. 1. As a result this is what happens:

0 + 1 = 1
0 + 1 = 1

Now, the local result is (1, 1); that means that so far, for the 1st partition, after processing only the first element, the sum is 1 and the length is 1. Notice that local_result gets updated from (0, 0) to (1, 1).

1 + 2 = 3
1 + 1 = 2

and now the local result is (3, 2), which will be the final result for the 1st partition, since there are no other elements in the sublist of the 1st partition.

Doing the same for 2nd partition, we get (7, 2).

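If you want to see those per-partition intermediates, (3, 2) and (7, 2), for yourself, a small check like the following works; it is a sketch that reuses the listRDD and seqOp defined above:

from functools import reduce

# Apply the seqOp locally inside each partition, starting from (0, 0),
# and collect one local result per partition.
per_partition = listRDD.mapPartitions(lambda it: [reduce(seqOp, it, (0, 0))]).collect()
print(per_partition)  # [(3, 2), (7, 2)]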

Now we apply the combOp to each local result, so that we can form, the final, global result, like this: (3,2) + (7,2) = (10, 4)




Example described in 'figure':


            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)


Inspired by this great example.




So now, if the zeroValue is not (0, 0) but (1, 0), one would expect to get (8 + 4, 2 + 2) = (12, 4), which doesn't explain what you experience. Even if we alter the number of partitions of my example, I won't be able to get that result.

The key here is JohnKnight's answer, which states that the zeroValue is not only related to the number of partitions, but may be applied more times than you expect.

Answer by Prasanna Saraswathi Krishnan

For people looking for the Scala equivalent of the above example, here it is. Same logic, same input and result.

scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)

scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)

Answer by iSingh

Great explanations; they really helped me understand the underlying working of the aggregate function. I have played with it for some time and found the following:

  • if you use (0,0) as the acc, it will not change the output of the function.

  • if the initial accumulator is changed, the result is processed roughly as below:


[ sum of RDD elements + acc initial value * No. of RDD partitions + acc initial value ]


For the question here, I would suggest checking the partitions: as per my understanding the number of partitions should be 8, because every time we process the seqOp on a partition of the RDD it starts from the initial acc value, and when it performs the combOp it uses the acc initial value once more.

e.g. List(1,2,3,4) and acc (1,0)


Get the number of partitions in Scala with RDD.partitions.size
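In PySpark the equivalent check is getNumPartitions(); a minimal sketch, assuming an existing SparkContext named sc:

rdd = sc.parallelize([1, 2, 3, 4])
# With no explicit partition count, this reflects spark.default.parallelism.
print(rdd.getNumPartitions())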

If there are 2 partitions and 4 elements, then => [ 10 + 1 * 2 + 1 ] => (13,4)


If there are 4 partitions and 4 elements, then => [ 10 + 1 * 4 + 1 ] => (15,4)


Hope this helps; you can check here for an explanation. Thanks.

Answer by W.Sen

I tried many experiments on this question. It is better to set the number of partitions explicitly for aggregate. The seqOp processes each partition and applies the initial value; moreover, combOp also applies the initial value when it combines all the partitions. So I propose this formula for the question:

final result = sum(list) + (num_Of_Partitions + 1) * initial_Value
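
Because the exact number of times the initial value is folded in can differ between Spark versions and between the Scala and Python APIs, it is safer to measure it than to rely on a fixed formula. A hedged sketch, assuming an existing SparkContext named sc:

data, init = [1, 2, 3, 4], 1
observed = sc.parallelize(data, 2).aggregate(
    (init, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))
# Everything above sum(data) came from repeated applications of the initial value.
extra = observed[0] - sum(data)
print(observed, "initial value folded in", extra // init, "times")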

Answer by lovasoa

You can use the following code (in Scala) to see precisely what aggregate is doing. It builds a tree of all the addition and merge operations:

sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]

val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)

And then, in the shell:


scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))

So, we have these 3 partitions: [4], [1,2], and [3].


scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))

You can represent the result as a tree:


+
| \__________________
+                    +
| \________          | \
+          +         +   2
| \        | \       | \         
0  +       0  3      0  1
   | \
   0  4

You can see that a first zero element is created on the driver node (at the left of the tree), and then the results for all the partitions are merged one by one. You can also see that if you replace 0 by 1, as you did in your question, it will add 1 to each result on each partition, and also add 1 to the initial value on the driver. So, the total number of times the zero value you give is used is:

number of partitions + 1.


So, in your case, the result of


aggregate(
  (X, Y),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

will be:


(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)
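
A similar trick works from PySpark by recording every application as a nested tuple instead of adding numbers. This is a sketch assuming an existing SparkContext named sc; note that the number of zero-value occurrences you see through the Python API may differ from the Scala path described above:

zero = "Z"
rdd = sc.parallelize([1, 2, 3, 4], 3)

# Each seqOp/combOp call wraps its inputs instead of combining them numerically,
# so the returned structure shows exactly where the zero value was used.
trace = rdd.aggregate(zero,
                      lambda acc, v: ("seq", acc, v),
                      lambda a, b: ("comb", a, b))
print(trace)  # every "Z" in the output is one use of the zero value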

The implementation of aggregate is quite simple. It is defined in RDD.scala, line 1107:

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
}

Answer by ryh

Thanks to gsamaras.


My viewgraph is below (image not reproduced on this page).

Answer by Suresh Gudimetla

I will explain the concept of Aggregate operation in Spark as follows:


Definition of the aggregate function


**def aggregate** (initial value)(an intra-partition sequence operation)(an inter-partition combination operation)

val flowers = sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4) --> 4 represents the number of partitions the data is distributed into.

Hence, the rdd is distributed into 4 partitions as:


11, 12, 13
24, 25, 26
35, 36, 37
24, 25, 16

We divide the problem statement into two parts. The first part is to aggregate the total number of flowers picked in each quadrant; that's the intra-partition sequence aggregation:

11+12+13 = 36
24+25+26 = 75
35+36+37 = 108
24+25+16 = 65

The second part of the problem is to sum these individual aggregates across the partitions; that's the inter-partition aggregation.


36 + 75 + 108 + 65 = 284

The resulting sum can then be used and processed further by any kind of transformation or other action.

So the code becomes like:


val sum = flowers.aggregate(0)((acc, value) => (acc + value), (x,y) => (x+y)) or val sum = flowers.aggregate(0)(_+_, _+_)
Answer: 284
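
For readers following along in PySpark, a minimal equivalent sketch (assuming an existing SparkContext named sc):

flowers = sc.parallelize([11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16], 4)
print(flowers.aggregate(0, lambda acc, v: acc + v, lambda x, y: x + y))  # 284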

Explanation: (0) is the accumulator. The first + is the intra-partition sum, adding the total number of flowers picked by each picker in each quadrant of the garden. The second + is the inter-partition sum, which aggregates the totals from all quadrants.

Case 1:


Now suppose the initial value weren't zero. What would happen? If it were 5, for example:

The number would be added to each intra-partition aggregate, and also to the inter-partition aggregate:

So the first calculation would be:


11+12+13 = 36 + 5 = 41
24+25+26 = 75 + 5 = 80
35+36+37 = 108 + 5 = 113
24+25+16 = 65 + 5 = 70

Here's the inter-partition aggregation calculation with the initial value of 5:


partition1 + partition2 + partition3 + partition4 + 5 = 41 + 80 + 113 + 70 + 5 = 309


So, coming to your query: the sum can be calculated based on the number of partitions the RDD data is distributed across. I think your data is distributed as shown below, and that's why you got the result (19, 4). So, when doing the aggregate operation, be specific about the number of partitions:

val list = sc.parallelize(List(1,2,3,4))
val list2 = list.glom().collect
val res12 = list.aggregate((1,0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

result:


list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at command-472682101230301:1
list2: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
res12: (Int, Int) = (19,4)

Explanation: as your data is distributed across 8 partitions, the result follows (using the logic explained above):

intra-partition addition:


0+1=1
1+1=2
0+1=1
2+1=3
0+1=1
3+1=4
0+1=1
4+1=5

total=18

inter-partition calculation:


18+1 (1+2+1+3+1+4+1+5+1) = 19

Thank you
