TypeError when converting Pandas to Spark
Disclaimer: this page is a Chinese-English side-by-side translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, link to the original, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/39862211/
Asked by gold_cy
So I have looked up this question on here, but previous solutions have not worked for me. I have a DataFrame in this format:
mdf.head()
dbn boro bus
0 17K548 Brooklyn B41, B43, B44-SBS, B45, B48, B49, B69
1 09X543 Bronx Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A,...
4 28Q680 Queens Q25, Q46, Q65
6 14K474 Brooklyn B24, B43, B48, B60, Q54, Q59
There are a couple more columns, but I have excluded them (subway lines and test scores). When I try to convert this DataFrame into a Spark DataFrame, I get the following error.
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-30-1721be5c2987> in <module>()
----> 1 sparkdf = sqlc.createDataFrame(mdf)
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
423 rdd, schema = self._createFromRDD(data, schema, samplingRatio)
424 else:
--> 425 rdd, schema = self._createFromLocal(data, schema)
426 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
427 jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _createFromLocal(self, data, schema)
339
340 if schema is None or isinstance(schema, (list, tuple)):
--> 341 struct = self._inferSchemaFromList(data)
342 if isinstance(schema, (list, tuple)):
343 for i, name in enumerate(schema):
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _inferSchemaFromList(self, data)
239 warnings.warn("inferring schema from dict is deprecated,"
240 "please use pyspark.sql.Row instead")
--> 241 schema = reduce(_merge_type, map(_infer_schema, data))
242 if _has_nulltype(schema):
243 raise ValueError("Some of types cannot be determined after inferring")
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
860 nfs = dict((f.name, f.dataType) for f in b.fields)
861 fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType())))
--> 862 for f in a.fields]
863 names = set([f.name for f in fields])
864 for n in nfs:
/usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
854 elif type(a) is not type(b):
855 # TODO: type cast (such as int -> long)
--> 856 raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
857
858 # same type
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
From what I have read, this might be a problem with the headers being treated as data. It is my understanding that you can't remove the headers from a DataFrame, so how would I proceed with solving this error and converting this DataFrame into a Spark one?
Edit: Here is the code for how I created the Pandas DF and worked my way around the problem.
from pyspark.sql import SQLContext
import pandas as pd

sqlc = SQLContext(sc)  # sc is the existing SparkContext

# Load the two source CSVs with pandas and merge them on the school code
df = pd.DataFrame(pd.read_csv('hsdir.csv', encoding = 'utf_8_sig'))
df = df[['dbn', 'boro', 'bus', 'subway', 'total_students']]
df1 = pd.DataFrame(pd.read_csv('sat_r.csv', encoding = 'utf_8_sig'))
df1 = df1.rename(columns = {'Num of SAT Test Takers': 'num_test_takers', 'SAT Critical Reading Avg. Score': 'read_avg', 'SAT Math Avg. Score' : 'math_avg', 'SAT Writing Avg. Score' : 'write_avg'})
mdf = pd.merge(df, df1, left_on = 'dbn', right_on = 'DBN', how = 'left')
mdf = mdf[pd.notnull(mdf['DBN'])]

# Workaround: write the merged frame back to disk and let spark-csv read it
mdf.to_csv('merged.csv', encoding = 'utf-8')
ndf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("merged.csv")
The last line of this code, which loads the merged CSV from my local machine, ended up allowing me to convert it properly to a DataFrame. However, my question still remains: why did it not work in the first place?
Accepted answer by user4601931
You could use reflection to infer the schema from an RDD of Row objects, e.g.,
from pyspark.sql import Row

# Note: this assumes mdf is already an RDD of records; a pandas DataFrame
# has no .map method, so it would need to be turned into an RDD first.
mdfRows = mdf.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)
Does that achieve the desired result?
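One way to get such an RDD from the question's pandas DataFrame is sketched below; it assumes the sc and sqlContext variables from the question's environment, and any NaN cells in the selected columns would still need to be replaced (for example with None or '') before the schema is inferred:

from pyspark.sql import Row

# Build an RDD of Row objects from the pandas DataFrame, then let Spark
# infer the schema by reflection as suggested above
rowRdd = sc.parallelize(mdf[['dbn', 'boro', 'bus']].values.tolist()) \
           .map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(rowRdd)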
Answered by kmader
I had the same issue and was able to track it down to a single entry which had a value of length 0 (or empty). The _infer_schema command runs on each row of the dataframe and determines the types. By default, an empty value is assumed to be a Double while the other is a String, and these two types cannot be merged by the _merge_type command. The issue has been filed as https://issues.apache.org/jira/browse/SPARK-18178, but the best way around it is probably supplying a schema to the createDataFrame command.
The code below reproduces the problem in PySpark 2.0
import pandas as pd
from io import StringIO

# The second data row has an empty 'Scan Options' cell, which read_csv loads as NaN
test_df = pd.read_csv(StringIO(',Scan Options\n15,SAT2\n16,\n'))
sqlContext.createDataFrame(test_df).registerTempTable('Test')
o_qry = sqlContext.sql("SELECT * FROM Test LIMIT 1")
o_qry.first()
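A minimal sketch of the suggested workaround on that reproduction follows; the field names in the schema are made up for illustration, and the rows are first converted to plain Python values (with NaN turned into None) so nothing has to be inferred:

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Hypothetical explicit schema: the empty cell is just a nullable string
schema = StructType([
    StructField("row_id", LongType(), nullable=True),
    StructField("scan_options", StringType(), nullable=True),
])

# NaN is the only value that is not equal to itself, so opt != opt detects it
rows = [(int(rid), None if opt != opt else str(opt))
        for rid, opt in zip(test_df.iloc[:, 0], test_df["Scan Options"])]

fixed_df = sqlContext.createDataFrame(rows, schema=schema)
fixed_df.show()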
Answered by Ankit Kumar Namdeo
You can try this as well:
import numpy as np
import pandas as pd

def create_spark_dataframe(file_name):
    """
    Return a Spark DataFrame built from the pandas DataFrame read from file_name.
    """
    pandas_data_frame = pd.read_csv(file_name, converters= {"PRODUCT": str})
    # Fill empty cells in non-numeric columns with '' so schema inference
    # does not see a mix of strings and NaN doubles
    for col in pandas_data_frame.columns:
        if ((pandas_data_frame[col].dtypes != np.int64) &
                (pandas_data_frame[col].dtypes != np.float64)):
            pandas_data_frame[col] = pandas_data_frame[col].fillna('')
    spark_data_frame = sqlContext.createDataFrame(pandas_data_frame)
    return spark_data_frame
This will solve your problem.
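For instance, applied to the merged CSV the question writes out (the file name here simply mirrors the question; adjust it to the actual data):

spark_df = create_spark_dataframe('merged.csv')
spark_df.printSchema()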
Answered by Itachi
The problem here is pandas' default np.nan (Not a Number) value for empty strings, which creates confusion in the schema while converting to a Spark DataFrame.
The basic approach is to convert np.nan to None, which will enable the conversion to work.
Unfortunately, pandas does not let you fillna with None. Since np.nan does not satisfy the self-equality condition (it is never equal to itself), you can use this nifty trick.
# NaN is the only value for which x != x is True, so this swaps NaN for None
new_series = new_series.apply(lambda x: None if x != x else x)
Then, display(sqlContext.createDataFrame(new_df_1)) would work fine.
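Applied to the merged DataFrame from the question, one way to do the same replacement over every column at once is sketched below (mdf and sqlContext are the names used earlier on this page; where keeps non-null values and substitutes None everywhere else):

import pandas as pd

# Replace every NaN with None so Spark sees proper nulls instead of stray doubles
mdf = mdf.where(pd.notnull(mdf), None)

spark_df = sqlContext.createDataFrame(mdf)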