AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / user-142475

illagrenan's questions

Martin Hope
illagrenan
Asked: 2021-02-09 07:20:18 +0800 CST

Erros de chave duplicada na tabela do tipo fila do Postgres quando as linhas devem ser bloqueadas por SELECT FOR UPDATE SKIP LOCKED

  • 2

Tenho uma tabela chamada queuecom itens que precisam ser processados:

CREATE TABLE public.queue (
    id serial NOT NULL
        CONSTRAINT queue_pkey
            PRIMARY KEY
);

Outra tabela processrepresenta os itens processados ​​da fila (por exemplo, um relatório foi gerado). Na realidade, existem mais tabelas desse tipo (há mais processos que precisam ser executados em um item). Existe uma relação um-para-um entre queuee process– cada item pode ser processado apenas uma vez.

CREATE TABLE public.process (
    id            serial  NOT NULL
        CONSTRAINT process_pkey
            PRIMARY KEY,
    queue_item_id integer NOT NULL
        CONSTRAINT process_queue_item_id_key
            UNIQUE
        CONSTRAINT process_queue_item_id_8953ec7b_fk_datastore
            REFERENCES public.queue
            DEFERRABLE INITIALLY DEFERRED
);

Aqui estão alguns dados de teste:

BEGIN;
TRUNCATE TABLE queue, process
    RESTART IDENTITY CASCADE;
INSERT INTO queue
SELECT GENERATE_SERIES(1, 10000);
COMMIT;

O trabalhador que processa os itens é implementado da seguinte forma. O código está em Django (framework Python), mas estou convencido de que meu erro não é causado pelo Django ou seu ORM.

(Para simplificar, não há condição de término.)

while True:
    with transaction.atomic():
        queue_items = Queue.objects \
                          .filter(process=None) \
                          .order_by() \
                          .select_for_update(skip_locked=True, of=('self',))[:8]

        print('Generating report...')
        time.sleep(0.1)

        Process.objects.bulk_create(
            (Process(queue_item=q)
             for q in queue_items)
        )

Aqui está uma transcrição das consultas SQL que viajam para o banco de dados:

-- while True:

BEGIN;

SELECT queue."id"
FROM queue
         LEFT OUTER JOIN "process"
                         ON (queue."id" = "process"."queue_item_id")
WHERE "process"."id" IS NULL
LIMIT 8 FOR UPDATE OF queue SKIP LOCKED;

-- print('Generating report...')
-- time.sleep(0.5)

INSERT INTO "process" ("queue_item_id")
VALUES (1),
       (2),
       (3),
       (4),
       (5),
       (6),
       (7),
       (8)
RETURNING "process"."id";

COMMIT;

Se eu iniciar um trabalhador, a fila será processada perfeitamente. Se eu executar dois ou mais trabalhadores, começo a receber este erro:

duplicate key value violates unique constraint "process_queue_item_id_key"
DETAIL:  Key (queue_item_id)=(**) already exists.

Como outra transação poderia criar linhas processpara itens em uma queuequando essas linhas estão bloqueadas?

O que eu tentei:

  1. Eu tentei reescrever a consulta SELECT com EXISTS:
SELECT "queue"."id"
  FROM "queue"
 WHERE NOT (EXISTS(SELECT U0."id", U0."queue_item_id" FROM "process" U0 WHERE U0."queue_item_id" = "queue"."id"))
 LIMIT 8
   FOR UPDATE OF "queue" SKIP LOCKED

sem sucesso, ocorre o mesmo erro.

  1. Se eu organizar as linhas aleatoriamente, o erro ocorre muito mais tarde (quase no final da fila).
SELECT "queue"."id"
  FROM "queue"
  LEFT OUTER JOIN "process"
    ON ("queue"."id" = "process"."queue_item_id")
 WHERE "process"."id" IS NULL
 ORDER BY RANDOM()
 LIMIT 8
   FOR UPDATE OF "queue" SKIP LOCKED
  1. Coloquei um breakpoint no meio da transação e em outra transação verifiquei que o bloqueio de linha na minha opinião funciona corretamente:
SELECT id
FROM queue
WHERE id NOT IN (
    SELECT id
    FROM queue
        FOR UPDATE SKIP LOCKED
);
  • Minha versão do Postgres: PostgreSQL 13.1, compiled by Visual C++ build 1914, 64-bit.
  • Cada trabalhador tem sua própria conexão com o Postgres com nível de isolamento padrão (leitura confirmada).
postgresql python
  • 1 respostas
  • 218 Views

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    conectar ao servidor PostgreSQL: FATAL: nenhuma entrada pg_hba.conf para o host

    • 12 respostas
  • Marko Smith

    Como fazer a saída do sqlplus aparecer em uma linha?

    • 3 respostas
  • Marko Smith

    Selecione qual tem data máxima ou data mais recente

    • 3 respostas
  • Marko Smith

    Como faço para listar todos os esquemas no PostgreSQL?

    • 4 respostas
  • Marko Smith

    Listar todas as colunas de uma tabela especificada

    • 5 respostas
  • Marko Smith

    Como usar o sqlplus para se conectar a um banco de dados Oracle localizado em outro host sem modificar meu próprio tnsnames.ora

    • 4 respostas
  • Marko Smith

    Como você mysqldump tabela (s) específica (s)?

    • 4 respostas
  • Marko Smith

    Listar os privilégios do banco de dados usando o psql

    • 10 respostas
  • Marko Smith

    Como inserir valores em uma tabela de uma consulta de seleção no PostgreSQL?

    • 4 respostas
  • Marko Smith

    Como faço para listar todos os bancos de dados e tabelas usando o psql?

    • 7 respostas
  • Martin Hope
    Jin conectar ao servidor PostgreSQL: FATAL: nenhuma entrada pg_hba.conf para o host 2014-12-02 02:54:58 +0800 CST
  • Martin Hope
    Stéphane Como faço para listar todos os esquemas no PostgreSQL? 2013-04-16 11:19:16 +0800 CST
  • Martin Hope
    Mike Walsh Por que o log de transações continua crescendo ou fica sem espaço? 2012-12-05 18:11:22 +0800 CST
  • Martin Hope
    Stephane Rolland Listar todas as colunas de uma tabela especificada 2012-08-14 04:44:44 +0800 CST
  • Martin Hope
    haxney O MySQL pode realizar consultas razoavelmente em bilhões de linhas? 2012-07-03 11:36:13 +0800 CST
  • Martin Hope
    qazwsx Como posso monitorar o andamento de uma importação de um arquivo .sql grande? 2012-05-03 08:54:41 +0800 CST
  • Martin Hope
    markdorison Como você mysqldump tabela (s) específica (s)? 2011-12-17 12:39:37 +0800 CST
  • Martin Hope
    Jonas Como posso cronometrar consultas SQL usando psql? 2011-06-04 02:22:54 +0800 CST
  • Martin Hope
    Jonas Como inserir valores em uma tabela de uma consulta de seleção no PostgreSQL? 2011-05-28 00:33:05 +0800 CST
  • Martin Hope
    Jonas Como faço para listar todos os bancos de dados e tabelas usando o psql? 2011-02-18 00:45:49 +0800 CST

Hot tag

sql-server mysql postgresql sql-server-2014 sql-server-2016 oracle sql-server-2008 database-design query-performance sql-server-2017

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve