我有一个表queue
,其中包含需要处理的项目:
CREATE TABLE public.queue (
id serial NOT NULL
CONSTRAINT queue_pkey
PRIMARY KEY
);
另一个表格process
表示来自队列的已处理项目(例如,已生成报告)。实际上,有更多这样的表(需要对一个项目执行更多的过程)。queue
和之间存在一对一的关系process
——每个项目只能处理一次。
CREATE TABLE public.process (
id serial NOT NULL
CONSTRAINT process_pkey
PRIMARY KEY,
queue_item_id integer NOT NULL
CONSTRAINT process_queue_item_id_key
UNIQUE
CONSTRAINT process_queue_item_id_8953ec7b_fk_datastore
REFERENCES public.queue
DEFERRABLE INITIALLY DEFERRED
);
以下是一些测试数据:
BEGIN;
TRUNCATE TABLE queue, process
RESTART IDENTITY CASCADE;
INSERT INTO queue
SELECT GENERATE_SERIES(1, 10000);
COMMIT;
处理项目的工作人员实现如下。代码在 Django(Python 框架)中,但我确信我的错误不是由 Django 或其 ORM 引起的。
(为简单起见,没有终止条件。)
while True:
with transaction.atomic():
queue_items = Queue.objects \
.filter(process=None) \
.order_by() \
.select_for_update(skip_locked=True, of=('self',))[:8]
print('Generating report...')
time.sleep(0.1)
Process.objects.bulk_create(
(Process(queue_item=q)
for q in queue_items)
)
以下是传输到数据库的 SQL 查询的记录:
-- while True:
BEGIN;
SELECT queue."id"
FROM queue
LEFT OUTER JOIN "process"
ON (queue."id" = "process"."queue_item_id")
WHERE "process"."id" IS NULL
LIMIT 8 FOR UPDATE OF queue SKIP LOCKED;
-- print('Generating report...')
-- time.sleep(0.5)
INSERT INTO "process" ("queue_item_id")
VALUES (1),
(2),
(3),
(4),
(5),
(6),
(7),
(8)
RETURNING "process"."id";
COMMIT;
如果我启动一名工作人员,则队列处理得非常好。如果我运行两个或更多工作人员,我开始收到此错误:
duplicate key value violates unique constraint "process_queue_item_id_key"
DETAIL: Key (queue_item_id)=(**) already exists.
当这些行被锁定时,另一个事务如何为其中process
的项目创建行?queue
我尝试了什么:
- 我试图用 EXISTS 重写 SELECT 查询:
SELECT "queue"."id"
FROM "queue"
WHERE NOT (EXISTS(SELECT U0."id", U0."queue_item_id" FROM "process" U0 WHERE U0."queue_item_id" = "queue"."id"))
LIMIT 8
FOR UPDATE OF "queue" SKIP LOCKED
没有成功,就会发生同样的错误。
- 如果我随机排列行,则错误发生得更晚(几乎在队列的末尾)。
SELECT "queue"."id"
FROM "queue"
LEFT OUTER JOIN "process"
ON ("queue"."id" = "process"."queue_item_id")
WHERE "process"."id" IS NULL
ORDER BY RANDOM()
LIMIT 8
FOR UPDATE OF "queue" SKIP LOCKED
- 我在事务中间放了一个断点,在另一个事务中我检查了我认为行锁定是否正常工作:
SELECT id
FROM queue
WHERE id NOT IN (
SELECT id
FROM queue
FOR UPDATE SKIP LOCKED
);
- 我的 Postgres 版本:
PostgreSQL 13.1, compiled by Visual C++ build 1914, 64-bit
. - 每个工作人员都有自己与 Postgres 的默认隔离级别(已提交读)连接。