How to best perform Multiprocessing within requests with the python Tornado server?

Warning: the content below is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/15375336/

Tags: python, multiprocessing, tornado, python-multithreading

Asked by Rocketman

I am using the non-blocking I/O python server Tornado. I have a class of GET requests that may take a significant amount of time to complete (think in the range of 5-10 seconds). The problem is that Tornado blocks on these requests, so that subsequent fast requests are held up until the slow request completes.

I looked at https://github.com/facebook/tornado/wiki/Threading-and-concurrency and came to the conclusion that I wanted some combination of #3 (other processes) and #4 (other threads). #4 on its own had issues and I was unable to get reliable control back to the ioloop when there was another thread doing the "heavy_lifting". (I assume this was due to the GIL and the fact that the heavy_lifting task has a high CPU load and keeps pulling control away from the main ioloop, but that's a guess.)

So I have been prototyping how to solve this by doing the "heavy lifting" tasks within these slow GET requests in a separate process, and then placing a callback into the Tornado ioloop when the process is done in order to finish the request. This frees up the ioloop to handle other requests.

I have created a simple example demonstrating a possible solution, but am curious to get feedback from the community on it.

My question is two-fold: How can this current approach be simplified? What pitfalls potentially exist with it?

The Approach

  1. Utilize Tornado's built-in asynchronous decorator, which allows a request to stay open and the ioloop to continue.

  2. Spawn a separate process for "heavy lifting" tasks using python's multiprocessing module. I first attempted to use the threading module but was unable to get any reliable relinquishing of control back to the ioloop. It also appears that multiprocessing would take advantage of multiple cores.

  3. Start a 'watcher' thread in the main ioloop process using the threading module, whose job is to watch a multiprocessing.Queue for the result of the "heavy lifting" task when it completes. This was needed because I needed a way to know that the heavy_lifting task had completed while still being able to notify the ioloop that this request was now finished.

  4. Be sure that the 'watcher' thread relinquishes control to the main ioloop often with time.sleep(0) calls so that other requests continue to be processed promptly.

  5. When there is a result in the queue, add a callback from the "watcher" thread using tornado.ioloop.IOLoop.instance().add_callback(), which is documented to be the only safe way to call ioloop instances from other threads.

  6. Be sure to then call finish() in the callback to complete the request and hand over a reply.

Below is some sample code showing this approach. multi_tornado.py is the server implementing the above outline, and call_multi.py is a sample script that calls the server in two different ways to test it. Both tests call the server with 3 slow GET requests followed by 20 fast GET requests. The results are shown for running both with and without the threading turned on.

In the case of running it with "no threading", the 3 slow requests block (each taking a little over a second to complete). A few of the 20 fast requests squeeze through between some of the slow requests within the ioloop (not totally sure how that occurs - but it could be an artifact of running both the server and the client test script on the same machine). The point here is that all of the fast requests are held up to varying degrees.

In the case of running it with threading enabled, the 20 fast requests all complete immediately, and the three slow requests complete at about the same time afterwards, as they have each been running in parallel. This is the desired behavior. The three slow requests take 2.5 seconds to complete in parallel - whereas in the non-threaded case the three slow requests take about 3.5 seconds in total. So there is about a 35% speed-up overall (I assume due to multicore sharing). But more importantly - the fast requests were handled immediately, ahead of the slow ones.

I do not have a lot of experience with multithreaded programming - so while this seemingly works here, I am curious to learn:

Is there a simpler way to accomplish this? What monsters may lurk within this approach?

(Note: A future tradeoff may be to just run more instances of Tornado with a reverse proxy like nginx doing load balancing. No matter what, I will be running multiple instances with a load balancer - but I am concerned about just throwing hardware at this problem, since it seems that the hardware is so directly coupled to the problem in terms of the blocking.)

Sample Code

multi_tornado.py (sample server):

import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop


# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)

    t = time.time()
    q.put(t - t0)  # report time to compute in queue


class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print res
        self.write(res)
        self.flush()


class MultiThreadedHandler(RequestHandler):
    # Note:  This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback

        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()

            # start watching for process to finish
            threading.Thread(target=self._watcher).start()

        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready

        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))


class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start a thread to watch for
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print res
        self.write(res)
        self.flush()
        self.finish()   # be sure to finish request


application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])


if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()

call_multi.py (client tester):

import sys
from tornado.ioloop import IOLoop
from tornado import httpclient


def run(slow):
    def show_response(res):
        print res.body

    # make 3 "slow" requests on server
    requests = []
    for k in xrange(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in xrange(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print 'Scheduling Get Requests:'
    print '------------------------'
    for req in requests:
        print req
        http_client.fetch(req, show_response)

    # execute requests on server
    print '\nStart sending requests....'
    IOLoop.instance().start()

if __name__ == '__main__':
    scenario = sys.argv[1]

    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)

Test Results

By running python call_multi.py slow (the blocking behavior):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20

By running python call_multi.py slow_threaded (the desired behavior):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s

Answered by andy boot

If your GET requests are taking that long, then Tornado is the wrong framework.

I suggest you use nginx to route the fast GETs to Tornado and the slower ones to a different server.

PeterBe has an interesting article where he runs multiple Tornado servers and sets one of them to be 'the slow one' for handling the long-running requests; see worrying-about-io-blocking. I would try this method.
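
As a rough illustration of this kind of routing (my own sketch, not from the answer or the article - the ports, paths, and upstream names are assumptions), an nginx configuration along these lines sends fast endpoints to one Tornado instance and slow ones to a second instance dedicated to them:

# Hypothetical nginx routing: fast endpoints to one Tornado
# instance, slow endpoints to a second one reserved for them.
upstream tornado_fast { server 127.0.0.1:8888; }
upstream tornado_slow { server 127.0.0.1:8889; }

server {
    listen 80;

    # Prefix match covers /slow and /slow_threaded alike.
    location /slow {
        proxy_pass http://tornado_slow;
    }

    location / {
        proxy_pass http://tornado_fast;
    }
}

The slow instance can then block (or run its own worker pool) without holding up the fast requests.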

Answered by dano

multiprocessing.Pool can be integrated into the tornado I/O loop, but it's a bit messy. A much cleaner integration can be done using concurrent.futures (see my other answer for details), but if you're stuck on Python 2.x and can't install the concurrent.futures backport, here is how you can do it strictly using multiprocessing:

The multiprocessing.Pool.apply_async and multiprocessing.Pool.map_async methods both have an optional callback parameter, which means that both can potentially be plugged into a tornado.gen.Task. So in most cases, running code asynchronously in a sub-process is as simple as this:

import multiprocessing
import contextlib

from tornado import gen
from tornado.gen import Return
from tornado.ioloop import IOLoop
from functools import partial

def worker():
    print "async work here"

@gen.coroutine
def async_run(func, *args, **kwargs):
    # gen.Task passes its own completion callback as the `callback`
    # kwarg of apply_async, so the coroutine resumes with the result
    # once the pool worker finishes.
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    func = partial(async_run, worker)
    IOLoop().run_sync(func)

As I mentioned, this works well in most cases. But if worker() throws an exception, callback is never called, which means the gen.Task never finishes, and you hang forever. Now, if you know that your work will never throw an exception (because you wrapped the whole thing in a try/except, for example), you can happily use this approach. However, if you want to let exceptions escape from your worker, the only solution I found was to subclass some multiprocessing components and make them call callback even if the worker sub-process raised an exception:

from multiprocessing.pool import ApplyResult, Pool, RUN
import multiprocessing
class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj 
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
...

if __name__ == "__main__":
    pool = TornadoPool(multiprocessing.cpu_count())
    ...

With these changes, the exception object will be returned by the gen.Task, rather than the gen.Task hanging indefinitely. I also updated my async_run method to re-raise the exception when it's returned, and made some other changes to provide better tracebacks for exceptions thrown in the worker sub-processes. Here's the full code:

import sys
import time
import traceback
import multiprocessing
from multiprocessing.pool import Pool, ApplyResult, RUN
from functools import wraps

import tornado.web
from tornado.ioloop import IOLoop
from tornado.gen import Return
from tornado import gen

class WrapException(Exception):
    def __init__(self):
        exc_type, exc_value, exc_tb = sys.exc_info()
        self.exception = exc_value
        self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    def __str__(self):
        return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)

class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj 
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]   

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception 
    (or an Exception sub-class), this function will raise the 
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    if isinstance(result, Exception):
        raise result
    raise Return(result)

def handle_exceptions(func):
    """ Raise a WrapException so we get a more meaningful traceback"""
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            raise WrapException()
    return inner

# Test worker functions
@handle_exceptions
def test2(x):
    raise Exception("eeee")

@handle_exceptions
def test(x):
    print x
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield async_run(test, "inside get")
            self.write("%s\n" % result)
            result = yield async_run(test2, "hi2")
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool(4)
    app.listen(8888)
    IOLoop.instance().start()

Here's how it behaves for the client:

dan@dan:~$ curl localhost:8888/test
done
Caught an exception: 

Original traceback:
Traceback (most recent call last):
  File "./mutli.py", line 123, in inner
    return func(*args, **kwargs)
  File "./mutli.py", line 131, in test2
    raise Exception("eeee")
Exception: eeee

And if I send two simultaneous curl requests, we can see they're handled asynchronously on the server-side:

dan@dan:~$ ./mutli.py 
inside get
inside get
caught exception inside get
caught exception inside get

Edit:

Note that this code becomes simpler with Python 3, because it introduces an error_callback keyword argument to all asynchronous multiprocessing.Pool methods. This makes it much easier to integrate with Tornado:

class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        super().apply_async(func, args, kwds, callback=callback,
                            error_callback=callback)

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception
    (or an Exception sub-class), this function will raise the
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)

All we need to do in our overridden apply_async is call the parent with the error_callback keyword argument, in addition to the callback kwarg. No need to override ApplyResult.

We can get even fancier by using a metaclass in our TornadoPool, to allow its *_async methods to be called directly as if they were coroutines:

import time
from functools import wraps
from multiprocessing.pool import Pool

import tornado.web
from tornado import gen
from tornado.gen import Return, Arguments
from tornado import stack_context
from tornado.ioloop import IOLoop
from tornado.concurrent import Future

def _argument_adapter(callback):
    def wrapper(*args, **kwargs):
        if kwargs or len(args) > 1:
            callback(Arguments(args, kwargs))
        elif args:
            callback(args[0])
        else:
            callback(None)
    return wrapper

def PoolTask(func, *args, **kwargs):
    """ Task function for use with multiprocessing.Pool methods.

    This is very similar to tornado.gen.Task, except it sets the
    error_callback kwarg in addition to the callback kwarg. This
    way exceptions raised in pool worker methods get raised in the
    parent when the Task is yielded from.

    """
    future = Future()
    def handle_exception(typ, value, tb):
        if future.done():
            return False
        future.set_exc_info((typ, value, tb))
        return True
    def set_result(result):
        if future.done():
            return
        if isinstance(result, Exception):
            future.set_exception(result)
        else:
            future.set_result(result)
    with stack_context.ExceptionStackContext(handle_exception):
        cb = _argument_adapter(set_result)
        func(*args, callback=cb, error_callback=cb)
    return future

def coro_runner(func):
    """ Wraps the given func in a PoolTask and returns it. """
    @wraps(func)
    def wrapper(*args, **kwargs):
        return PoolTask(func, *args, **kwargs)
    return wrapper

class MetaPool(type):
    """ Wrap all *_async methods in Pool with coro_runner. """
    def __new__(cls, clsname, bases, dct):
        pdct = bases[0].__dict__
        for attr in pdct:
            if attr.endswith("async") and not attr.startswith('_'):
                setattr(bases[0], attr, coro_runner(pdct[attr]))
        return super().__new__(cls, clsname, bases, dct)

class TornadoPool(Pool, metaclass=MetaPool):
    pass

# Test worker functions
def test2(x):
    print("hi2")
    raise Exception("eeee")

def test(x):
    print(x)
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield pool.apply_async(test, ("inside get",))
            self.write("%s\n" % result)
            result = yield pool.apply_async(test2, ("hi2",))
            self.write("%s\n" % result)
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
            raise
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool()
    app.listen(8888)
    IOLoop.instance().start()

Answered by dano

If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they'll play nicely together out of the box. concurrent.futures is included in Python 3.2+, and has been backported to Python 2.x.

Here's an example:

import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen

def f(a, b, c, blah=None):
    print "got %s %s %s and %s" % (a, b, c, blah)
    time.sleep(5)
    return "hey there"

@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()

IOLoop.instance().run_sync(test_it)

Output:

running it asynchronously
got 1 2 3 and ok
it returned hey there

ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.
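
To use this pattern from inside a request handler rather than run_sync, the concurrent.futures.Future returned by pool.submit can be yielded directly from a gen.coroutine handler, exactly as the "yield fut" line above does. Below is a minimal sketch of that wiring (my own illustration, not from the answer; the slow_work function, route, and worker count are made up):

import time
from concurrent.futures import ProcessPoolExecutor

import tornado.web
from tornado import gen
from tornado.ioloop import IOLoop

def slow_work(seconds):
    # Stand-in for a CPU-bound task; must be a module-level
    # (picklable) function so the pool can ship it to a worker.
    time.sleep(seconds)
    return "worked for %s s" % seconds

class SlowHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        # pool.submit returns a concurrent.futures.Future, which the
        # coroutine can yield directly; the ioloop stays free meanwhile.
        result = yield pool.submit(slow_work, 2)
        self.write(result)

app = tornado.web.Application([
    (r"/slow", SlowHandler),
])

if __name__ == "__main__":
    pool = ProcessPoolExecutor(max_workers=4)
    app.listen(8888)
    IOLoop.instance().start()

Fast handlers registered on the same application stay responsive while slow_work runs, because the blocking happens in a pool worker process rather than in the ioloop.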
