Original source: http://stackoverflow.com/questions/41491972/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me) on Stack Overflow.
How can I tear down a SparkSession and create a new one within one application?
Asked by vaer-k
I have a pyspark program with multiple independent modules that can each independently process data to meet my various needs. But they can also be chained together to process data in a pipeline. Each of these modules builds a SparkSession and executes perfectly on its own.
However, when I try to run them serially within the same python process, I run into issues. At the moment when the second module in the pipeline executes, spark complains that the SparkContext I am attempting to use has been stopped:
py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
Each of these modules builds a SparkSession at the beginning of execution and stops the sparkContext at the end of its process. I build and stop sessions/contexts like so:
session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()
According to the official documentation, getOrCreate "gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder." But I don't want this behavior (where the process attempts to get an existing session). I can't find any way to disable it, and I can't figure out how to destroy the session -- I only know how to stop its associated SparkContext.
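To make the failure mode concrete, here is a minimal sketch of the behavior described above; it assumes a Spark 2.x release affected by this session caching (newer releases may behave differently):

from pyspark.sql import SparkSession

first = SparkSession.builder.appName("myApp").getOrCreate()
first.stop()   # stops the underlying SparkContext

second = SparkSession.builder.appName("myApp").getOrCreate()
# In the affected versions `second` is the cached `first`, so any action fails
# with "Cannot call methods on a stopped SparkContext"
second.sparkContext.parallelize([1, 2, 3]).collect()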
How can I build new SparkSessions in independent modules, and execute them in sequence in the same Python process without previous sessions interfering with the newly created ones?
The following is an example of the project structure:
main.py
import collect
import process

if __name__ == '__main__':
    data = collect.execute()
    process.execute(data)
collect.py
from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # [... do some work here ...]
    result = rdd.collect()
    session.stop()
    return result
process.py
from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # [... do some work here ...]
    result = rdd.collect()
    session.stop()
    return result
Accepted answer by zero323
Long story short, Spark (including PySpark) is not designed to handle multiple contexts in a single application. If you're interested in the JVM side of the story I would recommend reading SPARK-2243 (resolved as Won't Fix).
There are a number of design decisions in PySpark which reflect that, including, but not limited to, a singleton Py4J gateway. Effectively you cannot have multiple SparkContexts in a single application. SparkSession is not only bound to a SparkContext but also introduces problems of its own, like handling the local (standalone) Hive metastore if one is used. Moreover, there are functions which use SparkSession.builder.getOrCreate internally and depend on the behavior you see right now. A notable example is UDF registration. Other functions may exhibit unexpected behavior if multiple SQL contexts are present (for example RDD.toDF).
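A quick sketch of the SparkContext restriction mentioned above (assuming a local PySpark installation; the exact error message can vary between versions):

from pyspark import SparkContext

sc1 = SparkContext(master="local[1]", appName="first")
try:
    SparkContext(master="local[1]", appName="second")
except ValueError as err:
    # PySpark refuses to run a second context in the same process
    print(err)
finally:
    sc1.stop()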
Multiple contexts are not only unsupported but also, in my personal opinion, violate the single responsibility principle. Your business logic shouldn't be concerned with all the setup, cleanup and configuration details.
My personal recommendations are as follows:
If the application consists of multiple coherent modules which can be composed and which benefit from a single execution environment with caching and a common metastore, initialize all required contexts in the application entry point and pass these down to the individual pipelines when necessary:
main.py:

from pyspark.sql import SparkSession

import collect
import process

if __name__ == "__main__":
    spark: SparkSession = ...

    # Pass data between modules
    collected = collect.execute(spark)
    processed = process.execute(spark, data=collected)
    ...
    spark.stop()
collect.py / process.py:

from pyspark.sql import SparkSession

def execute(spark: SparkSession, data=None):
    ...
Otherwise (and it seems to be the case here based on your description) I would design the entry point to execute a single pipeline and use an external workflow manager (like Apache Airflow or Toil) to handle the execution.
It is not only cleaner but also allows for much more flexible fault recovery and scheduling.
The same thing can of course be done with builders, but like a smart person once said: explicit is better than implicit.
main.py:

import argparse

from pyspark.sql import SparkSession

import collect
import process

pipelines = {"collect": collect, "process": process}

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--pipeline')
    args = parser.parse_args()

    spark: SparkSession = ...

    # Execute a single pipeline only for side effects
    pipelines[args.pipeline].execute(spark)
    spark.stop()
collect.py / process.py as in the previous point.
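As a rough illustration of the workflow-manager approach (my own sketch, not part of the answer), the --pipeline entry point above could be driven from Apache Airflow; the operator import path and the spark-submit command line are assumptions and depend on your Airflow and Spark setup:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("spark_pipelines", start_date=datetime(2017, 1, 1), schedule_interval=None) as dag:
    collect = BashOperator(
        task_id="collect",
        bash_command="spark-submit main.py --pipeline collect",
    )
    process = BashOperator(
        task_id="process",
        bash_command="spark-submit main.py --pipeline process",
    )
    # each task runs in its own spark-submit process, so each gets a fresh context
    collect >> process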
One way or another I would keep one and only one place where the context is set up, and one and only one place where it is torn down.
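A small sketch of that principle (my own addition, not from the answer): wrap setup and teardown in a single context manager at the entry point, so no pipeline module ever touches the session lifecycle; collect and process are the question's hypothetical modules:

from contextlib import contextmanager

from pyspark.sql import SparkSession

import collect
import process

@contextmanager
def spark_session(app_name="myApp"):
    # the one and only place where the session is created and stopped
    spark = SparkSession.builder.appName(app_name).getOrCreate()
    try:
        yield spark
    finally:
        spark.stop()

if __name__ == "__main__":
    with spark_session() as spark:
        collected = collect.execute(spark)
        process.execute(spark, data=collected)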
Answered by vaer-k
Here is a workaround, but not a solution:
I discovered that the SparkSession class in the source code contains the following __init__ (I've removed irrelevant lines of code from the display here):
_instantiatedContext = None

def __init__(self, sparkContext, jsparkSession=None):
    self._sc = sparkContext
    if SparkSession._instantiatedContext is None:
        SparkSession._instantiatedContext = self
Therefore, I can work around my problem by setting the _instantiatedContext attribute of the session to None after calling session.stop(). When the next module executes, it calls getOrCreate() and does not find the previous _instantiatedContext, so it assigns a new sparkContext.
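For reference, a sketch of that workaround (it assumes a PySpark release, such as 2.0/2.1, where the private attribute is still called _instantiatedContext; private attributes like this can be renamed between releases):

from pyspark.sql import SparkSession

session = SparkSession.builder.appName("myApp").getOrCreate()
# ... first module's work ...
session.stop()
SparkSession._instantiatedContext = None   # forget the stopped session

# getOrCreate() now finds no cached session and builds a fresh one
# on top of a brand-new SparkContext
session = SparkSession.builder.appName("myApp").getOrCreate()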
This isn't a very satisfying solution, but it serves as a workaround to meet my current needs. I'm unsure whether this entire approach of starting independent sessions is an anti-pattern or just unusual.
Answered by ThatDataGuy
Why would you not pass the same Spark session instance into the multiple stages of your pipeline? You could use a builder pattern. It sounds to me like you are collecting result sets at the end of each stage and then passing that data into the next stage. Consider leaving the data in the cluster in the same session, and passing the session reference and result reference from stage to stage, until your application is complete.
In other words, put the
session = SparkSession.builder...
...in your main.
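A rough sketch of what this answer suggests (the stage names and sample data are placeholders of my own): build the session once in main, keep intermediate results in the cluster as DataFrames, and hand the session and result references from stage to stage, collecting only at the very end if at all:

from pyspark.sql import DataFrame, SparkSession

def collect_stage(spark: SparkSession) -> DataFrame:
    return spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

def process_stage(spark: SparkSession, df: DataFrame) -> DataFrame:
    return df.filter(df.id > 1)

if __name__ == "__main__":
    spark = SparkSession.builder.appName("myApp").getOrCreate()
    try:
        result = process_stage(spark, collect_stage(spark))
        result.show()   # data stays in the cluster until this point
    finally:
        spark.stop()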
Answered by Veera Marni
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark_current_session = SparkSession. \
    builder. \
    appName('APP'). \
    config(conf=SparkConf()). \
    getOrCreate()

spark_current_session.newSession()
You can create a new session from the current session.
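Note that, as far as I understand it, newSession() gives you a session that shares the same SparkContext (and cached data) as the original but keeps its own SQL configuration, temporary views and registered UDFs; a small sketch of my own, assuming a local Spark 2.x+ setup:

from pyspark import SparkConf
from pyspark.sql import SparkSession

base = SparkSession.builder.appName('APP').config(conf=SparkConf()).getOrCreate()
child = base.newSession()

assert child.sparkContext is base.sparkContext   # one shared context

base.createDataFrame([(1,)], ["x"]).createOrReplaceTempView("t")
print([t.name for t in base.catalog.listTables()])    # ['t']
print([t.name for t in child.catalog.listTables()])   # []  temp views are per-session

base.stop()   # stopping either session stops the shared context for both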