AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / coding / Perguntas / 79248071
Accepted
lonix
lonix
Asked: 2024-12-03 22:45:46 +0800 CST2024-12-03 22:45:46 +0800 CST 2024-12-03 22:45:46 +0800 CST

ConcurrentQueue com muitos enfileiradores e um único desenfileirador

  • 772

Estou usando um ConcurrentQueue<T>. Eu preciso:

  • vários consumidores para enfileirar itens
  • um único consumidor para retirar todos os itens da fila de uma só vez
  • durante a dequeue, para bloquear a fila para que outros consumidores não possam enfileirar novos itens ao mesmo tempo
public class Cache
{

  private readonly ConcurrentQueue<Item> _queue = new();

  public void Enqueue(Item item)    // what synchronisation to perform here?
  {
    ArgumentNullException.ThrowIfNull(item, nameof(item));
    _queue.Enqueue(item);
  }

  public void Flush()               // what synchronisation to perform here?
  {
    // dequeue all items
    var items = new List<Item>();
    while (_queue.TryDequeue(out var item))
      items.Add(item);

    // `DoSomething` could take time; therefore above we
    // dequeued and cached all items, so other consumers
    // can immediately continue enqueuing
    foreach (var item in items)
      DoSomething(item);
  }

  public void DoSomething(Item item)
  {
    // ...
  }

}

Estou confuso sobre qual é a melhor ferramenta de simultaneidade para sincronizar o acesso _queue(em Enqueuee Flush).

Devo usar um simples lockou algo mais exótico?

ATUALIZAÇÃO: para responder aos comentários abaixo, não estou vinculado ao ConcurrentQueue; se houver uma coleção simultânea mais apropriada, por favor me avise.

c#
  • 2 2 respostas
  • 71 Views

2 respostas

  • Voted
  1. Best Answer
    JonasH
    2024-12-04T00:06:19+08:002024-12-04T00:06:19+08:00

    A opção mais simples seria usar apenas um bloqueio e uma lista:

    public class Cache
    {
        private List<Item> _queue = new();
        private readonly object _lockObj = new();
        public void Enqueue(Item item)
        {
            lock (_lockObj)
            {
                _queue.Add(item);
            }
        }
    
        public void Flush()
        {
            List<Item> items;
            lock (_lockObj)
            {
                items = _queue;
                _queue = new List<Item>();
            }
            foreach (var item in items)
                DoSomething(item);
        }
    
        public void DoSomething(Item item)
        {
            ...
        }
    }
    

    Isso deve funcionar razoavelmente bem, dado que o bloqueio é mantido apenas momentaneamente. Você pode querer definir a capacidade da lista para evitar realocações um tanto caras.

    Mas a questão revela uma possível visão problemática. O único ponto em que a fila tem garantia de estar vazia é logo depois de _queue = new List<Item>(), antes do bloqueio ser liberado. Mas você não está fazendo nada neste ponto, e assim que o bloqueio for liberado, pode haver itens na fila novamente.

    Em outras palavras, um thread que enfileira um item não tem como saber se ele foi incluído em um flush ou não, a menos que haja alguma outra forma de sincronização. Nem o thread de flushing pode saber se há enfileiramentos pendentes. Então, não deve haver realmente nenhuma diferença em comparação com sua versão original.

    Então não está realmente claro qual é o objetivo real. Declarações como "um único ponto no tempo" são problemáticas, pois não há ordenação global. A única ordem em que você pode confiar é aquela fornecida pelas várias primitivas de sincronização. É DoSomethingnecessário ser sincronizado com relação a Enqueue? Se for, ele precisa ser executado dentro do bloqueio. Talvez você precise de alguma sincronização externa para garantir que todo o enfileiramento tenha sido feito antes da descarga?

    Em geral, eu sugeriria dar uma olhada em BlockingCollection , acho que é mais útil do que ConcurrentQueues brutos. DataFlow e Channels podem ser outras bibliotecas a serem consideradas.

    • 5
  2. Luaan
    2024-12-03T23:54:52+08:002024-12-03T23:54:52+08:00

    Uma abordagem simples seria apenas ler o número de itens na fila no início do Flushmétodo. Então, em vez de ler todos os itens na fila, você lê apenas quantos havia no início. Como você está fazendo toda a leitura em um thread, não precisa se preocupar com colisões. E os produtores podem continuar adicionando mais itens alegremente.

    Dito isso, é difícil dizer com as informações que você forneceu, mas duvido que até isso seja necessário. Por que não ler todos os itens? Você está preocupado que isso DoSomethingseja caro; mas apenas ler os itens em uma lista não é. Existe realmente uma diferença funcional entre "ler a contagem, depois ler a contagem de itens e depois processá-los" e "ler todos os itens e depois processá-los"? Apenas separe as partes "ler os itens" de "processar os itens" e você estará essencialmente no mesmo ponto.

    Se você quiser definir um limite mais rígido, você teria que usar um bloqueio de leitor-escritor. Os publicadores pegam um bloqueio de leitura ao enviar, o consumidor pega um bloqueio de gravação. O bloqueio de gravação espera que todos os bloqueios de leitura sejam liberados, então nenhum outro bloqueio de leitura pode ser tomado até que o bloqueio de gravação seja liberado. Assim que você construir sua lista de "itens para processar", você pode liberar o bloqueio de gravação e os publicadores podem continuar enviando.

    Mas com o que você mostrou é difícil ver por que isso seria de alguma forma melhor do que a abordagem ingênua :) Seja qual for a sincronização que você precisa fazer, por que há uma diferença entre "o início do método Flush" e "ler todos os itens em um buffer"? Se você depende desse tipo de diferença, seu código não é realmente thread-safe de qualquer maneira, já que você não tem restrições sobre como Flushé chamado; é perfeitamente legal para o tempo de execução causar, digamos, uma espera de 10s na entrada do método. Se isso quebra seu mecanismo, você precisa de um mecanismo melhor.

    • 2

