
Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must likewise follow the CC BY-SA license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/49429368/


How to solve memory issues while multiprocessing using Pool.map()?

python, pandas, memory, multiprocessing, python-multiprocessing

Asked by everestial007

I have written the program (below) to:


  • read a huge text file as a pandas dataframe
  • then groupby using a specific column value to split the data and store it as a list of dataframes
  • then pipe the data to multiprocess Pool.map() to process each dataframe in parallel

Everything is fine and the program works well on my small test dataset. But when I pipe in my large data (about 14 GB), the memory consumption increases exponentially and then the computer freezes or the job gets killed (on an HPC cluster).


I have added code to clear the memory as soon as the data/variable isn't useful anymore. I am also closing the pool as soon as it is done. Still, with 14 GB of input I was only expecting a 2*14 GB memory burden, but it seems like a lot more is going on. I also tried tweaking chunkSize and maxTaskPerChild, etc., but I am not seeing any difference in optimization for either the test or the large file.


I think improvements to this code are required at the point where I start multiprocessing:


    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

but I am posting the whole code below.


Test example: I created a test file ("genome_matrix_final-chr1234-1mb.txt") of up to 250 MB and ran the program. When I check the system monitor I can see that memory consumption increased by about 6 GB. I am not so clear on why so much memory space is taken by a 250 MB file plus some outputs. I have shared that file via Dropbox if it helps in seeing the real problem. https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0


Can someone suggest how I can get rid of this problem?


My Python script:


#!/home/bin/python3

import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource

print()
print('Checking required modules')
print()


''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt"   # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt"  # test file 02
#genome_matrix_file = "genome_matrix_final.txt"    # large file 

def main():
    with open("genome_matrix_header.txt") as header:
        header = header.read().rstrip('\n').split('\t')
        print()

    time01 = time.time()
    print('starting time: ', time01)

    '''load the genome matrix file onto pandas as dataframe.
    This makes it easier for multiprocessing'''
    gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)

    # now, group the dataframe by chromosome/contig - so it can be multiprocessed
    gen_matrix_df = gen_matrix_df.groupby('CHROM')

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    gen_matrix_df_list = collections.OrderedDict()
    for chr_, data in gen_matrix_df:
        gen_matrix_df_list[chr_] = data

    # clear memory
    del gen_matrix_df

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    del gen_matrix_df_list  # clear memory

    p.close()
    p.join()


    # concat the results from pool.map() and write it to a file
    result_merged = pd.concat(result)
    del result  # clear memory

    pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)

    print()
    print('completed all process in "%s" sec. ' % (time.time() - time01))
    print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
    print()


'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):

    print()
    time02 = time.time()

    # index position of the samples in genome matrix file
    sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
                    {'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
                    {'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
                    {'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
                    {'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
                    {'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
                    {'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
                    {'8a': 32, '8b': 17}]

    # sample index stored as ordered dictionary
    sample_idx_ord_list = []
    for ids in sample_idx:
        ids = collections.OrderedDict(sorted(ids.items()))
        sample_idx_ord_list.append(ids)


    # for haplotype file
    header = ['contig', 'pos', 'ref', 'alt']

    # adding some suffixes "PI" to available sample names
    for item in sample_idx_ord_list:
        ks_update = ''
        for ks in item.keys():
            ks_update += ks
        header.append(ks_update+'_PI')
        header.append(ks_update+'_PG_al')


    #final variable store the haplotype data
    # write the header lines first
    haplotype_output = '\t'.join(header) + '\n'


    # to store the value of parsed the line and update the "PI", "PG" value for each sample
    updated_line = ''

    # read the piped in data back to text like file
    matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)

    matrix_df = matrix_df.rstrip('\n').split('\n')
    for line in matrix_df:
        if line.startswith('CHROM'):
            continue

        line_split = line.split('\t')
        chr_ = line_split[0]
        ref = line_split[2]
        alt = list(set(line_split[3:]))

        # remove the alleles "N" missing and "ref" from the alt-alleles
        alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))

        # if no alt alleles are found, just continue
        # - i.e : don't write that line in output file
        if len(alt_up) == 0:
            continue

        #print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
        #so, we have data for CHR, POS, REF, ALT so far
        # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
        sample_data_for_vcf = []
        for ids in sample_idx_ord_list:
            sample_data = []
            for key, val in ids.items():
                sample_value = line_split[val]
                sample_data.append(sample_value)

            # now, update the phased state for each sample
            # also replacing the missing allele i.e "N" and "-" with ref-allele
            sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
            sample_data_for_vcf.append(str(chr_))
            sample_data_for_vcf.append(sample_data)

        # add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
        # and .. write it to final haplotype file
        sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
        updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
            '\t' + sample_data_for_vcf + '\n'
        haplotype_output += updated_line

    del matrix_df  # clear memory
    print('completed haplotype preparation for chromosome/contig "%s" '
          'in "%s" sec. ' %(chr_, time.time()-time02))
    print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))

    # return the data back to the pool
    return pd.read_csv(io.StringIO(haplotype_output), sep='\t')


''' to monitor memory '''
def current_mem_usage():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.


if __name__ == '__main__':
    main()

Update for bounty hunters:


I have achieved multiprocessing using Pool.map(), but the code is causing a big memory burden (input test file ~300 MB, but the memory burden is about 6 GB). I was only expecting a 3*300 MB memory burden at most.


  • Can somebody explain what is causing such a huge memory requirement for such a small file and such a short computation?
  • Also, I am trying to take the answer and use it to improve multiprocessing in my large program. So, the addition of any method or module that doesn't change the structure of the computation part (the CPU-bound process) too much should be fine.
  • I have included two test files for test purposes to play with the code.
  • The attached code is the full code, so it should work as intended as-is when copy-pasted. Any changes should be used only to improve optimization in the multiprocessing steps.

Answered by saaj

Prerequisite


  1. In Python (in the following I use a 64-bit build of Python 3.6.5) everything is an object. This has its overhead, and with getsizeof we can see exactly the size of an object in bytes:

    >>> import sys
    >>> sys.getsizeof(42)
    28
    >>> sys.getsizeof('T')
    50
    
  2. When the fork system call is used (the default on *nix, see multiprocessing.get_start_method()) to create a child process, the parent's physical memory is not copied and the copy-on-write technique is used.
  3. A forked child process will still report the full RSS (resident set size) of the parent process. Because of this, PSS (proportional set size) is a more appropriate metric to estimate the memory usage of a forking application. Here's an example from the page:
  3. Fork 子进程仍将报告父进程的完整 RSS(常驻集大小)。由于这个事实,PSS(比例集大小)是更合适的指标来估计分叉应用程序的内存使用情况。这是页面中的一个示例:
  • Process A has 50 KiB of unshared memory
  • Process B has 300 KiB of unshared memory
  • Both process A and process B have 100 KiB of the same shared memory region

Since the PSS is defined as the sum of the unshared memory of a process and the proportion of memory shared with other processes, the PSS values for these two processes are as follows:

  • PSS of process A = 50 KiB + (100 KiB / 2) = 100 KiB
  • PSS of process B = 300 KiB + (100 KiB / 2) = 350 KiB

The data frame


Now let's look at your DataFrame alone. memory_profiler will help us.


justpd.py


#!/usr/bin/env python3

import pandas as pd
from memory_profiler import profile

@profile
def main():
    with open('genome_matrix_header.txt') as header:
        header = header.read().rstrip('\n').split('\t')

    gen_matrix_df = pd.read_csv(
        'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)

    gen_matrix_df.info()
    gen_matrix_df.info(memory_usage='deep')

if __name__ == '__main__':
    main()

Now let's use the profiler:


mprof run justpd.py
mprof plot

We can see the plot:


[memory_profiler plot]

and line-by-line trace:


Line #    Mem usage    Increment   Line Contents
================================================
     6     54.3 MiB     54.3 MiB   @profile
     7                             def main():
     8     54.3 MiB      0.0 MiB       with open('genome_matrix_header.txt') as header:
     9     54.3 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    10                             
    11   2072.0 MiB   2017.7 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    12                                 
    13   2072.0 MiB      0.0 MiB       gen_matrix_df.info()
    14   2072.0 MiB      0.0 MiB       gen_matrix_df.info(memory_usage='deep')

We can see that the data frame takes ~2 GiB, with a peak at ~3 GiB while it's being built. What's more interesting is the output of info().


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB

But info(memory_usage='deep') ("deep" means introspection of the data by deeply interrogating object dtypes, see below) gives:


memory usage: 7.9 GB

Huh?! Looking outside of the process we can make sure that memory_profiler's figures are correct. sys.getsizeof also shows the same value for the frame (most probably because of a custom __sizeof__), and so will other tools that use it to estimate allocated memory, e.g. pympler (which walks gc.get_objects()).


# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()   

Gives:


                                             types |   # objects |   total size
================================================== | =========== | ============
                 <class 'pandas.core.series.Series |          34 |      7.93 GB
                                      <class 'list |        7839 |    732.38 KB
                                       <class 'str |        7741 |    550.10 KB
                                       <class 'int |        1810 |     49.66 KB
                                      <class 'dict |          38 |      7.43 KB
  <class 'pandas.core.internals.SingleBlockManager |          34 |      3.98 KB
                             <class 'numpy.ndarray |          34 |      3.19 KB

So where do these 7.93 GiB come from? Let's try to explain it. We have 4M rows and 34 columns, which gives us 136M values. They are either int64 or object (which is a 64-bit pointer; see using pandas with large data for a detailed explanation). Thus we have 136 * 10 ** 6 * 8 / 2 ** 20 ~ 1037 MiB just for the values in the data frame. What about the remaining ~6.9 GiB?


String interning


To understand this behaviour it's necessary to know that Python does string interning. There are two good articles (one, two) about string interning in Python 2. Besides the Unicode change in Python 3 and PEP 393 in Python 3.3, the C structures have changed, but the idea is the same. Basically, every short string that looks like an identifier will be cached by Python in an internal dictionary and references will point to the same Python object. In other words, we can say it behaves like a singleton. The articles I mentioned above explain the significant memory profile and performance improvements it gives. We can check whether a string is interned using the interned field of PyASCIIObject:


import ctypes

class PyASCIIObject(ctypes.Structure):
     _fields_ = [
         ('ob_refcnt', ctypes.c_size_t),
         ('ob_type', ctypes.py_object),
         ('length', ctypes.c_ssize_t),
         ('hash', ctypes.c_int64),
         ('state', ctypes.c_int32),
         ('wstr', ctypes.c_wchar_p)
    ]

Then:


>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0

With two strings we can also do an identity comparison (which is a memory-address comparison in the case of CPython).


>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True

Because of that fact, with regard to the object dtype, the data frame allocates at most 20 strings (one per amino acid). Though it's worth noting that pandas recommends categorical types for enumerations.

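To illustrate that recommendation, a minimal sketch with made-up data (not the genome matrix) showing how converting a low-cardinality object column to the category dtype shrinks the deep memory usage:

import pandas as pd

# a toy frame standing in for one string column of the genome matrix
df = pd.DataFrame({'REF': ['A', 'T', 'G', 'C'] * 1_000_000})

# object dtype: one 64-bit pointer per row plus the string objects counted by deep=True
print(df.memory_usage(deep=True)['REF'])

df['REF'] = df['REF'].astype('category')
# category dtype: small integer codes per row plus one copy of each category
print(df.memory_usage(deep=True)['REF'])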

Pandas memory


Thus we can explain the naive estimate of 7.93 GiB like this:


>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58  
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953

Note that str_size is 58 bytes, not 50 as we've seen above for a 1-character literal. It's because PEP 393 defines compact and non-compact strings. You can check it with sys.getsizeof(gen_matrix_df.REF[0]).


Actual memory consumption should be ~1 GiB as reported by gen_matrix_df.info(), yet it is twice as much. We can assume it has something to do with memory (pre)allocation done by Pandas or NumPy. The following experiment shows that it's not without reason (multiple runs show the same picture):


Line #    Mem usage    Increment   Line Contents
================================================
     8     53.1 MiB     53.1 MiB   @profile
     9                             def main():
    10     53.1 MiB      0.0 MiB       with open("genome_matrix_header.txt") as header:
    11     53.1 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    12                             
    13   2070.9 MiB   2017.8 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    14   2071.2 MiB      0.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    15   2071.2 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    16   2040.7 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    23   1827.1 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    24   1094.7 MiB   -732.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    25   1765.9 MiB    671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    26   1094.7 MiB   -671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    27   1704.8 MiB    610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    28   1094.7 MiB   -610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    29   1643.9 MiB    549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    30   1094.7 MiB   -549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    31   1582.8 MiB    488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    32   1094.7 MiB   -488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    33   1521.9 MiB    427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    34   1094.7 MiB   -427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    35   1460.8 MiB    366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    36   1094.7 MiB   -366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    37   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    47   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])

I want to finish this section with a quote from a fresh article about design issues and future Pandas2 by the original author of Pandas.


pandas rule of thumb: have 5 to 10 times as much RAM as the size of your dataset


Process tree


Finally, let's come to the pool and see if we can make use of copy-on-write. We'll use smemstat (available from an Ubuntu repository) to estimate process-group memory sharing, and glances to write down system-wide free memory. Both can write JSON.


We'll run the original script with Pool(2). We'll need 3 terminal windows.


  1. smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
  2. glances -t 1 --export-json glances.json
  3. mprof run -M script.py

Then mprof plot produces:


[plot: 3 processes]

The sum chart (mprof run --nopython --include-children ./script.py) looks like:


[sum chart]

Note that the two charts above show RSS. The hypothesis is that, because of copy-on-write, it doesn't reflect actual memory usage. Now we have two JSON files, from smemstat and glances. I'll use the following script to convert the JSON files to CSV.


#!/usr/bin/env python3

import csv
import sys
import json

def smemstat():
  with open('smemstat.json') as f:
    smem = json.load(f)

  rows = []
  fieldnames = set()    
  for s in smem['smemstat']['periodic-samples']:
    row = {}
    for ps in s['smem-per-process']:
      if 'script.py' in ps['command']:
        for k in ('uss', 'pss', 'rss'):
          row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20

    # smemstat produces empty samples, backfill from previous
    if rows:            
      for k, v in rows[-1].items():
        row.setdefault(k, v)

    rows.append(row)
    fieldnames.update(row.keys())

  with open('smemstat.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
    dw.writeheader()
    list(map(dw.writerow, rows))

def glances():
  rows = []
  fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
    'free', 'mem_critical', 'inactive', 'shared', 'history_size',
    'mem_warning', 'total', 'active', 'buffers']
  with open('glances.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=fieldnames)
    dw.writeheader()
    with open('glances.json') as f:
      for l in f:
        d = json.loads(l)
        dw.writerow(d['mem'])

if __name__ == '__main__':
  globals()[sys.argv[1]]()

First let's look at free memory.


[free memory chart]

The difference between the first value and the minimum is ~4.15 GiB. And here is how the PSS figures look:


[PSS chart]

And the sum:


[PSS sum chart]

Thus we can see that, because of copy-on-write, the actual memory consumption is ~4.15 GiB. But we're still serialising data to send it to the worker processes via Pool.map. Can we leverage copy-on-write here as well?


Shared data


To use copy-on-write we need to have list(gen_matrix_df_list.values()) be accessible globally, so the worker after fork can still read it.


  1. Let's modify the code after del gen_matrix_df in main like the following:

    ...
    global global_gen_matrix_df_values
    global_gen_matrix_df_values = list(gen_matrix_df_list.values())
    del gen_matrix_df_list
    
    p = Pool(2)
    result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
    ...
    
  2. Remove the del gen_matrix_df_list that comes later.
  3. And modify the first lines of matrix_to_vcf like this (a self-contained sketch of the whole pattern follows this list):

    def matrix_to_vcf(i):
        matrix_df = global_gen_matrix_df_values[i]
    

Now let's re-run it. Free memory:


[free memory chart]

Process tree:


[process tree chart]

And its sum:


[sum chart]

Thus we're at a maximum of ~2.9 GiB of actual memory usage (the peak the main process has while building the data frame), and copy-on-write has helped!


As a side note, there's so-called copy-on-read, the behaviour of Python's reference-cycle garbage collector, described in Instagram Engineering (which led to gc.freeze in issue31558). But gc.disable() doesn't have an impact in this particular case.

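For completeness, a minimal sketch of how those knobs would be applied before forking the pool (gc.freeze() only exists in Python 3.7+, newer than the 3.6.5 used above, hence the guard):

import gc
from multiprocessing import Pool

gc.disable()        # stop the cyclic collector from running in the parent
if hasattr(gc, 'freeze'):
    gc.freeze()     # Python 3.7+: move existing objects into a permanent
                    # generation so the collector never touches (and thus
                    # never copies-on-write) their pages after fork

p = Pool(2)         # children forked from here inherit the frozen state
# ... p.map(...), then p.close() and p.join() as usual
p.close()
p.join()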

Update


An alternative to copy-on-write copy-less data sharing can be delegating it to the kernel from the beginning by using numpy.memmap. Here's an example implementation from the High Performance Data Processing in Python talk. The tricky part is then making Pandas use the mmapped NumPy array.

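A minimal sketch of the numpy.memmap idea with toy numeric data; whether a pandas DataFrame built on top of such an array avoids a copy depends on the pandas version and dtypes, so only the raw NumPy side is shown here:

import numpy as np
from multiprocessing import Pool

SHAPE = (1000, 4)


def worker(bounds):
    start, stop = bounds
    # each worker re-opens the same file read-only; the kernel shares the pages
    mm = np.memmap('matrix.dat', dtype='int64', mode='r', shape=SHAPE)
    return int(mm[start:stop].sum())


if __name__ == '__main__':
    # write a toy numeric matrix to a memory-mapped file once
    data = np.memmap('matrix.dat', dtype='int64', mode='w+', shape=SHAPE)
    data[:] = np.arange(SHAPE[0] * SHAPE[1]).reshape(SHAPE)
    data.flush()

    with Pool(2) as p:
        print(p.map(worker, [(0, 500), (500, 1000)]))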

Answered by tomas

When you use multiprocessing.Pool, a number of child processes will be created using the fork() system call. Each of those processes starts off with an exact copy of the memory of the parent process at that time. Because you're loading the csv before you create the Pool of size 3, each of those 3 processes in the pool will unnecessarily have a copy of the data frame. (gen_matrix_df as well as gen_matrix_df_list will exist in the current process as well as in each of the 3 child processes, so 4 copies of each of these structures will be in memory.)


Try creating the Pool before loading the file (at the very beginning, actually). That should reduce the memory usage.

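A sketch of that reordering, reusing the identifiers from the question (imports, matrix_to_vcf and genome_matrix_file as in the original script; the grouped chunks are still pickled to the workers by map(), but the freshly forked children no longer start with a copy of the whole frame):

def main():
    # imports and matrix_to_vcf as in the question's script
    p = Pool(3)  # create the pool first, while the parent process is still small

    with open("genome_matrix_header.txt") as header:
        header = header.read().rstrip('\n').split('\t')

    gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)
    gen_matrix_df_list = {chr_: data for chr_, data in gen_matrix_df.groupby('CHROM')}
    del gen_matrix_df  # the parent keeps only the per-chromosome chunks

    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
    p.close()
    p.join()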

If it's still too high, you can:


  1. Dump gen_matrix_df_list to a file, 1 item per line, e.g.:

    import os
    import cPickle
    
    with open('tempfile.txt', 'w') as f:
        for item in gen_matrix_df_list.items():
            cPickle.dump(item, f)
            f.write(os.linesep)
    
  2. Use Pool.imap() on an iterator over the lines that you dumped into this file, e.g.:

    with open('tempfile.txt', 'r') as f:
        p.imap(matrix_to_vcf, (cPickle.loads(line) for line in f))
    

    (Note that matrix_to_vcf takes a (key, value) tuple in the example above, not just a value.)
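The snippets above are Python 2 (cPickle); note also that pickle output is binary and can contain newline bytes, so a text file with one item per line is fragile. A Python 3 sketch of the same idea that streams pickled items through a single binary file instead (function names are made up for illustration):

import pickle
from multiprocessing import Pool

def dump_items(items, path='tempfile.pkl'):
    # write one pickle frame per item; frames are self-delimiting
    with open(path, 'wb') as f:
        for item in items:
            pickle.dump(item, f)

def iter_items(path='tempfile.pkl'):
    # lazily yield the items back, one pickle.load() per frame
    with open(path, 'rb') as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return

# usage, with the names from the question:
#   dump_items(gen_matrix_df_list.items())
#   with Pool(3) as p:
#       result = list(p.imap(matrix_to_vcf, iter_items()))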


I hope that helps.


NB: I haven't tested the code above. It's only meant to demonstrate the idea.


Answered by Abdulrahman Bres

I had the same issue. I needed to process a huge text corpus while keeping a knowledge base of a few DataFrames of millions of rows loaded in memory. I think this issue is common, so I will keep my answer oriented for general purposes.


A combination of settings solved the problem for me (1, 3 and 5 alone might do it for you):


  1. Use Pool.imap (or imap_unordered) instead of Pool.map. This will iterate over the data lazily rather than loading all of it into memory before starting processing.

  2. Set a value for the chunksize parameter. This will make imap faster too.

  3. Set a value for the maxtasksperchild parameter.

  4. Append output to disk rather than keeping it in memory, either immediately or every once in a while when it reaches a certain size.

  5. Run the code in different batches. You can use itertools.islice if you have an iterator. The idea is to split your list(gen_matrix_df_list.values()) into three or more lists, then pass only the first third to map or imap, then the second third in another run, etc. Since you have a list, you can simply slice it in the same line of code. (A sketch combining points 1-4 follows this list.)

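Here is the sketch combining points 1-4, with a generic worker and toy data; the chunksize and maxtasksperchild values are arbitrary placeholders to tune:

import itertools
from multiprocessing import Pool

def work(item):
    # placeholder for the real CPU-bound processing
    return item * item

def batches(iterable, size):
    # yield successive islice() batches from any iterator (point 5)
    it = iter(iterable)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:
            return
        yield batch

if __name__ == '__main__':
    data = range(100_000)                        # stand-in for the list of dataframes
    with Pool(3, maxtasksperchild=100) as p:     # point 3: recycle workers to cap their memory
        with open('results.txt', 'w') as out:
            for batch in batches(data, 25_000):
                # points 1 and 2: lazy imap with an explicit chunksize
                for res in p.imap(work, batch, chunksize=100):
                    out.write('%d\n' % res)      # point 4: append to disk, not memory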

Answered by Jeff Ellen

GENERAL ANSWER ABOUT MEMORY WITH MULTIPROCESSING


You asked: "What is causing so much memory to be allocated?" The answer has two parts.


First, as you already noticed, each multiprocessing worker gets its own copy of the data (quoted from here), so you should chunk large arguments. Or, for large files, read them in a little bit at a time, if possible.


By default the workers of the pool are real Python processes forked using the multiprocessing module of the Python standard library when n_jobs != 1. The arguments passed as input to the Parallel call are serialized and reallocated in the memory of each worker process.

This can be problematic for large arguments as they will be reallocated n_jobs times by the workers.


Second, if you're trying to reclaim memory, you need to understand that Python works differently than other languages, and you are relying on del to release the memory when it doesn't. I don't know if it's best, but in my own code I've overcome this by reassigning the variable to None or an empty object.

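A tiny illustration of that reassignment pattern (the gc.collect() call is optional and only matters when reference cycles are involved):

import gc

big = list(range(10 ** 7))   # some large, no-longer-needed object
big = None                   # drop the last reference instead of using del
gc.collect()                 # optional: force a collection pass for cyclic garbage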

FOR YOUR SPECIFIC EXAMPLE - MINIMAL CODE EDITING


As long as you can fit your large data in memory twice, I think you can do what you are trying to do by just changing a single line. I've written very similar code and it worked for me when I reassigned the variable (rather than calling del or any kind of garbage collection). If this doesn't work, you may need to follow the suggestions above and use disk I/O:


    #### earlier code all the same
    # clear memory by reassignment (not del or gc)
    gen_matrix_df = {}

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    #del gen_matrix_df_list  # I suspect you don't even need this, memory will free when the pool is closed

    p.close()
    p.join()
    #### later code all the same

FOR YOUR SPECIFIC EXAMPLE - OPTIMAL MEMORY USAGE


As long as you can fit your large data in memory once, and you have some idea of how big your file is, you can use Pandas read_csv partial file reading, to read in only nrows at a time if you really want to micro-manage how much data is being read in, or a fixed amount of memory at a time using chunksize, which returns an iterator. By that I mean, the nrows parameter is just a single read: you might use that to just get a peek at a file, or if for some reason you wanted each part to have exactly the same number of rows (because, for example, if any of your data is strings of variable length, each row will not take up the same amount of memory). But I think for the purposes of prepping a file for multiprocessing, it will be far easier to use chunks, because that directly relates to memory, which is your concern. It will be easier to use trial and error to fit into memory based on specific-sized chunks than on a number of rows, which would change the amount of memory usage depending on how much data is in the rows. The only other difficult part is that, for some application-specific reason, you're grouping some rows, which makes it a little bit more complicated. Using your code as an example:


    '''load the genome matrix file onto pandas as dataframe.
    This makes it easier for multiprocessing'''

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    #not sure why you need the ordered dict here, might add memory overhead
    #gen_matrix_df_list = collections.OrderedDict()  
    #a defaultdict won't throw an exception when we try to append to it the first time. if you don't want a default dict for some reason, you have to initialize each entry you care about.
    gen_matrix_df_list = collections.defaultdict(list)   
    chunksize = 10 ** 6

    for chunk in pd.read_csv(genome_matrix_file, sep='\t', names=header, chunksize=chunksize):
        # now, group the dataframe by chromosome/contig - so it can be multiprocessed
        gen_matrix_df = chunk.groupby('CHROM')
        for chr_, data in gen_matrix_df:
            gen_matrix_df_list[chr_].append(data)

    '''Having sorted chunks on read to a list of df, now create single data frames for each chr_'''
    #The dict contains a list of small df objects, so now concatenate them
    #by reassigning to the same dict, the memory footprint is not increasing 
    for chr_ in gen_matrix_df_list.keys():
        gen_matrix_df_list[chr_]=pd.concat(gen_matrix_df_list[chr_])

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
    p.close()
    p.join()