多进程写的事务封装
我有一个数据库方案(我正在使用Oracle),其中有多个进程在一个表中插入数据,而一个进程从中选择一个表。该表基本上用作中间存储,多个进程(以下称为Writers)向其中写入日志事件,单个进程(以下称为Reader)从中读取事件以进行进一步处理。读取器必须读取插入表中的所有事件。
目前,这是通过为每个插入的记录分配一个升序的ID来完成的。读取器定期从表中选择一个条目块,其中
id大于先前读取的块的最大id。例如。就像是:
SELECT * FROM TRANSACTION_LOG WHERE id > ( SELECT last_id FROM READER_STATUS );
这种方法的问题在于,由于写入程序是同时进行的,因此即使行按顺序升序分配,也不总是按照行的分配顺序插入行。即,有时将id = 100的行写入到id = 110的记录之后,因为写入id = 110的行的过程在写入记录id = 100的过程之后开始,但首先提交。如果它已经读取了id = 110的行,则可能导致Reader丢失id = 100的行。
强制Writer对表进行排他锁将解决该问题,因为这将迫使它们顺序插入,并使Reader等待所有未完成的提交。但是,这可能不会很快。
我的想法是,足以让读者等待任何杰出的作家
在阅读之前提交。也就是说,只要读者可以阅读,直到所有作家都写完,作家就可以继续并发操作。
我的问题是这样的:
我如何指示我的阅读器进程等待编写器进程的任何未完成提交?也欢迎提出上述问题的任何替代方案。
解决方案
有趣的问题。听起来我们正在构建一个不错的解决方案。
希望我能帮上忙。
一些建议...
作家状态
我们可以创建一个表WRITER_STATUS,该表具有last_id字段:每个写入器在写入ID之前都会更新该表,但要使用要写入日志的ID,但前提是其ID大于last_id的当前值。
读者还检查此表,现在知道是否还有任何作者。
读者日志
这可能更有效。
读取器读取后,它会检查检索到的记录中是否有任何孔。
然后,它将所有缺少的ID记录到MISSING_IDS表中,并在下次读取时执行类似的操作
SELECT * FROM TRANSACTION_LOG WHERE id > (SELECT last_id FROM READER_STATUS) OR id IN ( SELECT id from MISSING_IDS )
我们可能想在读取器进程中在表上放置排他锁。这将一直等到所有编写器完成并释放其行锁,因此我们可以确保没有未完成的编写器事务。
我不会做任何锁定,因为这会干扰并发性和吞吐量。
如果我们跟踪逐行处理的日志行,则也不需要Reader_Status表。
我要做的是:在日志表中添加一个新列。例如,将其称为"已处理"。将其设置为布尔值,默认为false(或者小整数,默认为0或者任何其他值)。作家在插入时使用默认值。
读取器查询要处理的下一个记录块时,他查询处理为false且id值较低的行。
SELECT * FROM Transaction_Log WHERE processed = 0 ORDER BY id LIMIT 10;
当他处理它们时,阅读器使用UPDATE将处理过程从false更改为true。因此,下次阅读器查询记录块时,他确定他不会得到已经处理过的行。
UPDATE Transaction_Log SET processed = 1 WHERE id = ?; -- do this for each row processed
此UPDATE不应与Writer的INSERT操作冲突。
如果其他作者未按顺序提交任何行,则如果读者始终按id列的顺序(从最低值到最高值)对其进行处理,则读者将在下次查询时看到它们。
由于我们知道Reader处理的last_id
,因此我们可以通过以下方式请求下一个工作项:
select * from Transaction_log where id = ( select last_id + 1 /* or whatever increment your sequencer has */ from Reader_status)
我同意AJ的解决方案(链接)。另外,以下建议可能有助于减少孔的数量。
1)使用" Oracle Sequence"来创建ID,并使用" auto-increment",如下所示
INSERT INTO transaction_table VALUES(id__seq.nextval, <other columns>);
2)使用autoCommit(true)
,以便插入将立即提交。
这两个步骤将大大减少孔的数量。仍然有可能某些插入首先开始,但后来提交,并且在它们之间进行读取操作。