RDD Aggregate in Spark (scala)

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/37018249/


RDD Aggregate in spark

Tags: scala, apache-spark, rdd

Asked by Lijju Mathew

I am an Apache Spark learner and have come across the RDD action aggregate, which I have no clue how it functions. Can someone spell out and explain in detail, step by step, how we arrive at the result below for the code here?


RDD input = {1,2,3,3}

RDD aggregate function:

rdd.aggregate((0, 0))(
  (x, y) => (x._1 + y, x._2 + 1),
  (x, y) => (x._1 + y._1, x._2 + y._2))

output: {9,4}
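
For readers who want to reproduce this, here is a minimal self-contained sketch; the local master URL, app name and object name are illustrative, not part of the original question:

import org.apache.spark.{SparkConf, SparkContext}

object AggregateDemo extends App {
  // Local SparkContext purely for illustration.
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("aggregate-demo"))

  val rdd = sc.parallelize(Seq(1, 2, 3, 3))

  val result = rdd.aggregate((0, 0))(
    (x, y) => (x._1 + y, x._2 + 1),        // seqOp: fold one element into (sum, count)
    (x, y) => (x._1 + y._1, x._2 + y._2))  // combOp: merge two (sum, count) pairs

  println(result) // (9,4): the sum of the elements and their count
  sc.stop()
}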

Thanks


Answered by zero323

If you are not sure what is going on, it is best to follow the types. Omitting the implicit ClassTag for brevity, we start with something like this:


abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 

If you ignore all the additional parameters, you'll see that aggregate is a function which maps from RDD[T] to U. It means that the type of the values in the input RDD doesn't have to be the same as the type of the output value. So it is clearly different from, for example, reduce:


def reduce(func: (T, T) => T): T 

or fold:


def fold(zeroValue: T)(op: (T, T) => T): T
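
A quick pure-Scala analogy (no Spark needed; the list and names are illustrative) makes the type difference concrete: reduce and fold are stuck with the element type, while an aggregate-style fold can change it.

val xs = List(1, 2, 3, 3)

// reduce (and fold) can only return the element type, Int:
val sum: Int = xs.reduce(_ + _) // 9

// an aggregate-style fold can return a different type, (Int, Int):
val sumAndCount: (Int, Int) =
  xs.foldLeft((0, 0))((acc, x) => (acc._1 + x, acc._2 + 1)) // (9, 4)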

Like fold, aggregate requires a zeroValue. How do you choose it? It should be an identity (neutral) element with respect to combOp.

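You can check this neutrality requirement directly; a small sketch using the combOp from the question:

val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)

// (0, 0) is neutral: merging it in never changes a partial result.
assert(combOp((0, 0), (9, 4)) == (9, 4))
assert(combOp((9, 4), (0, 0)) == (9, 4))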

You also have to provide two functions:


  • seqOp, which maps from (U, T) to U
  • combOp, which maps from (U, U) to U

Just based on these signatures you should already see that only seqOp may access the raw data. It takes some value of type U, another one of type T, and returns a value of type U. In your case it is a function with the following signature:


((Int, Int), Int) => (Int, Int) 

At this point you probably suspect it is used for some kind of fold-like operation.

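Spelled out with explicit types (the names acc and x are mine, for illustration), the question's seqOp is:

val seqOp = (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1)

seqOp((0, 0), 1) // (1, 1): the sum grows by the element, the count by one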

The second function takes two arguments of type U and returns a value of type U. As stated before, it should be clear that it doesn't touch the original data and can operate only on the values already processed by the seqOp. In your case this function has a signature as follows:


((Int, Int), (Int, Int)) => (Int, Int) 
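
And a spelled-out sketch of the corresponding combOp, again with illustrative names:

val combOp = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

combOp((3, 2), (6, 2)) // (9, 4): partial sums and counts are added pairwise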

So how can we get all of that together?


  1. First, each partition is aggregated using the standard Iterator.aggregate, with zeroValue, seqOp and combOp passed as z, seqop and combop respectively. Since the InterruptibleIterator used internally doesn't override aggregate, it should be executed as a simple foldLeft(zeroValue)(seqOp).

  2. Next, the partial results collected from each partition are aggregated using combOp.


Let's assume that the input RDD has three partitions with the following distribution of values:


  • Iterator(1, 2)
  • Iterator(3, 3)
  • Iterator()

You can expect that the execution, ignoring the absolute order, will be equivalent to something like this:


val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)

Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
  .map(_.foldLeft((0, 0))(seqOp))
  .reduce(combOp)

foldLeft for a single partition can look like this:


Iterator(1, 2).foldLeft((0, 0))(seqOp) // start from the zero value
Iterator(2).foldLeft((1, 1))(seqOp)    // after consuming 1: acc = (1, 1)
(3, 2)                                 // after consuming 2: acc = (3, 2)

and over all partitions


Seq((3,2), (6,2), (0,0))

which combined will give you the observed result:


(3 + 6 + 0, 2 + 2 + 0)
(9, 4)

In general, this is a common pattern you will find all over Spark, where you pass a neutral value, a function used to process values per partition, and a function used to merge partial aggregates from different partitions. Some other examples include (see the aggregateByKey sketch after this list):


  • aggregateByKey
  • User Defined Aggregate Functions
  • Aggregators on Spark Datasets.
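
For instance, a sketch of the same (sum, count) pattern with aggregateByKey; it assumes an existing SparkContext sc, and the keys and values are made up for illustration:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

val sumCountByKey = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),    // per partition: fold values for a key
  (a, b) => (a._1 + b._1, a._2 + b._2))    // across partitions: merge per-key pairs

// sumCountByKey.collect() would give, e.g., Array(("a", (3, 2)), ("b", (3, 1)))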

Answered by Yuanxu Xu

Here is my understanding for your reference:


Imagine you have two nodes: one takes as input the first two list elements {1, 2}, and the other takes {3, 3}. (The partitioning here is only for convenience.)


At the first node, "(x, y) => (x._1 + y, x._2 + 1)": the first x is (0, 0) as given, and y is your first element 1, so you will get the output (0+1, 0+1); then comes your second element y = 2, and the output is (1 + 2, 1 + 1), which is (3, 2).


At the second node, the same procedure happens in parallel, and you'll get (6, 2).


"(x, y) => (x._1 + y._1, x._2 + y._2)", tells you to merge two nodes, and you'll get (9,4)

" (x, y) => (x._1 + y._1, x._2 + y._2)",告诉你合并两个节点,你会得到 (9,4)



One thing worth noticing is that the zero value is actually folded into the result numPartitions + 1 times: once per partition by seqOp and once more by combOp during the final merge. (It matches length(rdd) + 1 below only because this RDD happens to have four partitions and four elements.) With a non-neutral zero value like (1, 1), the effect is visible: each of the 5 copies of (1, 1) adds 1 to both the sum and the count, turning (9, 4) into (14, 9):


"scala> rdd.aggregate((1,1)) ((x, y) =>(x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) res1: (Int, Int) = (14,9)"

" scala> rdd.aggregate((1,1)) ((x, y) =>(x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1 , x._2 + y._2)) res1: (Int, Int) = (14,9)"