Disclaimer: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/28981359/

(Why) do we need to call cache or persist on a RDD

scala, apache-spark, rdd

Asked by Ramana

When a resilient distributed dataset (RDD) is created from a text file or collection (or from another RDD), do we need to call "cache" or "persist" explicitly to store the RDD data into memory? Or is the RDD data stored in a distributed way in the memory by default?

val textFile = sc.textFile("/user/emp.txt")

As per my understanding, after the above step, textFile is a RDD and is available in all/some of the node's memory.

If so, why do we need to call "cache" or "persist" on textFile RDD then?

Answered by Daniel Darabos

Most RDD operations are lazy. Think of an RDD as a description of a series of operations. An RDD is not data. So this line:

val textFile = sc.textFile("/user/emp.txt")

It does nothing. It creates an RDD that says "we will need to load this file". The file is not loaded at this point.

RDD operations that require observing the contents of the data cannot be lazy. (These are called actions.) An example is RDD.count: to tell you the number of lines in the file, the file needs to be read. So if you write textFile.count, at this point the file will be read, the lines will be counted, and the count will be returned.

What if you call textFile.count again? The same thing: the file will be read and counted again. Nothing is stored. An RDD is not data.
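
A minimal sketch of what that looks like, continuing the snippet above:

val n1 = textFile.count() // the file is read and the lines are counted
val n2 = textFile.count() // the file is read and counted again; nothing was kept from the first call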

So what does RDD.cache do? If you add textFile.cache to the above code:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

It does nothing. RDD.cache is also a lazy operation. The file is still not read. But now the RDD says "read this file and then cache the contents". If you then run textFile.count the first time, the file will be loaded, cached, and counted. If you call textFile.count a second time, the operation will use the cache. It will just take the data from the cache and count the lines.
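
Put together, a sketch of the sequence just described:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache // lazy: nothing is read or cached yet
val n1 = textFile.count() // first action: the file is read, the partitions are cached, the lines are counted
val n2 = textFile.count() // second action: served from the cache, no re-read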

The cache behavior depends on the available memory. If the file does not fit in the memory, for example, then textFile.count will fall back to the usual behavior and re-read the file.
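
If re-reading is too expensive in that situation, persist lets you pick a storage level explicitly instead of the MEMORY_ONLY level that cache uses for RDDs. A sketch:

import org.apache.spark.storage.StorageLevel

val textFile = sc.textFile("/user/emp.txt")
textFile.persist(StorageLevel.MEMORY_AND_DISK) // partitions that do not fit in memory spill to disk instead of being recomputed
textFile.count() // the first action materializes and stores the partitions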

Answered by maasg

I think the question would be better formulated as:

When do we need to call cache or persist on a RDD?

Spark processes are lazy, that is, nothing will happen until it's required. To answer the question quickly: after val textFile = sc.textFile("/user/emp.txt") is issued, nothing happens to the data; only a HadoopRDD is constructed, using the file as its source.

Let's say we transform that data a bit:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Again, nothing happens to the data. Now there is a new RDD, wordsRDD, that contains a reference to textFile and a function to be applied when needed.

Only when an action is called on an RDD, like wordsRDD.count, is the RDD chain, called the lineage, executed. That is, the data, broken down into partitions, will be loaded by the Spark cluster's executors, the flatMap function will be applied, and the result will be calculated.
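
Continuing the snippet, a single action triggers the whole chain, and toDebugString shows the lineage that will be executed (its exact output varies by Spark version):

println(wordsRDD.toDebugString) // prints the lineage, e.g. a MapPartitionsRDD backed by a HadoopRDD
val wordCount = wordsRDD.count() // action: the executors load the file partitions, apply flatMap, and count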

On a linear lineage, like the one in this example, cache() is not needed. The data will be loaded to the executors, all the transformations will be applied, and finally the count will be computed, all in memory - if the data fits in memory.

cache is useful when the lineage of the RDD branches out. Let's say you want to filter the words of the previous example into a count of positive and negative words. You could do it like this:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Here, each branch issues a reload of the data. Adding an explicit cache statement will ensure that processing done previously is preserved and reused. The job will look like this:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

For that reason, cache is said to 'break the lineage', as it creates a checkpoint that can be reused for further processing.

Rule of thumb: use cache when the lineage of your RDD branches out or when an RDD is used multiple times, as in a loop.
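
A sketch of the loop case, reusing the wordsRDD defined above (the length thresholds are arbitrary):

wordsRDD.cache()
var total = 0L
for (minLength <- 1 to 5) {
  // without the cache, every iteration would re-read the file and re-run flatMap
  total += wordsRDD.filter(_.length >= minLength).count()
}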

Answered by eliasah

Do we need to call "cache" or "persist" explicitly to store the RDD data into memory?

Yes, only if needed.

Is the RDD data stored in a distributed way in memory by default?

No!

And these are the reasons why:

  • Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

  • RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

  • All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently – for example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

  • By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.There is also support for persisting RDDs on disk, or replicated across multiple nodes.
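
A small sketch tying those points together, using the file from the earlier examples (the snippet itself is not from the guide):

val lines = sc.textFile("/user/emp.txt") // transformation: nothing is read yet
val lengths = lines.map(_.length) // transformation: still lazy
lengths.persist() // same as cache(): the default MEMORY_ONLY level, applied when an action runs
val total = lengths.reduce(_ + _) // action: reads the file, applies map, caches and aggregates
val longest = lengths.reduce((a, b) => math.max(a, b)) // action: reuses the cached lengths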

For more details please check the Spark programming guide.

Answered by rileyss

Below are the three situations in which you should cache your RDDs:

using an RDD many times

performing multiple actions on the same RDD

for long chains of (or very expensive) transformations
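
For example, a sketch of the third case, where the result of an expensive chain is computed once and reused (distinct and sortBy stand in here for any costly steps):

val expensive = sc.textFile("/user/emp.txt")
  .flatMap(_.split("\\W"))
  .distinct()
  .sortBy(identity)
expensive.cache()
expensive.count() // first action: runs the whole chain and caches the result
expensive.take(10) // reuses the cached partitions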

Answered by zinking

Adding another reason to add (or temporarily add) a cache method call:

for debugging memory issues

With the cache method, Spark will give debugging information regarding the size of the RDD. So in the Spark integrated UI, you will get RDD memory consumption info, and this has proved very helpful in diagnosing memory issues.
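
A sketch of that workflow, reusing wordsRDD from the earlier answer:

wordsRDD.cache()
wordsRDD.count() // materializes the cache; the Storage tab of the Spark web UI now shows the RDD's size in memory
wordsRDD.unpersist() // once you are done debugging, drop the cached data again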
