Pytables/Pandas:组合(阅读?)按行拆分的多个 HDF5 存储

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/22750228/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-13 21:52:07  来源:igfitidea点击:

Pytables/Pandas : Combining (reading?) mutliple HDF5 stores split by rows

pythonparallel-processingpandashdf5pytables

提问by Hussain Sultan

In "write once, read many" workflow, i frequently parse large text files (20GB-60GB) dumped from Teradata using FastExport utility and load them into Pytables using Pandas. I am using multiprocessing to chunk the text files and distributing them to different processes to write a .H5 files split based on row count around 5MM each to support parallel writing. This is quite fast around 12 minutes for writing multiple hdf5 files in parallel as compared two 22 minutes for writing a single hdf5 file for 25MM rows x 64 columns.

在“一次写入,多次读取”工作流程中,我经常使用 FastExport 实用程序解析从 Teradata 转储的大型文本文件 (20GB-60GB),并使用 Pandas 将它们加载到 Pytables 中。我正在使用 multiprocessing 来分块文本文件并将它们分发到不同的进程,以编写基于行数拆分的 .H5 文件,每个文件都支持并行写入。与为 25MM 行 x 64 列编写单个 hdf5 文件需要两个 22 分钟相比,这对于并行写入多个 hdf5 文件大约需要 12 分钟。

%timeit -n 1 write_single_hdf_multiprocess()
1 loops, best of 3: 22min 42s per loop

%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 12min 12s per loop

For the case of writing multiple h5 files split by rows, i end up having multiple files with same structure that i wish to combine in a single h5file root/data/table

对于编写按行拆分的多个 h5 文件的情况,我最终拥有多个具有相同结构的文件,我希望将它们合并到一个 h5file根/数据/表中

To test combining functionality, here is the code snippet:

要测试组合功能,这里是代码片段:

import tables as tb
import pandas as pd

tb.setBloscMaxThreads(15)
store =pd.HDFStore('temp15.h5',complib='blosc')

filenames=['part_1.h5','part_2.h5','part_3.h5','part_4.h5','part_5.h5']

for f in filenames:
    s=pd.HDFStore(f)
    df=s.select('data')
    store.append(key='data',value=df,format='t',chunksize=200000)

store.close()

Here is %timeit result for this:

这是 %timeit 结果:

1 loops, best of 3: 8min 22s per loop

This basically eats up most of the time that i gained by writing multiple h5 files in parallel. I have a two part question:

这基本上消耗了我通过并行编写多个 h5 文件获得的大部分时间。我有一个两部分的问题:

  1. Is there a way to combine (append) h5 files with the same table format more efficiently?(SQL Union like functionality).I tried thisSO but couldn't get it to append tables.

  2. if not, is splitting on rows a reasonable thing to do when most of the queries are select from where for all the columns? i am thinking about writing a map/combine function that will look in all the parts of a table for select from where queries. Pandas select_as_multiple()function does this for splitting based on columns.

  1. 有没有办法更有效地将 h5 文件与相同的表格式组合(附加)?(类似 SQL 联合的功能)。我试过这个,但无法让它附加表。

  2. 如果不是,当大多数查询是从所有列的位置选择时,按行拆分是否合理?我正在考虑编写一个 map/combine 函数,该函数将查看表的所有部分以从 where 查询中进行选择。Pandas select_as_multiple()函数执行此操作以基于列进行拆分。



Update Based on Jeff's Suggestions:

根据杰夫的建议更新:

Great call-out on removing the indexing and compression on pre-merge file writing process. After removing the indexing,compression and setting the max row count per pre-merge file to 1MM rows:

在预合并文件写入过程中删除索引和压缩的很好的调用。删除索引、压缩并将每个预合并文件的最大行数设置为 1MM 行后:

%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 9min 37s per loop

This is a little over 2 minutes faster than before and pretty much as fast as i can parse the data. After setting the data columns to the desired fields (3 in my case):

这比以前快了 2 分钟多一点,几乎和我解析数据的速度一样快。将数据列设置为所需字段后(在我的情况下为 3):

for f in filenames:
    s=pd.HDFStore(f)
    df=s.select('data')
    dc=df.columns[1:4]
    store.append(key='data',value=df,format='t',data_columns=dc)

This is about 2 minutes slower than before: 1 loops, best of 3: 10min 23s per loop. After removing compression from the above code, i get 1 loops, best of 3: 8min 48s per loop(almost identical to the first try with compression and no data column index). To give you an idea of how well the compression works, the uncompressed store is around 13.5GB while the compressed version using bloscis around 3.7GB.

这比以前慢了大约 2 分钟:1 loops, best of 3: 10min 23s per loop. 从上述代码中删除压缩后,我得到1 loops, best of 3: 8min 48s per loop(几乎与第一次尝试压缩且没有数据列索引相同)。为了让您了解压缩的效果如何,未压缩的存储大约为 13.5GB,而使用的压缩版本blosc大约为 3.7GB。

In summary, my process takes 18 minutes 15 secondsto create a merged uncompressed hdf5 file. This as compared to the single file writing (compressed) is about 4 minutes 7 secondsfaster.

总之,我的过程需要18 minutes 15 seconds创建一个合并的未压缩 hdf5 文件。与单个文件写入(压缩)相比,这4 minutes 7 seconds更快。

This brings me to the second part of my questions, what if i don't merge the files and use the pre-merge files to be processed in map/combine way, could that be a reasonable way to approach this? how should i think about implementing this?

这让我想到了我的问题的第二部分,如果我不合并文件并使用要以映射/组合方式处理的预合并文件,这是否是解决此问题的合理方法?我应该如何考虑实施这个?

