TypeError when converting Pandas to Spark

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/39862211/

Tags: python, pandas, apache-spark, pyspark

Asked by gold_cy

So I have looked up this question on here, but previous solutions have not worked for me. I have a DataFrame in this format:

mdf.head()
    dbn       boro       bus
0   17K548  Brooklyn    B41, B43, B44-SBS, B45, B48, B49, B69
1   09X543  Bronx       Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A,...
4   28Q680  Queens      Q25, Q46, Q65
6   14K474  Brooklyn    B24, B43, B48, B60, Q54, Q59

There are a couple more columns, but I have excluded them (subway lines and test scores). When I try to convert this DataFrame into a Spark DataFrame, I get the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-30-1721be5c2987> in <module>()
----> 1 sparkdf = sqlc.createDataFrame(mdf)

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
    423             rdd, schema = self._createFromRDD(data, schema, samplingRatio)
    424         else:
--> 425             rdd, schema = self._createFromLocal(data, schema)
    426         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    427         jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _createFromLocal(self, data, schema)
    339 
    340         if schema is None or isinstance(schema, (list, tuple)):
--> 341             struct = self._inferSchemaFromList(data)
    342             if isinstance(schema, (list, tuple)):
    343                 for i, name in enumerate(schema):

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _inferSchemaFromList(self, data)
    239             warnings.warn("inferring schema from dict is deprecated,"
    240                           "please use pyspark.sql.Row instead")
--> 241         schema = reduce(_merge_type, map(_infer_schema, data))
    242         if _has_nulltype(schema):
    243             raise ValueError("Some of types cannot be determined after inferring")

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
    860         nfs = dict((f.name, f.dataType) for f in b.fields)
    861         fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType())))
--> 862                   for f in a.fields]
    863         names = set([f.name for f in fields])
    864         for n in nfs:

/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
    854     elif type(a) is not type(b):
    855         # TODO: type cast (such as int -> long)
--> 856         raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
    857 
    858     # same type

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

From what I have read, this might be a problem with the headers being treated as data. It is my understanding that you can't remove the headers from a DataFrame, so how would I go about solving this error and converting this DataFrame into a Spark one?

Edit: Here is the code showing how I created the Pandas DataFrame and worked my way around the problem.

sqlc = SQLContext(sc)
df = pd.DataFrame(pd.read_csv('hsdir.csv', encoding = 'utf_8_sig'))
df = df[['dbn', 'boro', 'bus', 'subway', 'total_students']]
df1 = pd.DataFrame(pd.read_csv('sat_r.csv', encoding = 'utf_8_sig'))
df1 = df1.rename(columns = {'Num of SAT Test Takers': 'num_test_takers', 'SAT Critical Reading Avg. Score': 'read_avg', 'SAT Math Avg. Score' : 'math_avg', 'SAT Writing Avg. Score' : 'write_avg'})
mdf = pd.merge(df, df1, left_on = 'dbn', right_on = 'DBN', how = 'left')
mdf = mdf[pd.notnull(mdf['DBN'])]
mdf.to_csv('merged.csv', encoding = 'utf-8')
ndf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("merged.csv")

The last line of this code, which loads the merged CSV from my local machine, ended up letting me convert it properly to a DataFrame, but my question still remains: why did the conversion not work in the first place?

Accepted answer by user4601931

You could use reflection to infer the schema from an RDD of Row objects, e.g.,

from pyspark.sql import Row
# note: this assumes mdf is an RDD of records; see the sketch below for starting from the pandas DataFrame
mdfRows = mdf.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)
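
Since mdf in the question is a pandas DataFrame rather than an RDD, a minimal sketch of the same idea starting from pandas (assuming sc and sqlContext are already available, and using just three of the columns for illustration) might look like this:

from pyspark.sql import Row

# hypothetical: parallelize the pandas rows into an RDD first, then build Row objects
rowRdd = sc.parallelize(mdf[['dbn', 'boro', 'bus']].values.tolist())
mdfRows = rowRdd.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)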

Does that achieve the desired result?

Answer by kmader

I had the same issue and was able to track it down to a single entry which had a value of length 0 (an empty string). The _infer_schema function runs on each row of the DataFrame and determines the types. By default, the empty value (which pandas reads as NaN, a float) is treated as a Double while the other values are Strings, and these two types cannot be merged by the _merge_type function. The issue has been filed as https://issues.apache.org/jira/browse/SPARK-18178, but the best way around it is probably supplying a schema to the createDataFrame call (a sketch follows the reproduction below).

The code below reproduces the problem in PySpark 2.0

import pandas as pd
from io import StringIO

# the second data row has an empty 'Scan Options' cell, which pandas reads as NaN (a float)
test_df = pd.read_csv(StringIO(',Scan Options\n15,SAT2\n16,\n'))

# schema inference sees a String in one row and a Double in the other and cannot merge them
sqlContext.createDataFrame(test_df).registerTempTable('Test')
o_qry = sqlContext.sql("SELECT * FROM Test LIMIT 1")
o_qry.first()
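
As a rough sketch of the suggested workaround (my own addition, not part of the original answer), you can replace the NaN with None and pass an explicit schema so nothing has to be inferred:

from pyspark.sql.types import StructType, StructField, LongType, StringType

# hypothetical explicit schema for the toy frame above (field names are illustrative)
schema = StructType([
    StructField("id", LongType(), True),
    StructField("Scan Options", StringType(), True),
])

# NaN -> None so the StringType field accepts the empty cell
clean_df = test_df.astype(object).where(pd.notnull(test_df), None)
sqlContext.createDataFrame(clean_df, schema=schema).show()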

Answer by Ankit Kumar Namdeo

You can try this as well:

import numpy as np
import pandas as pd

def create_spark_dataframe(file_name):
    """
    Return a Spark DataFrame built from the pandas DataFrame read from file_name.
    """
    pandas_data_frame = pd.read_csv(file_name, converters={"PRODUCT": str})
    # replace NaN with an empty string in every non-numeric column
    for col in pandas_data_frame.columns:
        if ((pandas_data_frame[col].dtypes != np.int64) &
                (pandas_data_frame[col].dtypes != np.float64)):
            pandas_data_frame[col] = pandas_data_frame[col].fillna('')

    spark_data_frame = sqlContext.createDataFrame(pandas_data_frame)
    return spark_data_frame

This will solve your problem.
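
For reference, a hypothetical call using the merged file from the question would be:

spark_df = create_spark_dataframe('merged.csv')
spark_df.printSchema()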

Answer by Itachi

The problem here is pandas' default np.nan (Not a Number) value for empty strings, which confuses schema inference when converting to a Spark DataFrame.

The basic approach is to convert np.nan to None, which allows the conversion to work.

Unfortunately, pandas does not let you fillna with None. Since np.nan does not satisfy self-equality (np.nan != np.nan is True), you can use this nifty trick:

new_series = new_series.apply(lambda x: None if x != x else x)

Then, display(sqlContext.createDataFrame(new_df_1)) would work fine.
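
If you need the same replacement across every column of a DataFrame rather than a single series, a minimal sketch (my own extension of the answer, assuming new_df_1 is the pandas DataFrame being converted) would be:

# apply the NaN -> None trick element-wise to the whole frame
new_df_1 = new_df_1.applymap(lambda x: None if x != x else x)
display(sqlContext.createDataFrame(new_df_1))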
