Improve Query Performance From a Large HDFStore Table with Pandas

Note: this page is a translation of a popular StackOverflow question and answer, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must follow the same license and attribute the original authors (not me). Original question: http://stackoverflow.com/questions/22777284/

python, pandas, hdfs, large-data

Asked by maxp

I have a large (~160 million rows) dataframe that I've stored to disk with something like this:

    import glob
    import pandas as pd

    def fillStore(store, tablename):
        files = glob.glob('201312*.csv')
        names = ["ts", "c_id", "f_id", "resp_id", "resp_len", "s_id"]
        for f in files:
            # the first column (ts) becomes the DatetimeIndex
            df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
            # c_id and f_id are data_columns so they can be used in 'where' queries
            store.append(tablename, df, format='table', data_columns=['c_id', 'f_id'])

The table has a time index, and I will query using c_id and f_id in addition to times (via the index).

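For reference, a single query against such a store looks roughly like this (a minimal sketch, not from the original post; the store filename, the all_recs table name, and the sample ID values are taken from the ptdump and df.head() output further down and may not match the real setup):

import pandas as pd

store = pd.HDFStore('store.h5')   # filename assumed from the ptdump output below
sub = store.select('all_recs',    # table name assumed from the ptdump output below
                   where="c_id == 637092486 & f_id == 5372764353 & "
                         "index >= Timestamp('2013-12-04 08:00:00') & "
                         "index < Timestamp('2013-12-04 09:00:00')")
store.close()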

I have another dataframe containing ~18000 "incidents." Each incident consists of some (as few as hundreds, as many as hundreds of thousands) individual records. I need to collect some simple statistics for each incident and store them in order to collect some aggregate statistics. Currently I do this like so:

def makeQueryString(c, f, start, stop):
    return ("c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')"
            .format(c, f, str(pd.to_datetime(start)), str(pd.to_datetime(stop))))

def getIncidents(inc_times, store, tablename):
    incidents = pd.DataFrame(columns=['c_id', 'f_id', 'resp_id', 'resp_len', 's_id', 'incident_id'])
    for ind, row in inc_times.iterrows():
        # one store.select per incident; fillna(ind) tags the newly appended rows
        # with the incident id
        incidents = incidents.append(store.select(tablename,
                                                  makeQueryString(row.c_id,
                                                                  row.f_id,
                                                                  row.start,
                                                                  row.stop))).fillna(ind)
    return incidents

This all works fine except that each store.select() statement takes roughly 5 seconds, which means that processing the full month's worth of data requires somewhere between 24 and 30 hours (~18,000 incidents × 5 s ≈ 25 hours). Meanwhile, the actual statistics I need are relatively simple:

def getIncidentStats(df):
    incLen = (df.index[-1] - df.index[0]).total_seconds()
    if incLen == 0:
        incLen = .1          # avoid division by zero for single-timestamp incidents
    rqsts = len(df)
    rqstRate_s = rqsts / incLen
    return pd.Series({'c_id': df.c_id[0],
                      'f_id': df.f_id[0],
                      'Length_sec': incLen,
                      'num_rqsts': rqsts,
                      'rqst_rate': rqstRate_s,
                      'avg_resp_size': df.resp_len.mean(),
                      'std_resp_size': df.resp_len.std()})


incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)

My question is: how can I improve the performance or efficiency of any part of this workflow? (Please note that I actually batch most of the jobs to get and store incidents one day at a time, simply because I want to limit the risk of losing already-processed data in the event of a crash. I left this code out here for simplicity and because I actually need to process the whole month's data.)

Is there a way to process the data as I receive it from the store, and is there any benefit to doing so? Would I benefit from using store.select_as_index? If I receive an index, I'd still need to access the data to compute the statistics, correct?

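As far as I know, the closest actual HDFStore API here is select_as_coordinates (I am not aware of a select_as_index method). A minimal sketch of how it could be used, reusing the assumed table name from above and sample IDs from inc_times below; the coordinates alone are not enough for the statistics, so the matching rows still have to be read at some point:

# sketch (assumed API usage, not from the original post)
coords = store.select_as_coordinates('all_recs',
                                     where="c_id == 7254 & f_id == 196211")
rows = store.select('all_recs', where=coords)   # materialise only those rows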

Other notes/questions: I have compared the performance of storing my HDFStore on both an SSD and a normal hard drive and didn't notice any improvement with the SSD. Is this expected?

I also toyed with the idea of creating a large conjunction of query strings and asking for them all at once. This causes memory errors when the total query string is too large (~5-10 queries).

Edit 1: If it matters, I am using tables (PyTables) version 3.1.0 and pandas version 0.13.1.

Edit 2: Here is some more information:

ptdump -av store.h5
/ (RootGroup) ''
  /._v_attrs (AttributeSet), 4 attributes:
   [CLASS := 'GROUP',
    PYTABLES_FORMAT_VERSION := '2.0',
    TITLE := '',
    VERSION := '1.0']
/all_recs (Group) ''
  /all_recs._v_attrs (AttributeSet), 14 attributes:
   [CLASS := 'GROUP',
    TITLE := '',
    VERSION := '1.0',
    data_columns := ['c_id', 'f_id'],
    encoding := None,
    index_cols := [(0, 'index')],
    info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
    levels := 1,
    nan_rep := 'nan',
    non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
    pandas_type := 'frame_table',
    pandas_version := '0.10.1',
    table_type := 'appendable_frame',
    values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
  "c_id": Int64Col(shape=(), dflt=0, pos=2),
  "f_id": Int64Col(shape=(), dflt=0, pos=3)}
  byteorder := 'little'
  chunkshape := (5461,)
  autoindex := True
  colindexes := {
    "index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
  /all_recs/table._v_attrs (AttributeSet), 19 attributes:
   [CLASS := 'TABLE',
    FIELD_0_FILL := 0,
    FIELD_0_NAME := 'index',
    FIELD_1_FILL := 0,
    FIELD_1_NAME := 'values_block_0',
    FIELD_2_FILL := 0,
    FIELD_2_NAME := 'c_id',
    FIELD_3_FILL := 0,
    FIELD_3_NAME := 'f_id',
    NROWS := 161738653,
    TITLE := '',
    VERSION := '2.6',
    client_id_dtype := 'int64',
    client_id_kind := ['c_id'],
    fqdn_id_dtype := 'int64',
    fqdn_id_kind := ['f_id'],
    index_kind := 'datetime64',
    values_block_0_dtype := 'int64',
    values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]

Here are samples of both the main table and inc_times:

In [12]: df.head()
Out[12]: 
                          c_id        f_id          resp_id      resp_len  \
ts                                                                   
2013-12-04 08:00:00  637092486  5372764353               30      56767543   
2013-12-04 08:00:01  637092486  5399580619               23      61605423   
2013-12-04 08:00:04    5456242  5385485460               21      46742687   
2013-12-04 08:00:04    5456242  5385485460               21      49909681   
2013-12-04 08:00:04  624791800  5373236646               14      70461449   

                              s_id  
ts                           
2013-12-04 08:00:00           1829  
2013-12-04 08:00:01           1724  
2013-12-04 08:00:04           1679  
2013-12-04 08:00:04           1874  
2013-12-04 08:00:04           1727  

[5 rows x 5 columns]


In [13]: inc_times.head()
Out[13]: 
        c_id     f_id                start                 stop
0       7254   196211  1385880945000000000  1385880960000000000
1       9286   196211  1387259840000000000  1387259850000000000
2      16032   196211  1387743730000000000  1387743735000000000
3      19793   196211  1386208175000000000  1386208200000000000
4      19793   196211  1386211800000000000  1386211810000000000

[5 rows x 4 columns]

Regarding c_id and f_id, the set of IDs I want to select from the full store is relatively small compared to the total number of IDs in the store. In other words, there are some popular IDs in inc_times that I will repeatedly query, while completely ignoring some of the IDs that exist in the full table. I'd estimate that the IDs I care about are roughly 10% of the total IDs, but that these are the most popular IDs, so their records dominate the full set.

I have 16 GB of RAM. The full store is 7.4 GB and the full dataset (as a CSV file) is only 8.7 GB. Initially I believed I would be able to load the whole thing into memory and at least do some limited operations on it, but I get memory errors when loading it all. Hence the batching into daily files (the full dataset covers one month).

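One way to avoid loading everything at once (a sketch, not from the original post; it assumes the all_recs table name from the ptdump output and uses per-(c_id, f_id) row counts as a stand-in for the real statistics) is to stream the table in chunks:

counts = {}
for chunk in store.select('all_recs', chunksize=1000000):
    for key, grp in chunk.groupby(['c_id', 'f_id']):
        counts[key] = counts.get(key, 0) + len(grp)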

Accepted answer by Jeff

Here are some recommendations; a similar question is here.

Use compression: see here. You should try this (it could make things faster or slower depending on exactly what you are querying), YMMV.

ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5
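The same compression can also be applied when the store is first written, instead of repacking afterwards; a sketch (the output filename is an assumption, and blosc must be available):

store = pd.HDFStore('out.h5', complevel=9, complib='blosc')   # filename assumed
store.append('all_recs', df, format='table', data_columns=['c_id', 'f_id'])
store.close()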

Use a hierarchical query in chunks. What I mean is this. Since you have a relatively small number of c_id and f_id values that you care about, structure a single query something like this. This is kind of like using isin.

f_id_list = [...]  # placeholder: the f_ids that I care about
c_id_list = [...]  # placeholder: the c_ids that I care about

def create_batches(l, maxn=32):
    """ create a list of batches, maxed at maxn """
    batches = []
    while(True):
        if len(l) <= maxn:
            if len(l) > 0:
                batches.append(l)
            break
        batches.append(l[0:maxn])
        l = l[maxn:]
    return batches


results = []
for f_id_batch in create_batches(f_id_list):

    for c_id_batch in create_batches(c_id_list):

        q = "f_id={f_id} & c_id={c_id}".format(
                f_id=f_id_batch,
                c_id=c_id_batch)

        # you can include the max/min times in here as well (they would be the
        # max/min times for ALL the included batches though, maybe easy for you
        # to compute)

        result = store.select('df', where=q)

        # sub-process this result

        def f(x):
            # you will need to filter out the min/max timestamps here (which I
            # gather are somewhat dependent on the f_id/c_id group)

            #### process the data and return something
            # for simple stats you could do something like:
            return x.describe()

        results.append(result.groupby(['f_id', 'c_id']).apply(f))

results = pd.concat(results)

The key here is to process in batches so that the isin does NOT have more than 32 members for any variable that you are querying on. This is an internal numpy/pytables limitation. If you exceed this, the query will work, but it will drop that variable and do a reindex on ALL the data (which is NOT what you want here).

This way you will have a nice subset of the data in memory over just a few loops. Each of these queries I think would take about the same time as one of your current queries, but you will have far fewer of them.

The query time is roughly constant for a given subset (unless the data is ordered such that it is completely indexed).

So the query scans 'blocks' of data (which is what the indexes point to). If you have lots of hits across many blocks then the query is slower.

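If the existing column indexes are not fully sorted (the ptdump output above shows is_csi=False), rebuilding them as full CSI indexes might help with this; a sketch of how that could be done through the pandas wrapper (my own suggestion, not part of the original answer):

# sketch (assumption): rebuild the data-column indexes as completely sorted (CSI) indexes
store.create_table_index('all_recs', columns=['c_id', 'f_id'],
                         optlevel=9, kind='full')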

Here's an example:

In [5]: N = 100000000

In [6]: df = DataFrame(np.random.randn(N,3),columns=['A','B','C'])

In [7]: df['c_id'] = np.random.randint(0,10,size=N)

In [8]: df['f_id'] = np.random.randint(0,10,size=N)

In [9]: df.index = date_range('20130101',periods=N,freq='s')

In [10]: df.to_hdf('test2.h5','df',mode='w',data_columns=['c_id','f_id'])

In [11]: df.head()
Out[11]: 
                            A         B         C  c_id  f_id
2013-01-01 00:00:00  0.037287  1.153534  0.639669     8     7
2013-01-01 00:00:01  1.741046  0.459821  0.194282     8     3
2013-01-01 00:00:02 -2.273919 -0.141789  0.770567     1     1
2013-01-01 00:00:03  0.320879 -0.108426 -1.310302     8     6
2013-01-01 00:00:04 -1.445810 -0.777090 -0.148362     5     5
2013-01-01 00:00:05  1.608211  0.069196  0.025021     3     6
2013-01-01 00:00:06 -0.561690  0.613579  1.071438     8     2
2013-01-01 00:00:07  1.795043 -0.661966  1.210714     0     0
2013-01-01 00:00:08  0.176347 -0.461176  1.624514     3     6
2013-01-01 00:00:09 -1.084537  1.941610 -1.423559     9     1
2013-01-01 00:00:10 -0.101036  0.925010 -0.809951     0     9
2013-01-01 00:00:11 -1.185520  0.968519  2.871983     7     5
2013-01-01 00:00:12 -1.089267 -0.333969 -0.665014     3     6
2013-01-01 00:00:13  0.544427  0.130439  0.423749     5     7
2013-01-01 00:00:14  0.112216  0.404801 -0.061730     5     4
2013-01-01 00:00:15 -1.349838 -0.639435  0.993495     0     9


In [2]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1] & c_id=[2]")
1 loops, best of 3: 13.9 s per loop

In [3]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2] & c_id=[1,2]")
1 loops, best of 3: 21.2 s per loop

In [4]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2,3] & c_id=[1,2,3]")
1 loops, best of 3: 42.8 s per loop

This particular example is 5GB uncompressed and 2.9GB compressed. These results are on the compressed data. In THIS case it is actually quite a bit faster to use the uncompressed store (e.g. the first loop took 3.5s). This is 100MM rows.

So using the last example (4) you are getting 9x the data of the first in a little over 3x the query time.

However, your speedup should be MUCH greater, because you won't be selecting on individual timestamps in the store; rather, you will do that filtering later, in memory.

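A sketch of that later, in-memory filtering step (my own illustration, not part of the original answer): once a batch is loaded, each incident's time window can be sliced locally instead of going back to the store.

def incident_slice(batch, start, stop):
    # batch: the in-memory result of one batched store.select, indexed by timestamp
    mask = (batch.index >= pd.to_datetime(start)) & (batch.index < pd.to_datetime(stop))
    return batch[mask]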

This whole approach assumes that you have enough main memory to hold your results at these batch sizes (i.e. you are selecting a relatively small part of the set in each batched query).