Python 在 Flask 中创建异步任务

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/31866796/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 10:41:24  来源:igfitidea点击:

Making an asynchronous task in Flask

pythonasynchronousflask

提问by Darwin Tech

I am writing an application in Flask, which works really well except that WSGIis synchronous and blocking. I have one task in particular which calls out to a third party API and that task can take several minutes to complete. I would like to make that call (it's actually a series of calls) and let it run. while control is returned to Flask.

我正在用 Flask 编写一个应用程序,除了WSGI同步和阻塞之外,它运行得非常好。我有一项特别需要调用第三方 API 的任务,该任务可能需要几分钟才能完成。我想打那个电话(实际上是一系列电话)并让它运行。而控制权返回给 Flask。

My view looks like:

我的观点是这样的:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Now, what I want to do is have the line

现在,我想要做的就是拥有这条线

final_file = audio_class.render_audio()

run and provide a callback to be executed when the method returns, whilst Flask can continue to process requests. This is the only task which I need Flask to run asynchronously, and I would like some advice on how best to implement this.

运行并提供在方法返回时执行的回调,而 Flask 可以继续处理请求。这是我需要 Flask 异步运行的唯一任务,我想就如何最好地实现这一点提供一些建议。

I have looked at Twisted and Klein, but I'm not sure they are overkill, as maybe Threading would suffice. Or maybe Celery is a good choice for this?

我看过 Twisted 和 Klein,但我不确定它们是否矫枉过正,因为也许线程就足够了。或者芹菜是一个不错的选择?

采纳答案by Connie

I would use Celeryto handle the asynchronous task for you. You'll need to install a broker to serve as your task queue (RabbitMQ and Redis are recommended).

我会用Celery来为你处理异步任务。您需要安装一个代理作为您的任务队列(推荐使用 RabbitMQ 和 Redis)。

app.py:

app.py

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Run your Flask app, and start another process to run your celery worker.

运行您的 Flask 应用程序,并启动另一个进程来运行您的 celery worker。

$ celery worker -A app.celery --loglevel=debug

I would also refer to Miguel Gringberg's write upfor a more in depth guide to using Celery with Flask.

我还会参考 Miguel Gringberg 的文章以获取有关在 Flask 中使用 Celery 的更深入指南。

回答by Jurgen Strydom

Threading is another possible solution. Although the Celery based solution is better for applications at scale, if you are not expecting too much traffic on the endpoint in question, threading is a viable alternative.

线程是另一种可能的解决方案。尽管基于 Celery 的解决方案更适合大规模应用程序,但如果您不希望相关端点上有太多流量,线程是一个可行的替代方案。

This solution is based on Miguel Grinberg's PyCon 2016 Flask at Scale presentation, specifically slide 41in his slide deck. His code is also available on githubfor those interested in the original source.

该解决方案基于Miguel Grinberg 的 PyCon 2016 Flask at Scale 演示,特别是他幻灯片中的幻灯片 41。他的代码也可以在 github 上找到,供对原始源代码感兴趣的人使用。

From a user perspective the code works as follows:

从用户的角度来看,代码的工作原理如下:

  1. You make a call to the endpoint that performs the long running task.
  2. This endpoint returns 202 Accepted with a link to check on the task status.
  3. Calls to the status link returns 202 while the taks is still running, and returns 200 (and the result) when the task is complete.
  1. 您调用执行长时间运行任务的端点。
  2. 此端点返回 202 Accepted 并带有链接以检查任务状态。
  3. 当 taks 仍在运行时,对状态链接的调用将返回 202,并在任务完成时返回 200(和结果)。

To convert an api call to a background task, simply add the @async_api decorator.

要将 api 调用转换为后台任务,只需添加 @async_api 装饰器。

Here is a fully contained example:

这是一个完全包含的示例:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)

回答by Tomasz Bartkowiak

You can also try using multiprocessing.Processwith deamon=True; the process.start()method does not block and you can return a response/status immediately to the caller while your expensive function executes in the background.

您也可以尝试使用multiprocessing.Processwith deamon=True; 该process.start()方法不会阻塞,您可以在昂贵的函数在后台执行时立即向调用者返回响应/状态。

I experienced similar problem while working with falconframework and using deamonprocess helped.

我在使用falcon框架和使用deamon过程帮助时遇到了类似的问题。

You'd need to do the following:

您需要执行以下操作:

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

You should get a response immediately and, after 10s you should see a printed message in the console.

您应该立即得到响应,10 秒后您应该会在控制台中看到打印的消息。

NOTE: Keep in mind that daemonicprocesses are not allowed to spawn any child processes.

注意:请记住,daemonic不允许进程产生任何子进程。