How to save a huge pandas dataframe to hdfs?

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/47393001/
Asked by Mulgard
I'm working with pandas and with Spark dataframes. The dataframes are always very big (> 20 GB) and the standard Spark functions are not sufficient for those sizes. Currently I'm converting my pandas dataframe to a Spark dataframe like this:
dataframe = spark.createDataFrame(pandas_dataframe)
I do that transformation because writing dataframes to HDFS with Spark is very easy:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
But the transformation fails for dataframes which are bigger than 2 GB. If I transform a Spark dataframe to pandas I can use pyarrow:
import pyarrow as pa
import pyarrow.parquet as pq

# temporarily write the spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
# open an hdfs connection using pyarrow
hdfs = pa.hdfs.connect("default", 0)
# read the parquet dataset back
parquet = pq.ParquetDataset(path, filesystem=hdfs)
table = parquet.read(nthreads=4)
# convert the arrow table to pandas
pandas = table.to_pandas(nthreads=4)
# delete the temporary files
hdfs.delete(path, recursive=True)
This is a fast conversion from Spark to pandas and it also works for dataframes bigger than 2 GB. I have not yet found a way to do it the other way around, meaning having a pandas dataframe which I transform to Spark with the help of pyarrow. The problem is that I really cannot find out how to write a pandas dataframe to HDFS.
My pandas version: 0.19.0
Answered by zero323
Meaning having a pandas dataframe which I transform to spark with the help of pyarrow.
pyarrow.Table.from_pandas is the function you're looking for:
Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True) Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa
pdf = ... # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf) # type: pyarrow.lib.Table
The result can be written directly to Parquet / HDFS without passing data via Spark:
import pyarrow.parquet as pq
fs = pa.hdfs.connect()
with fs.open(path, "wb") as fw:
    pq.write_table(adf, fw)
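If you eventually need the data back in Spark, the Parquet files written this way can be loaded by Spark directly. A minimal sketch, assuming an existing SparkSession named spark and the same path as above:

# `spark` and `path` are assumptions from the surrounding context; this simply
# loads the Parquet written by pyarrow as a regular Spark dataframe.
sdf = spark.read.parquet(path)
sdf.printSchema()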
See also
- @WesMcKinney's answer on reading a parquet file from HDFS using PyArrow.
- Reading and Writing the Apache Parquet Format in the pyarrow documentation.
- Native Hadoop file system (HDFS) connectivity in Python
Spark notes:
Furthermore, since Spark 2.3 (current master), Arrow is supported directly in createDataFrame (SPARK-20791 - Use Apache Arrow to improve Spark createDataFrame from Pandas.DataFrame). It uses SparkContext.defaultParallelism to compute the number of chunks, so you can easily control the size of individual batches.
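A minimal sketch of that Arrow path, assuming Spark >= 2.3 with pyarrow installed on the driver; the session name, the sample frame and the output path are placeholders:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-createDataFrame").getOrCreate()
# opt in to the Arrow-based conversion (disabled by default in 2.3)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = pd.DataFrame({"x": range(10)})  # stand-in for the real pandas frame
sdf = spark.createDataFrame(pdf)      # the conversion now goes through Arrow batches
sdf.write.parquet("hdfs:///tmp/out", mode="overwrite", compression="snappy")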
Finally, defaultParallelism can be used to control the number of partitions generated by the standard _convert_from_pandas, effectively reducing the size of the slices to something more manageable.
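A sketch of that knob under the same assumptions; spark.default.parallelism is a standard configuration key, the value 64 is only illustrative, and pdf stands for the local pandas frame:

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set("spark.default.parallelism", "64")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# without Arrow, createDataFrame slices the local pandas frame into roughly
# defaultParallelism pieces before parallelizing them from the driver
sdf = spark.createDataFrame(pdf)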
Unfortunately these are unlikely to resolve your current memory problems. Both depend on parallelize, and therefore store all of the data in the memory of the driver node. Switching to Arrow or adjusting the configuration can only speed up the process or work around block size limitations.
In practice I don't see any reason to switch to Spark here, as long as you use a local pandas DataFrame as the input. The most severe bottleneck in this scenario is the driver's network I/O, and distributing the data won't address that.
Answered by lego king
Another way is to convert your pandas dataframe to a Spark dataframe (using pyspark) and save it to HDFS with the save command. Example:
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

df = pd.read_csv("data/as/foo.csv")
# cast object columns to plain strings so Spark can infer the schema
df[['Col1', 'Col2']] = df[['Col1', 'Col2']].astype(str)
sc = SparkContext(conf=SparkConf())
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(df)
Here astype changes the type of your columns from object to string. This saves you from an otherwise raised exception, since Spark cannot figure out the pandas type object. But make sure these columns really are of type string.
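A quick way to double-check that; a sketch using the column names from the example above:

# `object` dtype only says the column holds Python objects; after astype(str)
# every value should actually be a str instance.
print(df[['Col1', 'Col2']].dtypes)
assert df['Col1'].map(lambda v: isinstance(v, str)).all()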
Now to save your df in hdfs:
sdf.write.csv('mycsv.csv')
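If you want Parquet on HDFS instead, as in the question, the equivalent writer call is shown below; this is only a sketch and the output URI is a placeholder:

# same dataframe, written as snappy-compressed Parquet instead of CSV
sdf.write.parquet("hdfs:///tmp/foo_parquet", mode="overwrite", compression="snappy")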
Answered by mikep
From https://issues.apache.org/jira/browse/SPARK-6235
Support for parallelizing R data.frame larger than 2GB
is resolved.
From https://pandas.pydata.org/pandas-docs/stable/r_interface.html
Converting DataFrames into R objects
you can convert a pandas dataframe to an R data.frame
So perhaps the transformation pandas -> R -> Spark -> hdfs?
Answered by Edge7
A hack could be to create N pandas dataframes (each less than 2 GB, i.e. horizontal partitioning) from the big one, create N different Spark dataframes from them, and then merge (union) those to build a final one to write into HDFS, as in the sketch below. I am assuming that your master machine is powerful, but that you also have a cluster available in which you are running Spark.
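A rough sketch of that idea, assuming pdf is the large local pandas frame, spark is an existing SparkSession (Spark 2.0+ for DataFrame.union), and the chunk count of 20 is arbitrary:

import numpy as np
from functools import reduce

chunks = np.array_split(pdf, 20)                           # horizontal partitioning
sdfs = [spark.createDataFrame(chunk) for chunk in chunks]  # one Spark frame per chunk
sdf = reduce(lambda a, b: a.union(b), sdfs)                # union the pieces back together
sdf.write.parquet("hdfs:///tmp/out", mode="overwrite", compression="snappy")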