Python: read a file line by line with asyncio

Notice: this page is derived from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverFlow. Original URL: http://stackoverflow.com/questions/33824359/

Date: 2020-08-19 14:03:29  Source: igfitidea

Read file line by line with asyncio

Tags: python, python-asyncio

Asked by josteinb

I wish to read several log files as they are written and process their input with asyncio. The code will have to run on Windows. From what I understand from searching around both stackoverflow and the web, asynchronous file I/O is tricky on most operating systems (select will not work as intended, for example). While I'm sure I could do this with other methods (e.g. threads), I thought I would try out asyncio to see what it is like. The most helpful answer would probably be one that describes what the "architecture" of a solution to this problem should look like, i.e. how different functions and coroutines should be called or scheduled.

The following gives me a generator that reads the files line by line (through polling, which is acceptable):

import time

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            time.sleep(POLL_INTERVAL)
            continue
        process_line(line)

With several files to monitor and process, this sort of code would require threads. I have modified it slightly to be more usable with asyncio:

import asyncio

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            yield from asyncio.sleep(POLL_INTERVAL)
            continue
        process_line(line)

This sort of works when I schedule it through the asyncio event loop, but if process_data blocks, that is of course not good. When starting out, I imagined the solution would look something like:

def process_data():
    ...
    while True:
        ...
        line = yield from line_reader()
        ...

but I could not figure out how to make that work (at least not without process_data managing quite a bit of state).

Any ideas on how I should structure this kind of code?

Accepted answer by Jashandeep Sohi

"From what I understand from searching around both stackoverflow and the web, asynchronous file I/O is tricky on most operating systems (select will not work as intended, for example). While I'm sure I could do this with other methods (e.g. threads), I thought I would try out asyncio to see what it is like."

asyncio is select based on *nix systems under the hood, so you won't be able to do non-blocking file I/O without the use of threads. On Windows, asyncio can use IOCP, which does support non-blocking file I/O, but asyncio does not expose that capability for files.

Your code is fine, except that you should make blocking I/O calls in threads, so that you don't block the event loop if the I/O is slow. Fortunately, it's really simple to offload work to threads using the loop.run_in_executor function.

First, set up a dedicated thread pool for your I/O:

from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()

And then simply offload any blocking I/O calls to the executor:

...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...
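
Putting the two fragments together, a minimal sketch in async/await syntax (Python 3.7+) might look like the following. The names `follow` and `POLL_INTERVAL`, the polling value, and the `max_lines` cut-off are illustrative assumptions rather than part of the answer; the cut-off exists only so that the demo terminates.

```python
import asyncio
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

POLL_INTERVAL = 0.05  # seconds between polls at end of file (assumed value)

# Dedicated pool so file reads do not compete with the loop's default executor.
io_pool_exc = ThreadPoolExecutor()


async def follow(f, max_lines):
    """Return the next `max_lines` lines from `f`, waiting for new data.

    `max_lines` is a demo-only limit; a real log follower would loop forever.
    """
    loop = asyncio.get_running_loop()
    lines = []
    while len(lines) < max_lines:
        # The blocking readline() runs in a worker thread, not on the loop.
        line = await loop.run_in_executor(io_pool_exc, f.readline)
        if not line:
            await asyncio.sleep(POLL_INTERVAL)  # nothing new yet, poll again
            continue
        lines.append(line)
    return lines


async def demo():
    # Create a temporary "log file" and append to it while it is being followed.
    fd, path = tempfile.mkstemp(suffix=".log")
    os.close(fd)
    try:
        with open(path, "a") as writer, open(path) as reader:
            task = asyncio.ensure_future(follow(reader, max_lines=2))
            for text in ("first\n", "second\n"):
                await asyncio.sleep(POLL_INTERVAL)
                writer.write(text)
                writer.flush()  # make the line visible to the reader
            return await task
    finally:
        os.remove(path)


result = asyncio.run(demo())
print(result)
```

While `readline` is waiting in the worker thread, the event loop stays free to run other coroutines, which is the point of the offloading.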

Answer by Andrew Svetlov

asyncio doesn't support file operations yet, sorry.

Thus it cannot help with your problem.

Answer by Vincent

Your code structure looks good to me; the following code runs fine on my machine:

import asyncio

PERIOD = 0.5

@asyncio.coroutine
def readline(f):
    while True:
        data = f.readline()
        if data:
            return data
        yield from asyncio.sleep(PERIOD)

@asyncio.coroutine
def test():
    with open('test.txt') as f:
        while True:
            line = yield from readline(f)
            print('Got: {!r}'.format(line))

loop = asyncio.get_event_loop()
loop.run_until_complete(test())
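
To monitor several files at once, the same polling coroutine can be started once per file, with all lines funneled into a single consumer. The sketch below is one possible arrangement, written in async/await syntax because generator-based coroutines (`@asyncio.coroutine`) were removed in Python 3.11. The names `tail` and `consume`, the queue-based hand-off, the shortened `PERIOD`, and the line-count limit are illustrative assumptions, not part of the answer above.

```python
import asyncio
import os
import tempfile

PERIOD = 0.05  # polling interval in seconds (shortened for the demo)


async def readline(f):
    """Poll `f` until a line is available, then return it."""
    while True:
        data = f.readline()  # runs on the loop thread, as in the answer above
        if data:
            return data
        await asyncio.sleep(PERIOD)


async def tail(path, queue):
    """Forward every line appended to `path` into `queue`, until cancelled."""
    with open(path) as f:
        while True:
            line = await readline(f)
            await queue.put((os.path.basename(path), line))


async def consume(queue, expected):
    """Collect `expected` items; a real consumer would process them instead."""
    items = []
    while len(items) < expected:
        items.append(await queue.get())
    return items


async def demo():
    with tempfile.TemporaryDirectory() as tmp:
        paths = [os.path.join(tmp, name) for name in ("a.log", "b.log")]
        for p in paths:
            open(p, "w").close()  # create empty log files
        queue = asyncio.Queue()
        tailers = [asyncio.ensure_future(tail(p, queue)) for p in paths]
        for p in paths:
            with open(p, "a") as w:
                w.write("hello from " + os.path.basename(p) + "\n")
        got = await consume(queue, expected=len(paths))
        # Stop the tailers so their files are closed before the directory is removed.
        for t in tailers:
            t.cancel()
        await asyncio.gather(*tailers, return_exceptions=True)
        return sorted(got)


received = asyncio.run(demo())
print(received)
```

With this layout the processing code only ever awaits `queue.get()`, so it does not need to track per-file state itself.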

Answer by pylover

Using aiofiles:

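import aiofiles  # third-party package

async def print_lines():  # run with asyncio.run(print_lines())
    async with aiofiles.open('filename', mode='r') as f:
        async for line in f:
            print(line)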

EDIT 1

As @Jashandeep mentioned, you should take care with blocking operations.

Another method is select and/or epoll:

from select import select

# The timeout is passed positionally: select() does not accept keyword arguments.
files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], .1)

The timeout parameter is important here.

see: https://docs.python.org/3/library/select.html#select.select
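
A small sketch of the timeout behaviour, using a socket pair rather than files. The substitution is deliberate: on Windows, select accepts only sockets, and on Unix a regular file is always reported as ready, so select cannot signal that new data has arrived in a log file. The variable names are illustrative.

```python
import socket
from select import select

# A connected pair of sockets stands in for the two ends of a stream.
a, b = socket.socketpair()
try:
    # Nothing has been sent yet: select waits up to 0.1 s, then returns empty lists.
    readable, _, _ = select([b], [], [], 0.1)
    idle = list(readable)

    a.sendall(b"line\n")
    # Data is pending now, so select returns with `b` in the readable list.
    readable, _, _ = select([b], [], [], 0.1)
    ready = readable == [b]
    data = b.recv(1024)
finally:
    a.close()
    b.close()

print(idle, ready, data)
```

Without a timeout, the first call would block until data arrived, which is why the parameter matters when polling.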

EDIT 2

You can register a file for read/write with: loop.add_reader()

It uses the event loop's internal epoll handler.

EDIT 3

But remember that epoll will not work with regular files.
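
For descriptors that the selector does support, such as sockets and pipes, loop.add_reader() could be used roughly as follows. This is a sketch under assumptions: it uses a socket pair instead of a log file, it needs a selector-based event loop (the default on Unix; the Windows proactor loop does not implement add_reader), and `wait_readable` is a hypothetical helper name.

```python
import asyncio
import socket


async def wait_readable(sock):
    """Suspend until `sock` has data to read, using loop.add_reader()."""
    loop = asyncio.get_running_loop()
    ready = loop.create_future()

    def on_readable():
        # May fire more than once before the reader is removed; resolve only once.
        if not ready.done():
            ready.set_result(None)

    loop.add_reader(sock.fileno(), on_readable)
    try:
        await ready
    finally:
        loop.remove_reader(sock.fileno())


async def demo():
    reader, writer = socket.socketpair()
    reader.setblocking(False)
    try:
        loop = asyncio.get_running_loop()
        # Send a line shortly after we start waiting.
        loop.call_later(0.05, writer.sendall, b"log line\n")
        await wait_readable(reader)
        return reader.recv(1024)
    finally:
        reader.close()
        writer.close()


payload = asyncio.run(demo())
print(payload)
```

The loop wakes the coroutine only when the descriptor becomes readable, so no polling interval is involved; for regular files this mechanism is not available, as noted above.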
