Fastest way to process a large file in Python?

Disclaimer: this content comes from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/30294146/


Fastest way to process a large file?

python, file, python-2.7, filereader

Asked by Reise45

I have multiple 3 GB tab-delimited files. There are 20 million rows in each file. All the rows have to be processed independently; there is no relation between any two rows. My question is, which will be faster: A. Reading line by line using:

with open() as infile:
    for line in infile:

Or B. Reading the file into memory in chunks and processing it, say 250 MB at a time?

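Roughly, what I mean by option B is something like the following sketch (the filename and the process() helper are placeholders; the partial last line of each chunk has to be carried over to the next one):

def process(line):
    pass  # placeholder for the real per-line work

chunk_size = 250 * 1024 * 1024  # ~250 MB at a time
with open('data.tsv', 'rb') as infile:  # placeholder filename
    leftover = ''
    while True:
        chunk = infile.read(chunk_size)
        if not chunk:
            break
        chunk = leftover + chunk
        lines = chunk.split('\n')
        leftover = lines.pop()  # last element may be a partial line; carry it over
        for line in lines:
            process(line)
    if leftover:
        process(leftover)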

The processing is not very complicated; I am just grabbing the value in column1 into List1, column2 into List2, etc. I might need to add some column values together.

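In other words, the per-line work is roughly like this sketch (the column positions, list names, and filename are just placeholders):

list1, list2, sums = [], [], []
with open('data.tsv') as infile:  # placeholder filename
    for line in infile:
        cols = line.rstrip('\n').split('\t')
        list1.append(cols[0])                         # column1 -> List1
        list2.append(cols[1])                         # column2 -> List2
        sums.append(float(cols[2]) + float(cols[3]))  # adding some column values together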

I am using Python 2.7 on a Linux box that has 30 GB of memory. The files are ASCII text.

Any way to speed things up in parallel? Right now I am using the former method and the process is very slow. Is using any CSVReader module going to help? I don't have to do it in Python; ideas for any other language or a database are welcome.

Accepted answer by abarnert

It sounds like your code is I/O bound. This means that multiprocessing isn't going to help—if you spend 90% of your time reading from disk, having an extra 7 processes waiting on the next read isn't going to help anything.

And, while using a CSV reading module (whether the stdlib's csv or something like NumPy or Pandas) may be a good idea for simplicity, it's unlikely to make much difference in performance.

Still, it's worth checking that you really are I/O bound, instead of just guessing. Run your program and see whether your CPU usage is close to 0% or close to 100% of a core. Do what Amadan suggested in a comment, and run your program with just pass for the processing and see whether that cuts off 5% of the time or 70%. You may even want to try comparing with a loop over os.open and os.read(1024*1024) or something and see if that's any faster.

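A rough sketch of that comparison (the path is a placeholder; the numbers you get are what tell you whether you're I/O bound):

import os
import time

path = 'data.tsv'  # placeholder

# 1) Iterate lines with a `pass` body: measures read + line-splitting cost only.
t0 = time.time()
with open(path) as infile:
    for line in infile:
        pass
print('line iteration only: %.2fs' % (time.time() - t0))

# 2) Raw os.open/os.read loop: measures pure I/O, no line splitting at all.
t0 = time.time()
fd = os.open(path, os.O_RDONLY)
while os.read(fd, 1024 * 1024):
    pass
os.close(fd)
print('raw os.read(1MB) loop: %.2fs' % (time.time() - t0))

If the two times come out close, the bottleneck is the I/O itself rather than anything Python is doing per line.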



Since you're using Python 2.x, Python is relying on the C stdio library to guess how much to buffer at a time, so it might be worth forcing it to buffer more. The simplest way to do that is to use readlines(bufsize) for some large bufsize. (You can try different numbers and measure them to see where the peak is. In my experience, usually anything from 64K to 8MB is about the same, but depending on your system that may be different—especially if you're, e.g., reading off a network filesystem with great throughput but horrible latency that swamps the throughput-vs.-latency of the actual physical drive and the caching the OS does.)

So, for example:

bufsize = 65536
with open(path) as infile: 
    while True:
        lines = infile.readlines(bufsize)
        if not lines:
            break
        for line in lines:
            process(line)


Meanwhile, assuming you're on a 64-bit system, you may want to try using mmap instead of reading the file in the first place. This certainly isn't guaranteed to be better, but it may be better, depending on your system. For example:

import mmap

with open(path) as infile:
    m = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)

A Python mmap is sort of a weird object—it acts like a str and like a file at the same time, so you can, e.g., manually iterate scanning for newlines, or you can call readline on it as if it were a file. Both of those will take more processing from Python than iterating the file as lines or doing batch readlines (because a loop that would be in C is now in pure Python… although maybe you can get around that with re, or with a simple Cython extension?)… but the I/O advantage of the OS knowing what you're doing with the mapping may swamp the CPU disadvantage.

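For example, a rough sketch of both styles of iteration over the mapping (the path and the process() helper are placeholders; whether either beats plain file iteration is something to measure):

import mmap

def process(line):
    pass  # placeholder for the real per-line work

with open('data.tsv', 'rb') as infile:  # placeholder path
    m = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)
    try:
        # File-like style: call readline until it returns an empty string.
        for line in iter(m.readline, b''):
            process(line)

        # str-like style: scan for newlines manually.
        m.seek(0)
        start = 0
        while True:
            end = m.find(b'\n', start)
            if end == -1:
                break  # ignores a trailing partial line with no newline
            process(m[start:end])
            start = end + 1
    finally:
        m.close()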

Unfortunately, Python doesn't expose the madvise call that you'd use to tweak things in an attempt to optimize this in C (e.g., explicitly setting MADV_SEQUENTIAL instead of making the kernel guess, or forcing transparent huge pages)—but you can actually ctypes the function out of libc.

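A rough sketch of that ctypes route, assuming Linux and CPython 2.7; the MADV_SEQUENTIAL value, the PyObject_AsReadBuffer trick for recovering the mapping's address, and the placeholder path are all assumptions to verify on your platform (newer Python 3 releases expose mmap.madvise directly):

import ctypes
import ctypes.util
import mmap
import os

libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
MADV_SEQUENTIAL = 2  # Linux value; check <sys/mman.h> on your platform

def madvise_sequential(m):
    # Recover the mapping's base address and length from the mmap object
    # via CPython's old buffer protocol (available on CPython 2.7).
    addr = ctypes.c_void_p()
    length = ctypes.c_ssize_t()
    ctypes.pythonapi.PyObject_AsReadBuffer(
        ctypes.py_object(m), ctypes.byref(addr), ctypes.byref(length))
    # Tell the kernel we intend to read the mapping sequentially.
    if libc.madvise(addr, ctypes.c_size_t(length.value), MADV_SEQUENTIAL) != 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))

with open('data.tsv', 'rb') as infile:  # placeholder path
    m = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)
    madvise_sequential(m)
    # ... then iterate over m as before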

Answered by Deepak Saini

I know this question is old, but I wanted to do a similar thing, so I created a simple framework which helps you read and process a large file in parallel. I'm leaving what I tried as an answer.

This is the code; I give an example at the end.

from __future__ import print_function  # needed on Python 2 for print(x, file=fout) below

import gc
import multiprocessing as mp
import os
import time

import psutil


def chunkify_file(fname, size=1024*1024*1000, skiplines=-1):
    """
    function to divide a large text file into chunks, each of size ~= size, so that the chunks are line aligned

    Params :
        fname : path to the file to be chunked
        size : size of each chunk is ~ this (in bytes)
        skiplines : number of lines at the beginning to skip, -1 means don't skip any lines
    Returns :
        list of (start offset in bytes, chunk length in bytes, fname) tuples
    """
    chunks = []
    fileEnd = os.path.getsize(fname)
    with open(fname, "rb") as f:
        if(skiplines > 0):
            for i in range(skiplines):
                f.readline()

        chunkEnd = f.tell()
        count = 0
        while True:
            chunkStart = chunkEnd
            f.seek(f.tell() + size, os.SEEK_SET)
            f.readline()  # make this chunk line aligned
            chunkEnd = f.tell()
            chunks.append((chunkStart, chunkEnd - chunkStart, fname))
            count+=1

            if chunkEnd > fileEnd:
                break
    return chunks

def parallel_apply_line_by_line_chunk(chunk_data):
    """
    function to apply a function to each line in a chunk

    Params :
        chunk_data : the data for this chunk 
    Returns :
        list of the non-None results for this chunk
    """
    chunk_start, chunk_size, file_path, func_apply = chunk_data[:4]
    func_args = chunk_data[4:]

    t1 = time.time()
    chunk_res = []
    with open(file_path, "rb") as f:
        f.seek(chunk_start)
        cont = f.read(chunk_size).decode(encoding='utf-8')
        lines = cont.splitlines()

        for i,line in enumerate(lines):
            ret = func_apply(line, *func_args)
            if(ret != None):
                chunk_res.append(ret)
    return chunk_res

def parallel_apply_line_by_line(input_file_path, chunk_size_factor, num_procs, skiplines, func_apply, func_args, fout=None):
    """
    function to apply a supplied function line by line in parallel

    Params :
        input_file_path : path to input file
        chunk_size_factor : size of 1 chunk in MB
        num_procs : number of parallel processes to spawn, max used is num of available cores - 1
        skiplines : number of top lines to skip while processing
        func_apply : a function which expects a line and outputs None for lines we don't want processed
        func_args : arguments to function func_apply
        fout : do we want to output the processed lines to a file
    Returns :
        list of the non-None results obtained by processing each line
    """
    num_parallel = min(num_procs, psutil.cpu_count()) - 1

    jobs = chunkify_file(input_file_path, 1024 * 1024 * chunk_size_factor, skiplines)

    jobs = [list(x) + [func_apply] + func_args for x in jobs]

    print("Starting the parallel pool for {} jobs ".format(len(jobs)))

    lines_counter = 0

    pool = mp.Pool(num_parallel, maxtasksperchild=1000)  # maxtasksperchild - if not supplied, something weird happens and memory blows up as the processes keep lingering

    outputs = []
    for i in range(0, len(jobs), num_parallel):
        print("Chunk start = ", i)
        t1 = time.time()
        chunk_outputs = pool.map(parallel_apply_line_by_line_chunk, jobs[i : i + num_parallel])

        for subl in chunk_outputs:
            for x in subl:
                if(fout != None):
                    print(x, file=fout)
                else:
                    outputs.append(x)
                lines_counter += 1
        del(chunk_outputs)
        gc.collect()
        print("All Done in time ", time.time() - t1)

    print("Total lines we have = {}".format(lines_counter))

    pool.close()
    pool.terminate()
    return outputs

Say, for example, I have a file in which I want to count the number of words in each line; then the processing of each line would look like:

def count_words_line(line):
    return len(line.strip().split())

and then call the function like:

parallel_apply_line_by_line(input_file_path, 100, 8, 0, count_words_line, [], fout=None)

Using this, I get a speed-up of ~8x compared to vanilla line-by-line reading on a sample file of size ~20 GB, where I do some moderately complicated processing on each line.
