Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverFlow original URL: http://stackoverflow.com/questions/6507475/

Time: 2020-09-01 11:08:33  Source: igfitidea

Job queue as SQL table with multiple consumers (PostgreSQL)

Tags: sql, postgresql, queue, producer-consumer

Asked by code_talker

I have a typical producer-consumer problem:

Multiple producer applications write job requests to a job-table on a PostgreSQL database.

The job requests have a state field that contains QUEUED on creation.

There are multiple consumer applications that are notified by a rule when a producer inserts a new record:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";
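
For context, a minimal sketch of the consumer side of this rule, assuming each consumer keeps one open session on the same database. LISTEN and UNLISTEN are standard PostgreSQL commands; how the notification reaches the application depends on the client library, which the question does not show.

```sql
-- Run once per consumer session. The channel name is quoted so that it
-- matches the mixed-case name used by the rule's NOTIFY.
LISTEN "jobrecordAdded";

-- ... the consumer now waits for notifications and, on each one,
-- tries to reserve a QUEUED record ...

-- Stop receiving notifications when the consumer shuts down.
UNLISTEN "jobrecordAdded";
```

A notification only signals that something was inserted, and every listening consumer receives it, which is why the reservation step has to be safe against concurrent attempts.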

They will try to reserve a new record by setting its state to RESERVED. Of course, only one consumer should succeed. All other consumers should not be able to reserve the same record. They should instead reserve other records with state=QUEUED.

Example: some producer added the following records to table jobrecord:

id  state   owner  payload
--------------------------
1   QUEUED  null   <data>
2   QUEUED  null   <data>
3   QUEUED  null   <data>
4   QUEUED  null   <data>

Now two consumers, A and B, want to process them. They start running at the same time. One should reserve id 1, the other should reserve id 2, then whichever finishes first should reserve id 3, and so on.

In a pure multithreaded world, I would use a mutex to control access to the job queue, but the consumers are different processes that may run on different machines. They only access the same database, so all synchronization must happen through the database.

I read a lot of documentation about concurrent access and locking in PostgreSQL, e.g. http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html, "Select unlocked row in Postgresql", and "PostgreSQL and locking".

From these topics, I learned that the following SQL statement should do what I need:

UPDATE jobrecord
  SET owner = :owner, state = :reserved
  WHERE id = (
     SELECT id FROM jobrecord WHERE state = :queued
        ORDER BY id LIMIT 1
     )
  RETURNING id;  -- will only return an id when they reserved it successfully

Unfortunately, when I run this in multiple consumer processes, about 50% of the time they still reserve the same record, both processing it and one overwriting the changes of the other.
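
One possible reading of this failure, assuming the default READ COMMITTED isolation level: both consumers' subqueries pick the same id, the second UPDATE waits for the first to commit, and PostgreSQL then re-evaluates the outer WHERE clause against the updated row. Since id is unchanged, the row still matches and the second consumer overwrites the first. A hedged sketch of one way to make that re-check fail is to repeat the state condition in the outer WHERE. The placeholders :owner, :reserved and :queued are the ones from the statement above.

```sql
UPDATE jobrecord
  SET owner = :owner, state = :reserved
  WHERE id = (
     SELECT id FROM jobrecord WHERE state = :queued
        ORDER BY id LIMIT 1
     )
    AND state = :queued   -- re-checked against the latest row version
  RETURNING id;           -- no row returned means another consumer won
```

With this variant a losing consumer gets an empty result rather than a duplicate reservation, so it would need to run the statement again to pick up the next record.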

What am I missing? How do I have to write the SQL statement so that multiple consumers will not reserve the same record?

Accepted answer by jordani

Read my post here:

Consistency in postgresql with locking and select for update

If you use a transaction and LOCK TABLE you will have no problems.

Answer by apinstein

I use postgres for a FIFO queue as well. I originally used ACCESS EXCLUSIVE, which yields correct results in high concurrency, but has the unfortunate effect of being mutually exclusive with pg_dump, which acquires an ACCESS SHARE lock during its execution. This causes my next() function to block for a very long time (the duration of the pg_dump). This was not acceptable since we are a 24x7 shop and customers didn't like the dead time on the queue in the middle of the night.
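
To see this kind of conflict while it is happening, one option is to query the pg_locks system view from another session. This is only a sketch; tx_test_queue is the test table name used later in this answer, so substitute your own queue table.

```sql
-- Lists lock requests on the queue table; rows with granted = false
-- are sessions waiting behind a conflicting lock.
SELECT pid, mode, granted
FROM pg_locks
WHERE relation = 'tx_test_queue'::regclass
ORDER BY granted DESC, pid;
```

While a dump is running you would expect to see its ACCESS SHARE lock granted and any ACCESS EXCLUSIVE request from the queue code waiting.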

I figured there must be a less-restrictive lock which would still be concurrent-safe and not lock while pg_dump is running. My search led me to this SO post.

Then I did some research.

The following modes are sufficient for a FIFO queue NEXT() function which will update the status of a job from queued to running without any concurrency failures, and also not block against pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE
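
A likely reason these three modes work, going by the lock conflict table in the PostgreSQL documentation: each of them conflicts with itself, so only one consumer at a time can hold it, while none of them conflicts with the ACCESS SHARE lock that pg_dump takes. The two-session sketch below is one way to check that by hand; it assumes the tx_test_queue table from the test script exists.

```sql
-- Session 1: take the lock and keep the transaction open.
BEGIN;
LOCK TABLE tx_test_queue IN SHARE UPDATE EXCLUSIVE MODE;

-- Session 2: a plain read only needs ACCESS SHARE, so it is not blocked.
SELECT count(*) FROM tx_test_queue;

-- Session 2: a second consumer asking for the same mode conflicts.
-- NOWAIT makes it raise an error immediately instead of waiting.
BEGIN;
LOCK TABLE tx_test_queue IN SHARE UPDATE EXCLUSIVE MODE NOWAIT;
ROLLBACK;

-- Session 1: release the lock.
COMMIT;
```

Note that an ordinary UPDATE only takes ROW EXCLUSIVE on the table, which SHARE UPDATE EXCLUSIVE does not block, so the weakest of these modes protects the queue only if every consumer takes the same lock before updating.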

Query:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Result looks like:

UPDATE 1
 job_id
--------
     98
(1 row)

Here is a shell script which tests all of the different lock modes at high concurrency (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy"; exit 1;;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

Code is here as well if you want to edit: https://gist.github.com/1083936

I am updating my application to use the EXCLUSIVE mode since it's the most restrictive mode that a) is correct and b) doesn't conflict with pg_dump. I chose the most restrictive since it seems the least risky in terms of changing the app from ACCESS EXCLUSIVE without being an uber-expert in postgres locking.

I feel pretty comfortable with my test rig and with the general ideas behind the answer. I hope that sharing this helps solve this problem for others.

Answer by mackross

No need to do a whole table lock for this :\.

A row lock created with for update works just fine.

See https://gist.github.com/mackross/a49b72ad8d24f7cefc32 for the change I made to apinstein's answer; I verified that it still works.

Final code is

update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1 for update
    )
returning job_id;

Answer by Vladimir Filipchenko

What about just SELECT?

SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;

https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE
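
As a sketch of how this could be applied to the question's table: SKIP LOCKED (available from PostgreSQL 9.5) makes the subquery pass over rows that another transaction has already locked, so concurrent consumers pick different records instead of waiting on each other. The placeholders :owner, :reserved and :queued are the ones from the question; combining the SELECT with an UPDATE this way is an assumption about usage, not something this answer spells out.

```sql
UPDATE jobrecord
  SET owner = :owner, state = :reserved
  WHERE id = (
     SELECT id FROM jobrecord
      WHERE state = :queued
      ORDER BY id
      LIMIT 1
      FOR UPDATE SKIP LOCKED   -- skip rows already claimed by another consumer
     )
  RETURNING id;                -- empty result means no free record was found
```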

Answer by Joe Van Dyk

You might want to look at how queue_classic does it. https://github.com/ryandotsmith/queue_classic

The code is pretty short and easy to understand.

Answer by code_talker

Okay, here is the solution that is working for me, based on the link from jordani. As some of my problems were in the way Qt-SQL works, I've included the Qt code:

QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(    
"UPDATE jobrecord "
"  SET \"owner\"= :owner, state = :reserved "
"  WHERE id = ( "
"    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
"  ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED); 
bool result = query.exec();

To check whether multiple consumers process the same job, I added a rule and a log table:

CREATE TABLE serverjobrecord_log
(
  serverjobrecord_id integer,
  oldowner text,
  newowner text
) WITH ( OIDS=FALSE );


CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1 
DO INSERT INTO jobrecord_log     (id, oldowner, newowner) 
    VALUES (new.id, old.owner, new.owner);

Without the LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; statement, the log table occasionally fills with entries where one consumer has overwritten the values of another, but with the LOCK statement the log table remains empty :-)

Answer by Sean

Check out PgQ instead of reinventing the wheel.