Python: How to build a sparkSession in Spark 2.0 using pyspark?

Disclaimer: This page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not me). Original StackOverflow source: http://stackoverflow.com/questions/39780792/

How to build a sparkSession in Spark 2.0 using pyspark?

python, sql, apache-spark, pyspark

Asked by haileyeve

I just got access to spark 2.0; I have been using spark 1.6.1 up until this point. Can someone please help me set up a sparkSession using pyspark (python)? I know that the scala examples available online are similar (here), but I was hoping for a direct walkthrough in python language.

My specific case: I am loading in avro files from S3 in a zeppelin spark notebook. Then building df's and running various pyspark & sql queries off of them. All of my old queries use sqlContext. I know this is poor practice, but I started my notebook with

sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate().

I can read in the avros with

mydata = sqlContext.read.format("com.databricks.spark.avro").load("s3:...

and build dataframes with no issues. But once I start querying the dataframes/temp tables, I keep getting the "java.lang.NullPointerException" error. I think that is indicative of a translational error (e.g. old queries worked in 1.6.1 but need to be tweaked for 2.0). The error occurs regardless of query type. So I am assuming

1.) the sqlContext alias is a bad idea

and

2.) I need to properly set up a sparkSession.

So if someone could show me how this is done, or perhaps explain the discrepancies they know of between the different versions of spark, I would greatly appreciate it. Please let me know if I need to elaborate on this question. I apologize if it is convoluted.

Answer by Csaxena

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

Now, to import a .csv file you can use:

df = spark.read.csv('filename.csv', header=True)
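
Continuing from the two lines above, a quick sketch of inspecting the result (assuming filename.csv exists and has a header row):

df.printSchema()   # column names and types (all columns are strings unless inferSchema=True is passed)
df.show(5)         # preview the first five rows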

Answer by Ayan Guha

As you can see in the Scala example, SparkSession is part of the sql module. It is similar in Python; hence, see the pyspark sql module documentation.

class pyspark.sql.SparkSession(sparkContext, jsparkSession=None) The entry point to programming Spark with the Dataset and DataFrame API. A SparkSession can be used to create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and read parquet files. To create a SparkSession, use the following builder pattern:

>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()
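
Since the original question is about moving sqlContext-style queries to Spark 2.0, here is a minimal sketch of running SQL through the session built above; the sample rows and the view name people are made up for illustration:

>>> df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
>>> df.createOrReplaceTempView("people")   # Spark 2.0 replacement for registerTempTable
>>> spark.sql("SELECT name FROM people WHERE id = 1").show()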

Answer by Ahmedn1

From here http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html
You can create a spark session using this:

>>> from pyspark.sql import SparkSession
>>> from pyspark.conf import SparkConf
>>> c = SparkConf()
>>> spark = SparkSession.builder.config(conf=c).getOrCreate()

Answer by Aaka sh

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .enableHiveSupport() \
    .getOrCreate()

spark.conf.set("spark.executor.memory", '8g')
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory",'8g')
sc = spark.sparkContext
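
Note that resource settings such as executor and driver memory generally have to be in place before the underlying SparkContext starts, so a common alternative (sketched here with the same example values) is to pass them to the builder instead of calling spark.conf.set afterwards:

from pyspark.sql import SparkSession

# Sketch with the same example values; these resource settings are read when the context is created
spark = SparkSession.builder \
    .master("local") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "3") \
    .config("spark.cores.max", "3") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext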

Answer by prossblad

Here's a useful Python SparkSession class I developed:

#!/bin/python
# -*- coding: utf-8 -*-

######################
# SparkSession class #
######################
class SparkSession:

    # - Notes:
    # The main object is the Spark Context (the 'sc' object).
    # All new Spark sessions ('spark' objects) share the same underlying Spark context ('sc' object) within the same JVM,
    # but for each Spark session the temporary tables and registered functions are isolated.
    # You can't create a new Spark Context in another JVM by using 'sc = SparkContext(conf)',
    # but it's possible to create several Spark Contexts in the same JVM by setting 'spark.driver.allowMultipleContexts' to true (not recommended).
    # - See:
    # https://medium.com/@achilleus/spark-session-10d0d66d1d24
    # https://stackoverflow.com/questions/47723761/how-many-sparksessions-can-a-single-application-have
    # https://stackoverflow.com/questions/34879414/multiple-sparkcontext-detected-in-the-same-jvm
    # https://stackoverflow.com/questions/39780792/how-to-build-a-sparksession-in-spark-2-0-using-pyspark
    # https://stackoverflow.com/questions/47813646/sparkcontext-getorcreate-purpose?noredirect=1&lq=1

    from pyspark.sql import SparkSession

    spark = None   # The Spark Session
    sc = None      # The Spark Context
    scConf = None  # The Spark Context conf

    def _init(self):
        self.sc = self.spark.sparkContext
        self.scConf = self.sc.getConf() # or self.scConf = self.spark.sparkContext._conf

    # Return the current Spark Session (singleton), otherwise create a new one
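    # Note: the builder call is assembled as a string and eval'd below, so master, appName and config
    # are expected as Python expression text, e.g. master="'local'", appName="'myApp'",
    # config="'spark.some.config.option', 'some-value'".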
    def getOrCreateSparkSession(self, master=None, appName=None, config=None, enableHiveSupport=False):
        cmd = "self.SparkSession.builder"
        if (master != None): cmd += ".master(" + master + ")"
        if (appName != None): cmd += ".appName(" + appName + ")"
        if (config != None): cmd += ".config(" + config + ")"
        if (enableHiveSupport == True): cmd += ".enableHiveSupport()"
        cmd += ".getOrCreate()"
        self.spark = eval(cmd)
        self._init()
        return self.spark

    # Return the current Spark Context (singleton), otherwise create a new one via getOrCreateSparkSession()
    def getOrCreateSparkContext(self, master=None, appName=None, config=None, enableHiveSupport=False):
        self.getOrCreateSparkSession(master, appName, config, enableHiveSupport)
        return self.sc 

    # Create a new Spark session from the current Spark session (with isolated SQL configurations).
    # The new Spark session is sharing the underlying SparkContext and cached data,
    # but the temporary tables and registered functions are isolated.
    def createNewSparkSession(self, currentSparkSession):
        self.spark = currentSparkSession.newSession()
        self._init()
        return self.spark

    def getSparkSession(self):
        return self.spark

    def getSparkSessionConf(self):
        return self.spark.conf

    def getSparkContext(self):
        return self.sc

    def getSparkContextConf(self):
        return self.scConf

    def getSparkContextConfAll(self):
        return self.scConf.getAll()

    def setSparkContextConfAll(self, properties):
        # Properties example (setAll expects key-value pairs): [('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4')]
        self.scConf = self.scConf.setAll(properties) # or self.scConf = self.spark.sparkContext._conf.setAll()

    # Stop (clears) the active SparkSession for current thread.
    #def stopSparkSession(self):
    #    return self.spark.clearActiveSession()

    # Stop the underlying SparkContext.
    def stopSparkContext(self):
        self.spark.stop() # Or self.sc.stop()

    # Returns the active SparkSession for the current thread, returned by the builder.
    #def getActiveSparkSession(self):
    #    return self.spark.getActiveSession()

    # Returns the default SparkSession that is returned by the builder.
    #def getDefaultSession(self):
    #    return self.spark.getDefaultSession()
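
A short usage sketch of this wrapper class (the master and appName values are illustrative, and are passed as quoted expression strings because of the eval-based builder above):

# Usage sketch (illustrative values)
sessions = SparkSession()
spark = sessions.getOrCreateSparkSession(master="'local'", appName="'myNotebook'", enableHiveSupport=True)
print(sessions.getSparkContextConfAll())   # list of (key, value) pairs from the underlying SparkContext
sessions.stopSparkContext()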