Tenho uma tabela chamada queue
com itens que precisam ser processados:
CREATE TABLE public.queue (
id serial NOT NULL
CONSTRAINT queue_pkey
PRIMARY KEY
);
Outra tabela process
representa os itens processados da fila (por exemplo, um relatório foi gerado). Na realidade, existem mais tabelas desse tipo (há mais processos que precisam ser executados em um item). Existe uma relação um-para-um entre queue
e process
– cada item pode ser processado apenas uma vez.
CREATE TABLE public.process (
id serial NOT NULL
CONSTRAINT process_pkey
PRIMARY KEY,
queue_item_id integer NOT NULL
CONSTRAINT process_queue_item_id_key
UNIQUE
CONSTRAINT process_queue_item_id_8953ec7b_fk_datastore
REFERENCES public.queue
DEFERRABLE INITIALLY DEFERRED
);
Aqui estão alguns dados de teste:
BEGIN;
TRUNCATE TABLE queue, process
RESTART IDENTITY CASCADE;
INSERT INTO queue
SELECT GENERATE_SERIES(1, 10000);
COMMIT;
O trabalhador que processa os itens é implementado da seguinte forma. O código está em Django (framework Python), mas estou convencido de que meu erro não é causado pelo Django ou seu ORM.
(Para simplificar, não há condição de término.)
while True:
with transaction.atomic():
queue_items = Queue.objects \
.filter(process=None) \
.order_by() \
.select_for_update(skip_locked=True, of=('self',))[:8]
print('Generating report...')
time.sleep(0.1)
Process.objects.bulk_create(
(Process(queue_item=q)
for q in queue_items)
)
Aqui está uma transcrição das consultas SQL que viajam para o banco de dados:
-- while True:
BEGIN;
SELECT queue."id"
FROM queue
LEFT OUTER JOIN "process"
ON (queue."id" = "process"."queue_item_id")
WHERE "process"."id" IS NULL
LIMIT 8 FOR UPDATE OF queue SKIP LOCKED;
-- print('Generating report...')
-- time.sleep(0.5)
INSERT INTO "process" ("queue_item_id")
VALUES (1),
(2),
(3),
(4),
(5),
(6),
(7),
(8)
RETURNING "process"."id";
COMMIT;
Se eu iniciar um trabalhador, a fila será processada perfeitamente. Se eu executar dois ou mais trabalhadores, começo a receber este erro:
duplicate key value violates unique constraint "process_queue_item_id_key"
DETAIL: Key (queue_item_id)=(**) already exists.
Como outra transação poderia criar linhas process
para itens em uma queue
quando essas linhas estão bloqueadas?
O que eu tentei:
- Eu tentei reescrever a consulta SELECT com EXISTS:
SELECT "queue"."id"
FROM "queue"
WHERE NOT (EXISTS(SELECT U0."id", U0."queue_item_id" FROM "process" U0 WHERE U0."queue_item_id" = "queue"."id"))
LIMIT 8
FOR UPDATE OF "queue" SKIP LOCKED
sem sucesso, ocorre o mesmo erro.
- Se eu organizar as linhas aleatoriamente, o erro ocorre muito mais tarde (quase no final da fila).
SELECT "queue"."id"
FROM "queue"
LEFT OUTER JOIN "process"
ON ("queue"."id" = "process"."queue_item_id")
WHERE "process"."id" IS NULL
ORDER BY RANDOM()
LIMIT 8
FOR UPDATE OF "queue" SKIP LOCKED
- Coloquei um breakpoint no meio da transação e em outra transação verifiquei que o bloqueio de linha na minha opinião funciona corretamente:
SELECT id
FROM queue
WHERE id NOT IN (
SELECT id
FROM queue
FOR UPDATE SKIP LOCKED
);
- Minha versão do Postgres:
PostgreSQL 13.1, compiled by Visual C++ build 1914, 64-bit
. - Cada trabalhador tem sua própria conexão com o Postgres com nível de isolamento padrão (leitura confirmada).