relate perguntas

  • Polly DecorrelatedJitterBackoffV2 - como calcular o tempo máximo necessário para concluir todas as novas tentativas?

  • Wpf. Role o DataGrid dentro do ScrollViewer

  • A pontuação que ganhei na página do jogo com .NET MAUI MVVM não é visível em outras páginas. Como posso manter os dados de pontuação no dispositivo local

  • Use a hierarquia TreeView com HierarchicalDataTemplate de dentro de um DataTemplate

  • Como posso melhorar essa interface de validação no .NET?

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    Vue 3: Erro na criação "Identificador esperado, mas encontrado 'import'" [duplicado]

    • 1 respostas
  • Marko Smith

    Por que esse código Java simples e pequeno roda 30x mais rápido em todas as JVMs Graal, mas não em nenhuma JVM Oracle?

    • 1 respostas
  • Marko Smith

    Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores?

    • 1 respostas
  • Marko Smith

    Como faço para corrigir um erro MODULE_NOT_FOUND para um módulo que não importei manualmente?

    • 6 respostas
  • Marko Smith

    `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso?

    • 3 respostas
  • Marko Smith

    Quando devo usar um std::inplace_vector em vez de um std::vector?

    • 3 respostas
  • Marko Smith

    Um programa vazio que não faz nada em C++ precisa de um heap de 204 KB, mas não em C

    • 1 respostas
  • Marko Smith

    PowerBI atualmente quebrado com BigQuery: problema de driver Simba com atualização do Windows

    • 2 respostas
  • Marko Smith

    AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos

    • 1 respostas
  • Marko Smith

    Estou tentando fazer o jogo pacman usando apenas o módulo Turtle Random e Math

    • 1 respostas
  • Martin Hope
    Aleksandr Dubinsky Por que a correspondência de padrões com o switch no InetAddress falha com 'não cobre todos os valores de entrada possíveis'? 2024-12-23 06:56:21 +0800 CST
  • Martin Hope
    Phillip Borge Por que esse código Java simples e pequeno roda 30x mais rápido em todas as JVMs Graal, mas não em nenhuma JVM Oracle? 2024-12-12 20:46:46 +0800 CST
  • Martin Hope
    Oodini Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores? 2024-12-12 06:27:11 +0800 CST
  • Martin Hope
    sleeptightAnsiC `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso? 2024-11-09 07:18:53 +0800 CST
  • Martin Hope
    The Mad Gamer Quando devo usar um std::inplace_vector em vez de um std::vector? 2024-10-29 23:01:00 +0800 CST
  • Martin Hope
    Chad Feller O ponto e vírgula agora é opcional em condicionais bash com [[ .. ]] na versão 5.2? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench Por que um traço duplo (--) faz com que esta cláusula MariaDB seja avaliada como verdadeira? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng Por que `dict(id=1, **{'id': 2})` às vezes gera `KeyError: 'id'` em vez de um TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos 2024-03-20 03:12:31 +0800 CST
  • Martin Hope
    MarkB Por que o GCC gera código que executa condicionalmente uma implementação SIMD? 2024-02-17 06:17:14 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve