Converting Pandas dataframe into Spark dataframe error

Disclaimer: this page is a translation/mirror of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license, cite the original URL, and attribute it to the original authors (not me) at StackOverflow.

Original question: http://stackoverflow.com/questions/37513355/
Asked by Иван Судос
I'm trying to convert a Pandas DataFrame into a Spark one. The head of the DataFrame:
10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
Code:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext

dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
And I got an error:
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
Accepted answer by madman2890
You need to make sure your pandas dataframe columns are appropriate for the type Spark is inferring. If your dataframe's info output lists something like:
df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol 5062 non-null object
Col2 5062 non-null object
and you're getting that error, try:
df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)
Now, make sure .astype(str) is actually the type you want those columns to be. Basically, when the underlying Java code tries to infer the type from a Python object, it makes a guess based on a sample of observations; if that guess doesn't hold for all the data in the column(s) being converted from pandas to Spark, the conversion will fail.
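As a concrete illustration, here is a minimal sketch (the column names and values are made up, not taken from the question) that reproduces the merge error and then avoids it with astype(str):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("typeInferenceDemo").getOrCreate()

# 'mixed' holds both floats and strings, so Spark's sampling-based inference
# can see DoubleType in some rows and StringType in others -> "Can not merge type".
pdf = pd.DataFrame({"id": [1, 2, 3],
                    "mixed": [1.5, "NA", 2.0]})

# Casting the ambiguous column to str makes the inferred type consistent.
pdf["mixed"] = pdf["mixed"].astype(str)
sdf = spark.createDataFrame(pdf)
sdf.printSchema()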
Answer by Grant Shannon
Type-related errors can be avoided by imposing a schema as follows:
Note: a text file (test.csv) was created with the original data (as above), and hypothetical column names were inserted ("col1", "col2", ..., "col25").
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")
contents of the pandas data frame:
col1 col2 col3 col4 col5 col6 col7 col8 ...
0 10000001 1 0 1 12:35 OK 10002 1 ...
1 10000001 2 0 1 12:36 OK 10002 1 ...
2 10000002 1 0 4 12:19 PA 10003 1 ...
Next, create the schema:
from pyspark.sql.types import *
mySchema = StructType([ StructField("col1", LongType(), True)\
,StructField("col2", IntegerType(), True)\
,StructField("col3", IntegerType(), True)\
,StructField("col4", IntegerType(), True)\
,StructField("col5", StringType(), True)\
,StructField("col6", StringType(), True)\
,StructField("col7", IntegerType(), True)\
,StructField("col8", IntegerType(), True)\
,StructField("col9", IntegerType(), True)\
,StructField("col10", IntegerType(), True)\
,StructField("col11", StringType(), True)\
,StructField("col12", StringType(), True)\
,StructField("col13", IntegerType(), True)\
,StructField("col14", IntegerType(), True)\
,StructField("col15", IntegerType(), True)\
,StructField("col16", IntegerType(), True)\
,StructField("col17", IntegerType(), True)\
,StructField("col18", IntegerType(), True)\
,StructField("col19", IntegerType(), True)\
,StructField("col20", IntegerType(), True)\
,StructField("col21", IntegerType(), True)\
,StructField("col22", IntegerType(), True)\
,StructField("col23", IntegerType(), True)\
,StructField("col24", IntegerType(), True)\
,StructField("col25", IntegerType(), True)])
Note: True means the field is nullable (null values are allowed).
create the pyspark dataframe:
df = spark.createDataFrame(pdDF,schema=mySchema)
confirm the pandas data frame is now a pyspark data frame:
type(df)
output:
pyspark.sql.dataframe.DataFrame
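To double-check that the imposed column types took effect, the schema can also be printed:

df.printSchema()  # lists each column with the type from mySchema and whether it is nullable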
Aside:
To address Kate's comment below: to impose a general (String) schema, you can do the following:
df = spark.createDataFrame(pdDF.astype(str))
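If you later need numeric types after loading everything as strings, here is a hedged sketch of casting selected columns back on the Spark side (column names taken from the example above):

from pyspark.sql.functions import col

# Cast individual columns back to the types you actually need.
df = df.withColumn("col1", col("col1").cast("long")) \
       .withColumn("col2", col("col2").cast("int"))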
Answer by Gonzalo Garcia
I made this script; it worked for my 10 pandas DataFrames:
from pyspark.sql.types import *

# Auxiliary functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given a pandas dataframe, return a Spark dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)
You can see it also in this gist
With this you just have to call spark_df = pandas_to_spark(pandas_df)
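Note that the script above assumes a sqlContext already exists in scope; a minimal sketch of setting one up first (the app name is arbitrary):

from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.appName("pandasToSparkDF").getOrCreate()
sqlContext = SQLContext(spark.sparkContext)  # the name pandas_to_spark expects in scope

spark_df = pandas_to_spark(pandas_df)  # pandas_df is your existing pandas DataFrame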
Answer by RoyaumeIX
I have tried this with your data and it is working:
%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print(sc)
df = pd.read_csv("test.csv")
print(type(df))
print(df)
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()
Answer by heathensoul
I received a similar error message once; in my case it was because my pandas dataframe contained NULLs. I would recommend handling this in pandas before converting to Spark (that resolved the issue in my case).
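For example, a minimal sketch of handling missing values in pandas before the conversion (the file and column names are illustrative):

import pandas as pd

pdf = pd.read_csv("test.csv")

# See which columns actually contain missing values.
print(pdf.isnull().sum())

# Either fill them with a sentinel that fits the column's intended type...
pdf["col13"] = pdf["col13"].fillna(0)

# ...or drop the offending rows before handing the frame to Spark.
pdf = pdf.dropna()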