Python - 如何使用 asyncio 同时运行多个协程?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/32054066/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 10:55:40  来源:igfitidea点击:

Python - how to run multiple coroutines concurrently using asyncio?

pythonpython-3.xwebsocketpython-asyncio

提问by weatherfrog

I'm using the websocketslibrary to create a websocket server in Python 3.4. Here's a simple echo server:

我正在使用该websockets库在 Python 3.4 中创建一个 websocket 服务器。这是一个简单的回显服务器:

import asyncio
import websockets

@asyncio.coroutine
def connection_handler(websocket, path):
    while True:
        msg = yield from websocket.recv()
        if msg is None:  # connection lost
            break
        yield from websocket.send(msg)

start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Let's say we – additionally – wanted to send a message to the client whenever some event happens. For simplicity, let's send a message periodically every 60 seconds. How would we do that? I mean, because connection_handleris constantly waiting for incoming messages, the server can only take action afterit has received a message from the client, right? What am I missing here?

假设我们 - 另外 - 想要在发生某些事件时向客户端发送消息。为简单起见,让我们每隔 60 秒定期发送一条消息。我们怎么做?我的意思是,因为connection_handler不断等待传入的消息,服务器只能收到来自客户端的消息才能采取行动,对吗?我在这里缺少什么?

Maybe this scenario requires a framework based on events/callbacks rather than one based on coroutines? Tornado?

也许这个场景需要一个基于事件/回调的框架而不是一个基于协程的框架?龙卷风

采纳答案by Yaroslav Admin

TL;DRUse asyncio.ensure_future()to run several coroutines concurrently.

TL;DR用于asyncio.ensure_future()同时运行多个协程。



Maybe this scenario requires a framework based on events/callbacks rather than one based on coroutines? Tornado?

也许这个场景需要一个基于事件/回调的框架而不是一个基于协程的框架?龙卷风?

No, you don't need any other framework for this. The whole idea the asynchronous application vs synchronous is that it doesn't block, while waiting for result. It doesn't matter how it is implemented, using coroutines or callbacks.

不,您不需要任何其他框架。异步应用程序与同步应用程序的整个想法是它在等待结果时不会阻塞。它是如何实现的,使用协程或回调,并不重要。

I mean, because connection_handler is constantly waiting for incoming messages, the server can only take action after it has received a message from the client, right? What am I missing here?

我的意思是,因为 connection_handler 一直在等待传入的消息,所以服务器只能在收到来自客户端的消息后才能采取行动,对吗?我在这里缺少什么?

In synchronous application you will write something like msg = websocket.recv(), which would block whole application until you receive message (as you described). But in the asynchronous application it's completely different.

在同步应用程序中,您将编写类似msg = websocket.recv(),它会阻止整个应用程序,直到您收到消息(如您​​所描述的)。但在异步应用程序中则完全不同。

When you do msg = yield from websocket.recv()you say something like: suspend execution of connection_handler()until websocket.recv()will produce something. Using yield frominside coroutine returns control back to the event loop, so some other code can be executed, while we're waiting for result of websocket.recv(). Please, refer to documentationto better understand how coroutines work.

当你这样做时,msg = yield from websocket.recv()你会说:暂停执行connection_handler()直到websocket.recv()会产生一些东西。使用yield from内部协程将控制权返回给事件循环,因此可以执行其他一些代码,而我们正在等待websocket.recv(). 请参阅文档以更好地了解协程的工作原理。

Let's say we – additionally – wanted to send a message to the client whenever some event happens. For simplicity, let's send a message periodically every 60 seconds. How would we do that?

假设我们 - 另外 - 想要在发生某些事件时向客户端发送消息。为简单起见,让我们每隔 60 秒定期发送一条消息。我们怎么做?

You can use asyncio.async()to run as many coroutines as you want, before executing blocking call for starting event loop.

asyncio.async()在执行阻塞调用以启动事件循环之前,您可以根据需要运行任意数量的协程。

import asyncio

import websockets

# here we'll store all active connections to use for sending periodic messages
connections = []


@asyncio.coroutine
def connection_handler(connection, path):
    connections.append(connection)  # add connection to pool
    while True:
        msg = yield from connection.recv()
        if msg is None:  # connection lost
            connections.remove(connection)  # remove connection from pool, when client disconnects
            break
        else:
            print('< {}'.format(msg))
        yield from connection.send(msg)
        print('> {}'.format(msg))


@asyncio.coroutine
def send_periodically():
    while True:
        yield from asyncio.sleep(5)  # switch to other code and continue execution in 5 seconds
        for connection in connections:
            print('> Periodic event happened.')
            yield from connection.send('Periodic event happened.')  # send message to each connected client


start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.async(send_periodically())  # before blocking call we schedule our coroutine for sending periodic messages
asyncio.get_event_loop().run_forever()

Here is an example client implementation. It asks you to enter name, receives it back from the echo server, waits for two more messages from server (which are our periodic messages) and closes connection.

这是一个示例客户端实现。它要求您输入名称,从回显服务器接收它,等待来自服务器的另外两条消息(这是我们的定期消息)并关闭连接。

import asyncio

import websockets


@asyncio.coroutine
def hello():
    connection = yield from websockets.connect('ws://localhost:8000/')
    name = input("What's your name? ")
    yield from connection.send(name)
    print("> {}".format(name))
    for _ in range(3):
        msg = yield from connection.recv()
        print("< {}".format(msg))

    yield from connection.close()


asyncio.get_event_loop().run_until_complete(hello())

Important points:

要点:

  1. In Python 3.4.4 asyncio.async()was renamed to asyncio.ensure_future().
  2. There are special methods for scheduling delayed calls, but they don't work with coroutines.
  1. 在 Python 3.4.4 中asyncio.async()被重命名为asyncio.ensure_future().
  2. 有一些特殊的方法可以安排延迟调用,但它们不适用于协程。

回答by gzerone

Same issue, can hardly got solution until I saw the perfect sample here: http://websockets.readthedocs.io/en/stable/intro.html#both

同样的问题,在我在这里看到完美的示例之前很难找到解决方案:http: //websockets.readthedocs.io/en/stable/intro.html#both

 done, pending = await asyncio.wait(
        [listener_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED)  # Important

So, I can handle multi coroutine tasks such as heartbeat and redis subscribe.

所以,我可以处理心跳和redis订阅等多协程任务。

回答by Cyril N.

I'm surprised gatherisn't mentioned.

我很惊讶gather没有提到。

From the Python documentation:

Python 文档

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24