Un-persisting all dataframes in (py)spark

Disclaimer: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/36905717/

Tags: python, caching, apache-spark, pyspark, apache-spark-sql

Asked by bHyman3

I have a spark application with several points where I would like to persist the current state. This is usually after a large step, or when caching a state that I would like to use multiple times. It appears that when I call cache on my dataframe a second time, a new copy is cached to memory. In my application, this leads to memory issues when scaling up. Even though a given dataframe is a maximum of about 100 MB in my current tests, the cumulative size of the intermediate results grows beyond the allotted memory on the executor. See below for a small example that shows this behavior.

cache_test.py:

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

simple_data.csv:

1,2,3
4,5,6
7,8,9

Looking at the application UI, there is a copy of the original dataframe, in addition to the one with the new column. I can remove the original copy by calling df.unpersist() before the withColumn line. Is this the recommended way to remove a cached intermediate result (i.e. call unpersist before every cache())?

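For reference, a minimal sketch of that pattern applied to cache_test.py above (assuming the same simple_data.csv); the only change is the unpersist() call before the withColumn line:

from pyspark import SparkContext
from pyspark.sql import HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

# Release the cached copy of the original dataframe before caching the derived one
df.unpersist()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()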

Also, is it possible to purge all cached objects? In my application, there are natural breakpoints where I can simply purge all memory and move on to the next file. I would like to do this without creating a new spark application for each input file.

Thank you in advance!

Answered by zero323

Spark 2.x

You can use Catalog.clearCache:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
...
spark.catalog.clearCache()
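
As a rough sketch of the per-file workflow described in the question, clearCache() can be called at each natural breakpoint without starting a new application. The file list and the per-file work below are hypothetical placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical list of input files; substitute your own paths
input_paths = ['file_1.csv', 'file_2.csv']

for path in input_paths:
    df = spark.read.csv(path)     # Spark 2.x built-in CSV reader
    df.cache()
    df.show()                     # stand-in for the real work on this file
    spark.catalog.clearCache()    # purge everything cached before the next file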

Spark 1.x

You can use the SQLContext.clearCache method, which

Removes all cached tables from the in-memory cache.

from pyspark.sql import SQLContext
from pyspark import SparkContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()

Answered by Tagar

We use this quite often:

for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()
    print("Unpersisted {} rdd".format(id))

where sc is a SparkContext variable.

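If the application holds a SparkSession rather than a bare SparkContext, the same loop can be wrapped in a small helper. This is only a sketch built around the snippet above: unpersist_all is not a built-in name, and _jsc is an internal attribute of SparkContext.

def unpersist_all(spark):
    """Unpersist every RDD/DataFrame currently cached in this application."""
    sc = spark.sparkContext  # the underlying SparkContext of the session
    for (rdd_id, rdd) in sc._jsc.getPersistentRDDs().items():
        rdd.unpersist()
        print("Unpersisted rdd {}".format(rdd_id))

# Example usage at a natural breakpoint:
# unpersist_all(spark)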