从 Flask 路由进行 Python 异步调用
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/47841985/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Make a Python asyncio call from a Flask route
提问by user24502
I want to execute an async function every time the Flask route is executed. Why is the abar
function never executed?
每次执行 Flask 路由时,我都想执行一个异步函数。为什么abar
函数永远不会执行?
import asyncio
from flask import Flask
async def abar(a):
print(a)
loop = asyncio.get_event_loop()
app = Flask(__name__)
@app.route("/")
def notify():
asyncio.ensure_future(abar("abar"), loop=loop)
return "OK"
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
loop.run_forever()
I also tried putting the blocking call in a separate thread. But it is still not calling the abar
function.
我还尝试将阻塞调用放在一个单独的线程中。但它仍然没有调用该abar
函数。
import asyncio
from threading import Thread
from flask import Flask
async def abar(a):
print(a)
app = Flask(__name__)
def start_worker(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.close()
worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))
@app.route("/")
def notify():
asyncio.ensure_future(abar("abar"), loop=worker_loop)
return "OK"
if __name__ == "__main__":
worker.start()
app.run(debug=False, use_reloader=False)
回答by Travis Terry
You can incorporate some async functionality into Flask apps without having to completely convert them to asyncio.
您可以将一些异步功能合并到 Flask 应用程序中,而无需将它们完全转换为 asyncio。
import asyncio
from flask import Flask
async def abar(a):
print(a)
loop = asyncio.get_event_loop()
app = Flask(__name__)
@app.route("/")
def notify():
loop.run_until_complete(abar("abar"))
return "OK"
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
This will block the Flask response until the async function returns, but it still allows you to do some clever things. I've used this pattern to perform many external requests in parallel using aiohttp, and then when they are complete, I'm back into traditional flask for data processing and template rendering.
这将阻止 Flask 响应,直到 async 函数返回,但它仍然允许您做一些聪明的事情。我已经使用这种模式使用aiohttp并行执行许多外部请求,然后当它们完成时,我又回到传统的烧瓶中进行数据处理和模板渲染。
import aiohttp
import asyncio
import async_timeout
from flask import Flask
loop = asyncio.get_event_loop()
app = Flask(__name__)
async def fetch(url):
async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
async with session.get(url) as response:
return await response.text()
def fight(responses):
return "Why can't we all just get along?"
@app.route("/")
def index():
# perform multiple async requests concurrently
responses = loop.run_until_complete(asyncio.gather(
fetch("https://google.com/"),
fetch("https://bing.com/"),
fetch("https://duckduckgo.com"),
fetch("http://www.dogpile.com"),
))
# do something with the results
return fight(responses)
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
回答by pgjones
A simpler solution to your problem (in my biased view) is to switch to Quartfrom Flask. If so your snippet simplifies to,
您的问题的一个更简单的解决方案(在我的偏见中)是从 Flask切换到Quart。如果是这样,您的代码段将简化为,
import asyncio
from quart import Quart
async def abar(a):
print(a)
app = Quart(__name__)
@app.route("/")
async def notify():
await abar("abar")
return "OK"
if __name__ == "__main__":
app.run(debug=False)
As noted in the other answers the Flask app run is blocking, and does not interact with an asyncio loop. Quart on the other hand is the Flask API built on asyncio, so it should work how you expect.
正如其他答案中所述,Flask 应用程序运行被阻塞,并且不与 asyncio 循环交互。另一方面,Quart 是基于 asyncio 构建的 Flask API,因此它应该按您的预期工作。
Also as an update, Flask-Aiohttp is no longer maintained.
同样作为更新,Flask-Aiohttp 不再维护。
回答by Martijn Pieters
Your mistake is to try to run the asyncio event loop after calling app.run()
. The latter doesn't return, it instead runs the Flask development server.
你的错误是在调用app.run()
. 后者不会返回,而是运行 Flask 开发服务器。
In fact, that's how most WSGI setups will work; either the main thread is going to busy dispatching requests, or the Flask server is imported as a module in a WSGI server, and you can't start an event loop here either.
事实上,这就是大多数 WSGI 设置的工作方式;无论是主线程是要忙调度请求,或瓶服务器导入为一个WSGI服务器的模块,并且你不能从这里开始的事件循环两种。
You'll instead have to run your asyncio event loop in a separate thread, then run your coroutines in that separate thread via asyncio.run_coroutine_threadsafe()
. See the Coroutines and Multithreadingsectionin the documentation for what this entails.
相反,您必须在单独的线程中运行 asyncio 事件循环,然后通过asyncio.run_coroutine_threadsafe()
. 请参阅文档中的协程和多线程部分了解这意味着什么。
Here is an implementation of a module that will run such an event loop thread, and gives you the utilities to schedule coroutines to be run in that loop:
这是一个模块的实现,该模块将运行这样一个事件循环线程,并为您提供安排协程在该循环中运行的实用程序:
import asyncio
import itertools
import time
import threading
__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]
class EventLoopThread(threading.Thread):
loop = None
_count = itertools.count(0)
def __init__(self):
name = f"{type(self).__name__}-{next(self._count)}"
super().__init__(name=name, daemon=True)
def __repr__(self):
loop, r, c, d = self.loop, False, True, False
if loop is not None:
r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
return (
f"<{type(self).__name__} {self.name} id={self.ident} "
f"running={r} closed={c} debug={d}>"
)
def run(self):
self.loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
try:
shutdown_asyncgens = loop.shutdown_asyncgens()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_asyncgens)
loop.close()
asyncio.set_event_loop(None)
def stop(self):
loop, self.loop = self.loop, None
if loop is None:
return
loop.call_soon_threadsafe(loop.stop)
self.join()
_lock = threading.Lock()
_loop_thread = None
def get_event_loop():
global _loop_thread
with _lock:
if _loop_thread is None:
_loop_thread = EventLoopThread()
_loop_thread.start()
return _loop_thread.loop
def stop_event_loop():
global _loop_thread
with _lock:
if _loop_thread is not None:
_loop_thread.stop()
_loop_thread = None
def run_coroutine(coro):
"""Run the coroutine in the event loop running in a separate thread
Returns a Future, call Future.result() to get the output
"""
return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
You can use the run_coroutine()
function defined here to schedule asyncio routines. Use the returned Future
instanceto control the coroutine:
您可以使用run_coroutine()
此处定义的函数来安排异步例程。使用返回的Future
实例来控制协程:
- Get the result with
Future.result()
. You can give this a timeout; if no result is produced within the timeout, the coroutine is automatically cancelled. - You can query the state of the coroutine with the
.cancelled()
,.running()
and.done()
methods. - You can add callbacks to the future, which will be called when the coroutine has completed, or is cancelled or raised an exception (take into account that this is probably going to be called from the event loop thread, not the thread that you called
run_coroutine()
in).
- 得到结果
Future.result()
。你可以给它一个超时时间;如果在超时时间内没有产生任何结果,协程将自动取消。 - 您可以使用
.cancelled()
,.running()
和.done()
方法查询协程的状态。 - 您可以向未来添加回调,它将在协程完成、取消或引发异常时调用(考虑到这可能会从事件循环线程调用,而不是您调用的线程
run_coroutine()
) )。
For your specific example, where abar()
doesn't return any result, you can just ignore the returned future, like this:
对于您的特定示例, whereabar()
不返回任何结果,您可以忽略返回的未来,如下所示:
@app.route("/")
def notify():
run_coroutine(abar("abar"))
return "OK"
Note that before Python 3.8that you can't use an event loop running on a separate thread to create subprocesses with! See my answer to Python3 Flask asyncio subprocess in route hangsfor backport of the Python 3.8 ThreadedChildWatcher
class for a work-around for this.
请注意,在 Python 3.8 之前,您不能使用在单独线程上运行的事件循环来创建子进程!请参阅我对Python3 Flask asyncio subprocess in route hangsfor backport of the Python 3.8 ThreadedChildWatcher
class 的回答,以解决此问题。
回答by Mikhail Gerasimov
For same reason you won't see this print:
出于同样的原因,您将看不到此打印件:
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
print('Hey!')
loop.run_forever()
loop.run_forever()
is never called since as @dirn already noted app.run
is also blocking.
loop.run_forever()
永远不会被调用,因为@dirn 已经注意到app.run
也是阻塞的。
Running global blocking event loop - is only way you can run asyncio
coroutines and tasks, but it's not compatible with running blocking Flask app (or with any other such thing in general).
运行全局阻塞事件循环 - 是运行asyncio
协程和任务的唯一方式,但它与运行阻塞 Flask 应用程序(或任何其他类似的东西)不兼容。
If you want to use asynchronous web framework you should choose one created to be asynchronous. For example, probably most popular now is aiohttp:
如果您想使用异步 Web 框架,您应该选择一个创建为异步的框架。例如,现在最流行的可能是aiohttp:
from aiohttp import web
async def hello(request):
return web.Response(text="Hello, world")
if __name__ == "__main__":
app = web.Application()
app.router.add_get('/', hello)
web.run_app(app) # this runs asyncio event loop inside
Upd:
更新:
About your try to run event loop in background thread. I didn't investigate much, but it seems problem somehow related with tread-safety: many asyncio objects are not thread-safe. If you change your code this way, it'll work:
关于您尝试在后台线程中运行事件循环。我没有进行太多调查,但似乎问题与胎面安全有关:许多 asyncio 对象不是线程安全的。如果您以这种方式更改代码,它将起作用:
def _create_task():
asyncio.ensure_future(abar("abar"), loop=worker_loop)
@app.route("/")
def notify():
worker_loop.call_soon_threadsafe(_create_task)
return "OK"
But again, this is very bad idea. It's not only very inconvenient, but I guess wouldn't make much sense: if you're going to use thread to start asyncio, why don't just use threads in Flaskinstead of asyncio? You will have Flask you want and parallelization.
但同样,这是一个非常糟糕的主意。这不仅非常不方便,而且我想也没有多大意义:如果您要使用线程来启动 asyncio,为什么不在Flask 中使用线程而不是 asyncio?你将拥有你想要的 Flask 和并行化。
If I still didn't convince you, at least take a look at Flask-aiohttpproject. It has close to Flask api and I think still better that what you're trying to do.
如果我还是没说服你,至少看看Flask-aiohttp项目。它接近 Flask api,我认为你要做的更好。
回答by Tomer
Thanks for JL Diaz ( From RealPython ) for providing a working code for the above that was not working.
感谢 JL Diaz(来自 RealPython)为上述无法工作的代码提供了工作代码。
If anything here should be changed, feel free to comment.
如果这里有什么应该改变的,请随时发表评论。
import aiohttp
import asyncio
import async_timeout
from quart import Quart, jsonify
app = Quart(__name__)
async def fetch(url):
async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
async with session.get(url) as response:
return await response.text()
def fight(responses):
return jsonify([len(r) for r in responses])
@app.route("/")
async def index():
# perform multiple async requests concurrently
responses = await asyncio.gather(
fetch("https://google.com/"),
fetch("https://bing.com/"),
fetch("https://duckduckgo.com"),
fetch("http://www.dogpile.com"),
)
# do something with the results
return fight(responses)
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)