Create Spark DataFrame from Pandas DataFrame

Disclaimer: The content on this page is taken from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/54698225/

Create Spark DataFrame from Pandas DataFrame

Tags: python, pandas, pyspark, apache-spark-sql

Asked by Sergio Roldán

I'm trying to build a Spark DataFrame from a simple Pandas DataFrame. These are the steps I follow.

import pandas as pd
pandas_df = pd.DataFrame({"Letters":["X", "Y", "Z"]})
spark_df = sqlContext.createDataFrame(pandas_df)
spark_df.printSchema()

Up to this point everything is OK. The output is:

root
|-- Letters: string (nullable = true)

The problem comes when I try to print the DataFrame:

spark_df.show()

This is the result:

An error occurred while calling o158.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): org.apache.spark.SparkException:
Error from python worker:
Error executing Jupyter command 'pyspark.daemon': [Errno 2] No such file or directory PYTHONPATH was:
/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip:/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip:/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/jars/spark-core_2.11-2.4.0.jar:/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip:/home/roldanx/soft/spark-2.4.0-bin-hadoop2.7/python/: org.apache.spark.SparkException: No port number in pyspark.daemon's stdout

These are my Spark specifications:

SparkSession - hive

SparkContext

Spark UI

Version: v2.4.0

Master: local[*]

AppName: PySparkShell

These are my environment variables:

export PYSPARK_PYTHON=jupyter

export PYSPARK_DRIVER_PYTHON_OPTS='lab'

Fact:

As the error mentions, it has to do with running pyspark from Jupyter. Running it with 'PYSPARK_PYTHON=python2.7' or 'PYSPARK_PYTHON=python3.6' works fine.

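In other words, PYSPARK_PYTHON should name a real Python interpreter for the worker processes, while Jupyter belongs in PYSPARK_DRIVER_PYTHON. A minimal sketch of that setup (my assumption, not part of the original question):

# Assumed fix: workers run a plain Python interpreter; JupyterLab only drives the session
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='lab'
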
Answered by KRKirov

Import and initialise findspark, create a Spark session and then use the session object to convert the pandas data frame to a Spark data frame. Then add the new Spark data frame to the catalog. Tested and runs in both Jupyter 5.7.2 and Spyder 3.3.2 with Python 3.6.6.

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

# Create a spark session
spark = SparkSession.builder.getOrCreate()

# Create pandas data frame and convert it to a spark data frame 
pandas_df = pd.DataFrame({"Letters":["X", "Y", "Z"]})
spark_df = spark.createDataFrame(pandas_df)

# Add the spark data frame to the catalog
spark_df.createOrReplaceTempView('spark_df')

spark_df.show()
+-------+
|Letters|
+-------+
|      X|
|      Y|
|      Z|
+-------+

spark.catalog.listTables()
Out[18]: [Table(name='spark_df', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
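
Because the data frame is registered as a temporary view, it can also be queried with plain SQL through the same session. A small sketch of that usage (not part of the original answer):

# Query the temporary view registered above via Spark SQL
spark.sql("SELECT Letters FROM spark_df WHERE Letters = 'X'").show()
+-------+
|Letters|
+-------+
|      X|
+-------+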