scala - What is the efficient way to update a value inside Spark's RDD?

Disclaimer: This page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/24132271/

What is the efficient way to update value inside Spark's RDD?

scala apache-spark

Asked by bxshi

I'm writing a graph-related program in Scala with Spark. The dataset has 4 million nodes and 4 million edges (you can treat this as a tree), but in each iteration I only edit a portion of it, namely a sub-tree rooted at a given node, plus the nodes on the path between that given node and the root.

The iterations have a dependency, which means iteration i+1 needs the result coming from iteration i. So I need to store the result of each iteration for the next step.

I'm trying to find an efficient way to update the RDD, but have no clue so far. I find that PairRDD has a lookup function which could reduce the computation time from O(N) to O(M), where N denotes the total number of objects in the RDD and M denotes the number of elements in each partition.

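For reference, a minimal sketch of that lookup pattern on a pair RDD (assuming an existing SparkContext named sc; the Node type and the sample data are purely illustrative):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hypothetical node type standing in for the real graph data.
case class Node(id: Long, value: Double)

val nodes: RDD[Node] = sc.parallelize(Seq(Node(1L, 0.5), Node(2L, 1.5), Node(3L, 2.5)))

// Key by id and give the RDD a known partitioner, so lookup only has to
// scan the single partition that can contain the key (the O(M) case).
val byId: RDD[(Long, Node)] = nodes.keyBy(_.id).partitionBy(new HashPartitioner(8))

val hit: Seq[Node] = byId.lookup(2L)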

So I'm wondering: is there any way I could update an object in the RDD in O(M)? Or, more ideally, in O(1)? (I saw an email on Spark's mailing list saying that lookup can be modified to achieve O(1).)

Another thing is, if I could achieve O(M) for updating the RDD, could I increase the number of partitions to some number larger than the number of cores I have and achieve better performance?

Accepted answer by cloud

An RDD is a distributed data set, a partition is the unit of RDD storage, and the unit of processing an RDD is an element.

For example, if you read a large file from HDFS as an RDD, then the elements of that RDD are Strings (the lines of the file), and Spark stores the RDD across the cluster by partition. As a Spark user, you only need to care about how to deal with the lines of that file, just as if you were writing a normal program reading a file from the local file system line by line. That's the power of Spark :)

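For instance (a minimal sketch; sc is an existing SparkContext and the HDFS path is made up):

// Each element of `lines` is one line of the file; Spark decides how those
// lines are grouped into partitions across the cluster.
val lines = sc.textFile("hdfs:///data/input/big-file.txt")

// You reason about elements, not partitions, just as in a local
// line-by-line program.
val nonEmptyCount = lines.filter(_.trim.nonEmpty).count()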

Anyway, you have no idea which elements will be stored in a given partition, so it doesn't make sense to update a particular partition.

Answer by maasg

As functional data structures, RDDs are immutable and an operation on an RDD generates a new RDD.

Immutability of the structure does not necessarily mean full replication. Persistent data structures are a common functional pattern where operations on immutable structures yield a new structure but previous versions are maintained and often reused.

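A small sketch of what that means in practice (assuming an existing SparkContext named sc):

val original = sc.parallelize(1 to 1000000)

// A transformation never modifies `original`; it records a new lineage
// step and hands back a new RDD.
val updated = original.map(x => if (x == 42) x * 10 else x)

// Both RDDs remain usable, and nothing is copied eagerly: the map only
// runs when an action such as count() is invoked.
val a = original.count()
val b = updated.count()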

GraphX (a 'module' on top of Spark) is a graph API that uses exactly this concept. From the docs:

Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. Note that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices) are reused in the new graph, reducing the cost of this inherently functional data structure.

It might be a solution for the problem at hand: http://spark.apache.org/docs/1.0.0/graphx-programming-guide.html

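A rough sketch of that style with GraphX (the tiny graph and the Double vertex attribute are invented purely to show the shape of the API; sc is an existing SparkContext):

import org.apache.spark.graphx.{Edge, Graph}

// Invented toy graph: vertex attribute is a Double score, edge attribute a label.
val vertices = sc.parallelize(Seq((1L, 1.0), (2L, 1.0), (3L, 1.0)))
val edges    = sc.parallelize(Seq(Edge(1L, 2L, "link"), Edge(2L, 3L, "link")))
val graph    = Graph(vertices, edges)

// "Updating" vertex 2 yields a new Graph; the unaffected structure and
// indices are shared with the old graph rather than copied.
val updatedGraph = graph.mapVertices { (id, attr) =>
  if (id == 2L) attr + 0.5 else attr
}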

Answer by samthebest

The MapReduce programming model (and FP) doesn't really support updates of single values. Rather, one is supposed to define a sequence of transformations.

Now, when you have interdependent values, i.e. you cannot perform your transformation with a simple map but need to aggregate multiple values and update based on that aggregate, then what you need to do is think of a way of grouping those values together and transforming each group - or define a monoidal operation so that the operation can be distributed and chopped up into sub-steps.

Group By Approach

Now I'll try to be a little more specific for your particular case. You say you have subtrees; is it possible to first map each node to a key that indicates the corresponding subtree? If so, you could do something like this:

nodes.map(n => (getSubTreeKey(n), n)).groupByKey().map ...

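Spelled out a bit more, under invented names (TreeNode, its subTree field, and updateSubTree are placeholders for whatever your data actually provides; sc is an existing SparkContext):

import org.apache.spark.rdd.RDD

// Hypothetical node shape: each node knows the key of the subtree it belongs to.
case class TreeNode(id: Long, subTree: Long, value: Double)

// Placeholder for the per-subtree transformation one iteration performs.
def updateSubTree(group: Iterable[TreeNode]): Iterable[TreeNode] =
  group.map(n => n.copy(value = n.value + 1.0))

val nodes: RDD[TreeNode] = sc.parallelize(Seq(
  TreeNode(1L, 10L, 0.0), TreeNode(2L, 10L, 0.0), TreeNode(3L, 20L, 0.0)))

val updated: RDD[TreeNode] = nodes
  .map(n => (n.subTree, n))   // key each node by its subtree
  .groupByKey()               // bring a whole subtree together on one task
  .flatMap { case (_, group) => updateSubTree(group) }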

Monoid

(Strictly speaking, you want a commutative monoid.) It's best you read http://en.wikipedia.org/wiki/Monoid#Commutative_monoid

For example, + is a monoidal operation, because when one wishes to compute the sum of, say, an RDD of Ints, the underlying framework can chop up the data into chunks, perform the sum on each chunk, then sum up the resulting sums (possibly in more than just 2 steps too). If you can find a monoid that will ultimately produce the same results you require from single updates, then you have a way to distribute your processing. E.g.

nodes.reduce(_ myMonoid _)

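As a concrete, purely illustrative commutative monoid, ordinary summation behaves exactly as described above: each partition is reduced locally and the partial results are then combined (assuming an existing SparkContext sc):

val ints = sc.parallelize(1 to 1000, numSlices = 8)

// + is associative and commutative, so Spark can sum the 8 partitions
// independently and then combine the 8 partial sums.
val total = ints.reduce(_ + _)   // 500500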