Estou usando um ConcurrentQueue<T>
. Eu preciso:
- vários consumidores para enfileirar itens
- um único consumidor para retirar todos os itens da fila de uma só vez
- durante a dequeue, para bloquear a fila para que outros consumidores não possam enfileirar novos itens ao mesmo tempo
public class Cache
{
private readonly ConcurrentQueue<Item> _queue = new();
public void Enqueue(Item item) // what synchronisation to perform here?
{
ArgumentNullException.ThrowIfNull(item, nameof(item));
_queue.Enqueue(item);
}
public void Flush() // what synchronisation to perform here?
{
// dequeue all items
var items = new List<Item>();
while (_queue.TryDequeue(out var item))
items.Add(item);
// `DoSomething` could take time; therefore above we
// dequeued and cached all items, so other consumers
// can immediately continue enqueuing
foreach (var item in items)
DoSomething(item);
}
public void DoSomething(Item item)
{
// ...
}
}
Estou confuso sobre qual é a melhor ferramenta de simultaneidade para sincronizar o acesso _queue
(em Enqueue
e Flush
).
Devo usar um simples lock
ou algo mais exótico?
ATUALIZAÇÃO: para responder aos comentários abaixo, não estou vinculado ao ConcurrentQueue; se houver uma coleção simultânea mais apropriada, por favor me avise.
A opção mais simples seria usar apenas um bloqueio e uma lista:
Isso deve funcionar razoavelmente bem, dado que o bloqueio é mantido apenas momentaneamente. Você pode querer definir a capacidade da lista para evitar realocações um tanto caras.
Mas a questão revela uma possível visão problemática. O único ponto em que a fila tem garantia de estar vazia é logo depois de
_queue = new List<Item>()
, antes do bloqueio ser liberado. Mas você não está fazendo nada neste ponto, e assim que o bloqueio for liberado, pode haver itens na fila novamente.Em outras palavras, um thread que enfileira um item não tem como saber se ele foi incluído em um flush ou não, a menos que haja alguma outra forma de sincronização. Nem o thread de flushing pode saber se há enfileiramentos pendentes. Então, não deve haver realmente nenhuma diferença em comparação com sua versão original.
Então não está realmente claro qual é o objetivo real. Declarações como "um único ponto no tempo" são problemáticas, pois não há ordenação global. A única ordem em que você pode confiar é aquela fornecida pelas várias primitivas de sincronização. É
DoSomething
necessário ser sincronizado com relação aEnqueue
? Se for, ele precisa ser executado dentro do bloqueio. Talvez você precise de alguma sincronização externa para garantir que todo o enfileiramento tenha sido feito antes da descarga?Em geral, eu sugeriria dar uma olhada em BlockingCollection , acho que é mais útil do que ConcurrentQueues brutos. DataFlow e Channels podem ser outras bibliotecas a serem consideradas.
Uma abordagem simples seria apenas ler o número de itens na fila no início do
Flush
método. Então, em vez de ler todos os itens na fila, você lê apenas quantos havia no início. Como você está fazendo toda a leitura em um thread, não precisa se preocupar com colisões. E os produtores podem continuar adicionando mais itens alegremente.Dito isso, é difícil dizer com as informações que você forneceu, mas duvido que até isso seja necessário. Por que não ler todos os itens? Você está preocupado que isso
DoSomething
seja caro; mas apenas ler os itens em uma lista não é. Existe realmente uma diferença funcional entre "ler a contagem, depois ler a contagem de itens e depois processá-los" e "ler todos os itens e depois processá-los"? Apenas separe as partes "ler os itens" de "processar os itens" e você estará essencialmente no mesmo ponto.Se você quiser definir um limite mais rígido, você teria que usar um bloqueio de leitor-escritor. Os publicadores pegam um bloqueio de leitura ao enviar, o consumidor pega um bloqueio de gravação. O bloqueio de gravação espera que todos os bloqueios de leitura sejam liberados, então nenhum outro bloqueio de leitura pode ser tomado até que o bloqueio de gravação seja liberado. Assim que você construir sua lista de "itens para processar", você pode liberar o bloqueio de gravação e os publicadores podem continuar enviando.
Mas com o que você mostrou é difícil ver por que isso seria de alguma forma melhor do que a abordagem ingênua :) Seja qual for a sincronização que você precisa fazer, por que há uma diferença entre "o início do método Flush" e "ler todos os itens em um buffer"? Se você depende desse tipo de diferença, seu código não é realmente thread-safe de qualquer maneira, já que você não tem restrições sobre como
Flush
é chamado; é perfeitamente legal para o tempo de execução causar, digamos, uma espera de 10s na entrada do método. Se isso quebra seu mecanismo, você precisa de um mecanismo melhor.