postgresql Celery Worker Database Connection Pooling

Note: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/14526249/

Celery Worker Database Connection Pooling

python, postgresql, connection-pooling, celery

Asked by oneself

I am using Celery standalone (not within Django). I am planning to have one worker task type running on multiple physical machines. The task does the following:

  1. Accept an XML document.
  2. Transform it.
  3. Make multiple database reads and writes.

I'm using PostgreSQL, but this would apply equally to other store types that use connections. In the past, I've used a database connection pool to avoid creating a new database connection on every request and to avoid keeping connections open too long. However, since each Celery worker runs in a separate process, I'm not sure how they would actually be able to share the pool. Am I missing something? I know that Celery allows you to persist a result returned from a worker, but that is not what I'm trying to do here. Each task can do several different updates or inserts depending on the data processed.

What is the right way to access a database from within a Celery worker?

Is it possible to share a pool across multiple workers/tasks, or is there some other way to do this?

Answer by ThatAintWorking

I like tigeronk2's idea of one connection per worker. As he says, Celery maintains its own pool of worker processes, so there really isn't a need for a separate database connection pool. The Celery signals docs explain how to do custom initialization when a worker is created, so I added the following code to my tasks.py and it seems to work exactly as you would expect. I was even able to close the connections when the workers are shut down:

from celery.signals import worker_process_init, worker_process_shutdown
import psycopg2 as db  # assuming psycopg2; the original used an unspecified "db" module

DB_CONNECT_STRING = 'dbname=mydb user=me'  # hypothetical DSN

db_conn = None

@worker_process_init.connect
def init_worker(**kwargs):
    # Runs once in each freshly forked worker process.
    global db_conn
    print('Initializing database connection for worker.')
    db_conn = db.connect(DB_CONNECT_STRING)


@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    # Runs when the worker process is shut down.
    global db_conn
    if db_conn:
        print('Closing database connection for worker.')
        db_conn.close()
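
A task in the same tasks.py can then use the per-worker connection directly. A minimal sketch (the process_xml task and documents table are hypothetical), assuming psycopg2 as above:

from celery import Celery

app = Celery('tasks', broker='amqp://localhost')  # hypothetical broker URL

@app.task
def process_xml(doc):
    # db_conn was set by init_worker when this worker process started.
    with db_conn.cursor() as cur:
        cur.execute('INSERT INTO documents (body) VALUES (%s)', (doc,))
    db_conn.commit()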

Answer by tigeronk2

Have one DB connection per worker process. Since Celery itself maintains a pool of worker processes, your DB connections will always equal the number of Celery workers. The flip side is that this ties DB connection management to Celery's worker-process lifecycle, but that should be fine given that the GIL allows only one thread at a time in a process.

Answer by Loren Abrams

You can override the default behavior to use threaded workers instead of a worker per process in your Celery config:

CELERYD_POOL = "celery.concurrency.threads.TaskPool"

Then you can store the shared pool instance on your task instance and reference it from each threaded task invocation.
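
A minimal sketch of that idea, assuming psycopg2 and a hypothetical DB_CONNECT_STRING; the pool lives on a custom task base class, so every threaded invocation in the process borrows from the same pool:

from celery import Celery, Task
from psycopg2.pool import ThreadedConnectionPool

DB_CONNECT_STRING = 'dbname=mydb user=me'  # hypothetical DSN

app = Celery('tasks')

class PooledTask(Task):
    _pool = None  # shared by all threaded invocations in this process

    @property
    def pool(self):
        if PooledTask._pool is None:
            PooledTask._pool = ThreadedConnectionPool(1, 10, DB_CONNECT_STRING)
        return PooledTask._pool

@app.task(base=PooledTask, bind=True)
def run_query(self, sql, params):
    conn = self.pool.getconn()  # borrow a connection from the shared pool
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
        conn.commit()
    finally:
        self.pool.putconn(conn)  # always return it to the pool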

Answer by kev

Perhaps you can use pgbouncer. Nothing should change for Celery, and the connection pooling is done outside of the worker processes. I have the same issue.

('perhaps' because I am not sure whether there could be any side effects)
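
With pgbouncer the task code stays the same; each worker just connects to pgbouncer's port (6432 by default) instead of PostgreSQL's, and pgbouncer multiplexes those client connections onto a small server-side pool. A sketch, assuming pgbouncer runs on the same host with hypothetical credentials:

import psycopg2

# Point the worker at pgbouncer (default port 6432) instead of PostgreSQL (5432);
# pgbouncer hands each query to a pooled server connection behind the scenes.
db_conn = psycopg2.connect(host='127.0.0.1', port=6432,
                           dbname='mydb', user='me')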

Answer by p7k

Perhaps celery.concurrency.gevent could provide the pool sharing and not aggravate the GIL. However, its support is still "experimental".

And a psycopg2.pool.SimpleConnectionPool to share amongst greenlets (coroutines), which will all run in a single process/thread.
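
A sketch of that combination (hypothetical DSN and table), assuming the worker is started with -P gevent and psycopg2 has been made cooperative, e.g. via the psycogreen package; sharing the pool is safe here because all greenlets run in a single thread:

from celery import Celery
from psycogreen.gevent import patch_psycopg
from psycopg2.pool import SimpleConnectionPool

patch_psycopg()  # let psycopg2 yield to other greenlets during DB waits

app = Celery('tasks')
pool = SimpleConnectionPool(1, 10, 'dbname=mydb user=me')

@app.task
def lookup(key):
    conn = pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute('SELECT value FROM kv WHERE key = %s', (key,))
            row = cur.fetchone()
        return row[0] if row else None
    finally:
        pool.putconn(conn)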

A tiny bit of other Stack Overflow discussion on the topic.

Answer by Robin Loxley

Contributing my findings back after implementing and monitoring. Feedback welcome.

Reference: use pooling http://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html

Each worker process (prefork mode, specified by -c k) establishes one new connection to the DB, without pooling or reuse. So if pooling is used, the pool is seen only at the level of each worker process. A pool size > 1 is therefore not useful, but reusing the connection still saves the cost of repeatedly opening and closing it.

If using one connection per worker process, one DB connection is established per worker process (prefork mode, celery -A app worker -c k) at the initialization phase. This saves the connection from being opened and closed repeatedly.
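
Following the referenced article, a hedged SQLAlchemy sketch of this (hypothetical database URL); given the observations above, a pool of size 1 per worker process is enough and still avoids the repeated open/close:

from celery.signals import worker_process_init
from sqlalchemy import create_engine

engine = None

@worker_process_init.connect
def init_engine(**kwargs):
    global engine
    # One engine per prefork worker process; pool_size=1 because a larger
    # pool goes unused when each process runs one task at a time.
    engine = create_engine('postgresql://me@localhost/mydb',
                           pool_size=1, max_overflow=0)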

No matter how many worker threads (eventlets) there are, each worker thread (celery -A app worker -P eventlet) establishes only one connection to the DB, without pooling or reuse. So for eventlet, all worker threads (eventlets) in one Celery process (celery -A app worker ...) share one DB connection at any moment.

According to the Celery docs:

"but you need to ensure your tasks do not perform blocking calls, as this will halt all other operations in the worker until the blocking call returns."

This is probably because the MySQL DB connection makes blocking calls.