pandas dask 可以并行读取 csv 文件吗?

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must keep the same license and attribute the original authors (not this site). Original question: http://stackoverflow.com/questions/40100176/

Can dask parallelize reading from a csv file?

Tags: python, csv, pandas, dask

Asked by Magellan88

I'm converting a large text file to an HDF store in hopes of faster data access. The conversion works all right; however, reading from the csv file is not done in parallel. It is really slow (about 30 minutes for a 1 GB text file on an SSD, so my guess is that it is not IO-bound).

Is there a way to have it read in multiple threads in parallel? Since it might be important: I'm currently forced to run under Windows -- just in case that makes any difference.

from dask import dataframe as ddf

df = ddf.read_csv("data/Measurements*.csv",
                  sep=';',
                  parse_dates=["DATETIME"],
                  blocksize=1000000,
                  )

# categorize() returns a new dataframe; the result must be assigned back
df = df.categorize(['Type', 'Condition'])

df.to_hdf("data/data.hdf", "Measurements", mode='w')

Accepted answer by MRocklin

Yes, dask.dataframe can read in parallel. However, you're running into two problems:

Pandas.read_csv only partially releases the GIL

By default dask.dataframe parallelizes with threads, because most of Pandas can run in parallel in multiple threads (it releases the GIL). Pandas.read_csv is an exception, especially if your resulting dataframes use object dtypes for text.

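As an aside, a minimal sketch (not from the original answer) of working around the GIL on newer dask releases by selecting the process-based scheduler, either per compute() call or globally via dask.config:

import dask
import dask.dataframe as dd

# Assumption: a recent dask where dask.config.set exists
# (older releases used dask.set_options instead).
dask.config.set(scheduler='processes')   # run task graphs in a process pool, sidestepping the GIL

df = dd.read_csv("data/Measurements*.csv", sep=';')
result = df.compute()                    # now executed with processes rather than threads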

dask.dataframe.to_hdf(filename) forces sequential computation

Writing to a single HDF file will force sequential computation (it's very hard to write to a single file in parallel).

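A hedged aside on the write side: per the dask documentation, to_hdf accepts a * placeholder in the filename (and in the key), so each partition can go to its own file, which keeps writes parallel at the cost of producing many files. A sketch with a hypothetical output pattern:

import dask.dataframe as dd

df = dd.read_csv("data/Measurements*.csv", sep=';')

# '*' is replaced by the partition number, giving one HDF file per partition.
df.to_hdf("data/data-*.hdf", "Measurements")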

Edit: New solution

Today I would avoid HDF and use Parquet instead. I would probably use the multiprocessing or dask.distributed schedulers to avoid GIL issues on a single machine. The combination of these two should give you full linear scaling.

import dask.dataframe
from dask.distributed import Client

client = Client()

df = dask.dataframe.read_csv(...)
df.to_parquet(...)
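
For completeness, a sketch of reading the Parquet output back, assuming a recent pandas/dask with pyarrow or fastparquet installed; the path below is hypothetical:

import dask.dataframe as dd
import pandas as pd

lazy_df = dd.read_parquet("data/measurements.parquet")    # lazy, partitioned dask dataframe
pandas_df = pd.read_parquet("data/measurements.parquet")  # eager, all in memory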

Solution

Because your dataset likely fits in memory, use dask.dataframe.read_csv to load in parallel with multiple processes, then switch immediately to Pandas.

import dask.dataframe as ddf
import dask.multiprocessing

df = ddf.read_csv("data/Measurements*.csv",  # read in parallel
                  sep=';',
                  parse_dates=["DATETIME"],
                  blocksize=1000000,
                  )

df = df.compute(get=dask.multiprocessing.get)     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')

Answered by mgoldwasser

Piggybacking off of @MRocklin's answer: in newer versions of dask, you can use df.compute(scheduler='processes') or df.compute(scheduler='threads') to convert to pandas using multiprocessing or multithreading:

from dask import dataframe as ddf

df = ddf.read_csv("data/Measurements*.csv",
                  sep=';',
                  parse_dates=["DATETIME"],
                  blocksize=1000000,
                  )

df = df.compute(scheduler='processes')     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
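
As a brief usage note appended here (not part of the original answer), the resulting file can be read back into pandas with pd.read_hdf; format='table' would also allow where= filtering on load, but only for columns written as data_columns, which the code above does not set up. A minimal sketch:

import pandas as pd

# Read the converted table back into memory.
df = pd.read_hdf('data/data.hdf', 'Measurements')

print(df.dtypes)   # 'Type' and 'Condition' should come back as category dtype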