Converting a Pandas DataFrame to a Spark DataFrame
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA terms, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/40411871/
Converting Pandas DataFrame to Spark DataFrame
Asked by Dirigo
I had asked a previous question about how to Convert scipy sparse matrix to pyspark.sql.dataframe.DataFrame, and made some progress after reading the answer provided, as well as this article. I eventually came to the following code for converting a scipy.sparse.csc_matrix to a pandas dataframe:
df = pd.DataFrame(csc_mat.todense()).to_sparse(fill_value=0)
df.columns = header
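For context, here is a self-contained sketch of that conversion. The small matrix and the header names are made up for illustration, and note that to_sparse() only exists in older pandas releases (it was removed in pandas 1.0):

import pandas as pd
from scipy.sparse import csc_matrix

# Toy stand-ins for the real csc_mat and header
csc_mat = csc_matrix([[1, 0, 0], [0, 0, 2], [0, 3, 0]])
header = ["a", "b", "c"]

df = pd.DataFrame(csc_mat.todense()).to_sparse(fill_value=0)  # a SparseDataFrame on pandas < 1.0
df.columns = header
print(type(df))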
I then tried converting the pandas dataframe to a spark dataframe using the suggested syntax:
spark_df = sqlContext.createDataFrame(df)
However, I get back the following error:
ValueError: cannot create an RDD from type: <type 'list'>
I do not believe it has anything to do with the sqlContext as I was able to convert another pandas dataframe of roughly the same size to a spark dataframe, no problem. Any thoughts?
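For reference, the working/failing contrast looks roughly like this; the dense frame below is just an arbitrary stand-in for the other dataframe that converted without trouble:

import numpy as np
import pandas as pd

other_df = pd.DataFrame(np.random.rand(1000, 3), columns=["a", "b", "c"])
sqlContext.createDataFrame(other_df).show(3)  # a plain dense frame converts fine

sqlContext.createDataFrame(df)                # the sparse frame from above raises the ValueError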
Answered by Dirigo
I am not sure if this question is still relevant to the current version of pySpark, but here is the solution I worked out a couple of weeks after posting this question. The code is rather ugly and possibly inefficient, but I am posting it here due to the continued interest in this question:
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark import SparkConf
from py4j.protocol import Py4JJavaError
myConf = SparkConf(loadDefaults=True)
sc = SparkContext(conf=myConf)
hc = HiveContext(sc)
def chunks(lst, k):
    """Yield k chunks of close to equal size."""
    n = len(lst) / k  # chunk size (Python 2 integer division)
    for i in range(0, len(lst), n):
        yield lst[i: i + n]

def reconstruct_rdd(lst, num_parts):
    """Parallelize lst chunk by chunk and union the pieces into a single RDD."""
    partitions = chunks(lst, num_parts)
    for part, partition in enumerate(partitions):  # each partition is a list of lists
        print "Partition ", part, " started..."
        if part == 0:
            prime_rdd = sc.parallelize(partition)
        else:
            second_rdd = sc.parallelize(partition)
            prime_rdd = prime_rdd.union(second_rdd)
        print "Partition ", part, " complete!"
    return prime_rdd

def build_col_name_list(len_cols):
    """Build the default column names _1, _2, ..., _len_cols that Spark assigns."""
    name_lst = []
    for i in range(1, len_cols + 1):
        idx = "_" + str(i)
        name_lst.append(idx)
    return name_lst

def set_spark_df_header(header, sdf):
    """Rename the default _N columns of sdf to the names given in header."""
    oldColumns = build_col_name_list(len(sdf.columns))
    newColumns = header
    sdf = reduce(lambda sdf, idx: sdf.withColumnRenamed(oldColumns[idx], newColumns[idx]),
                 xrange(len(oldColumns)), sdf)
    return sdf

def convert_pdf_matrix_to_sdf(pdf, sdf_header, num_of_parts):
    """Convert a pandas dataframe to a Spark dataframe, falling back to a chunked RDD if needed."""
    try:
        sdf = hc.createDataFrame(pdf)
    except ValueError:
        lst = pdf.values.tolist()  # need a list of lists to parallelize
        try:
            rdd = sc.parallelize(lst)
        except Py4JJavaError:
            rdd = reconstruct_rdd(lst, num_of_parts)
        sdf = hc.createDataFrame(rdd)
        sdf = set_spark_df_header(sdf_header, sdf)
    return sdf
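A hypothetical call site for the function above, reusing the df and header built in the question; the partition count here is an arbitrary guess and would need tuning for the real data size:

spark_df = convert_pdf_matrix_to_sdf(df, header, num_of_parts=20)
spark_df.show(5)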
Answered by user10136015
to_sparse(fill_value=0)
is basically obsolete. Just use the standard variant:
sqlContext.createDataFrame(pd.DataFrame(csc_mat.todense()))
and as long as types are compatible you'd be fine.
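A minimal end-to-end sketch of that recommended path, with a toy matrix and column names for illustration; it assumes an existing sqlContext (or HiveContext) as in the question:

import pandas as pd
from scipy.sparse import csc_matrix

csc_mat = csc_matrix([[1.0, 0.0], [0.0, 2.0]])
pdf = pd.DataFrame(csc_mat.todense(), columns=["x", "y"])  # dense pandas frame, no to_sparse()

spark_df = sqlContext.createDataFrame(pdf)
spark_df.show()

For a very large or very sparse matrix, densifying it this way can of course blow up memory, which is presumably why the chunked-RDD workaround in the first answer exists.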