For full disclosure, i am on Pandas version 0.12.0, Pytables version 3.0.0and my data processing workflow is as follows (pseudo-code):

为了充分披露,我使用的是 Pandas 版本0.12.0,Pytables 版本3.0.0,我的数据处理工作流程如下(伪代码):

def generate_chunks_from_text_file(reader,chunksize=50000):
    """ generator that yields processed text chunks """

    for i, line in enumerate(reader.readlines()):
        ----process data and yield chunk -----


def data_reader(reader,queue):
    """ read data from file and put it into a queue for multiprocessing """

    for chunk in self.generate_chunks_from_text_file(reader):
        queue.put(chunk) # put data in the queue for the writer

def data_processor(queue,filename,dtype,min_size):
    """" subprocess that reads the next value in the queue and writes hdf store. """

    store=pd.HDFStore(filename)

    while True:

        results = queue.get()
        array=np.array(results,dtype=dt) # convert to numpy array
        df = pd.DataFrame(array) #covert to pandas array

        store.append(key='data', value=df, format='t', min_itemsize=dict(min_size), data_columns=[],index=False)
    store.close()
        ----when queue exhausts - break-----

采纳答案by Jeff

I do a very similar, split-process-combine method, using multiple processes to create intermediate files, then use a single process to merge the resulting files. Here are some tips to get better performance:

我做了一个非常相似的 split-process-combine 方法,使用多个进程创建中间文件,然后使用单个进程合并生成的文件。以下是获得更好性能的一些提示:

  • Turn off indexing while you are writing the files by passing index=False, see herefor the docs. I believe that PyTablesincrementally updates the index, which in this case is completely unecessary (as you are going to merge them afterwards). Index only the final file. This should speed up the writing quite a bit.

  • You might consider changing the default indexing scheme / level, depending on what your queries are (assume you follow the advice several points below to NOT create too many data columns).

  • In a similar vein, don't create a compressed file when writing the pre-merged files, rather create it AFTER the indexed file is written (in an uncompressed state), so this ends up being your final step. See the docs here. Furthermore, it is very important to pass --chunkshape=autowhen using ptrepackwhich recomputes the PyTables chunksize (e.g. how much data is read/written in a single block), as it will take into account the entire table.

  • RE compression, YMMV may vary here, depending on how well your data actually compresses, and what kinds of queries you are doing. I have some types of data that I find it is faster to NOT compress at all even though in theory it should be better. You have to just experiment (though I always do use blosc). Blosc only has onecompression level (its either on for levels 1-9 or off for level 0). So changing this will not change anything.

  • I merge the files in the indexed order, basically by reading a subset of the pre-merge files into memory (a constant number to use only a constant amount of memory), then append them one-by-one to the final file. (not 100% sure this makes a difference but seems to work well).

  • You will find that the vast majority of your time is spent creatingthe index.

  • Furthermore, only index the columns that you actually need! by making sure to specify data_columns=a_small_subset_of_columnswhen writing each file.

  • I find that writing a lot of smallish files is better, then merging to create a largish file, rather than writing a few large files, but YMMV here. (e.g. say 100 100MB pre-merge files to yield a 10GB file, rather than 5 2GB files). Though this may be a function of my processing pipeline as I tend to bottleneck on the processing rather than the actual writing.

  • I have not used, but hear amazing things about using a SSD (sold-state-drive), even if it's relatively small for this kind of thing. You can get an order of magnitude of speedup using one (and compression may change this result).

  • 在通过传递写入文件时关闭索引index=False,请参阅此处的文档。我相信PyTables增量更新索引,在这种情况下是完全没有必要的(因为您将在之后合并它们)。仅索引最终文件。这应该会加快写作速度。

  • 您可能会考虑更改默认索引方案/级别,具体取决于您的查询内容(假设您遵循以下几点建议,不要创建太多数据列)。

  • 同样,不要在写入预合并文件时创建压缩文件,而是在写入索引文件后创建它(处于未压缩状态),因此这最终是您的最后一步。请参阅此处的文档。此外,--chunkshape=auto在使用ptrepackwhich 重新计算 PyTables 块大小(例如在单个块中读取/写入多少数据)时传递非常重要,因为它会考虑整个表。

  • RE 压缩,YMMV 在这里可能会有所不同,具体取决于您的数据实际压缩的程度以及您正在执行的查询类型。我有一些类型的数据,我发现根本不压缩会更快,即使理论上它应该更好。您必须进行实验(尽管我总是使用blosc)。Blosc 只有一个压缩级别(它要么在级别 1-9 中打开,要么在级别 0 中关闭)。所以改变这个不会改变任何东西。

  • 我按索引顺序合并文件,基本上是通过将预合并文件的一个子集读入内存(一个常数以仅使用恒定数量的内存),然后将它们一个一个地附加到最终文件中。(不是 100% 确定这会有所作为,但似乎效果很好)。

  • 你会发现你的绝大部分时间都花在creating了索引上。

  • 此外,只索引您实际需要的列!通过确保data_columns=a_small_subset_of_columns在写入每个文件时指定。

  • 我发现写很多小文件比较好,然后合并创建一个大文件,而不是写几个大文件,而是YMMV。(例如说 100 个 100MB 的预合并文件产生一个 10GB 的文件,而不是 5 个 2GB 的文件)。虽然这可能是我的处理管道的一个功能,因为我倾向于在处理而不是实际写作上遇到瓶颈。

  • 我没有使用过,但听说过使用 SSD(销售状态驱动器)的惊人之处,即使它对于这种事情来说相对较小。您可以获得一个数量级的加速(压缩可能会改变这个结果)。