我正在使用ConcurrentQueue<T>
。我需要:
- 多个消费者将项目加入队列
- 单个消费者一次性出队所有项目
- 在出队期间,锁定队列,以便其他消费者无法同时将新项目入队
public class Cache
{
private readonly ConcurrentQueue<Item> _queue = new();
public void Enqueue(Item item) // what synchronisation to perform here?
{
ArgumentNullException.ThrowIfNull(item, nameof(item));
_queue.Enqueue(item);
}
public void Flush() // what synchronisation to perform here?
{
// dequeue all items
var items = new List<Item>();
while (_queue.TryDequeue(out var item))
items.Add(item);
// `DoSomething` could take time; therefore above we
// dequeued and cached all items, so other consumers
// can immediately continue enqueuing
foreach (var item in items)
DoSomething(item);
}
public void DoSomething(Item item)
{
// ...
}
}
_queue
我很困惑哪个是用于同步访问(在Enqueue
和中)的最佳并发工具Flush
。
我应该使用简单的lock
,还是使用一些更奇特的东西?
更新:为了解决下面的评论,我并不支持 ConcurrentQueue;如果有更合适的并发集合,请告诉我。
最简单的选择就是使用锁和列表:
考虑到锁只被暂时持有,这应该表现得相当好。您可能希望设置列表容量以避免成本较高的重新分配。
但这个问题揭示了一种可能存在问题的观点。唯一可以保证队列为空的时间点是在之后
_queue = new List<Item>()
,即锁被释放之前。但此时你没有做任何事情,一旦锁被释放,队列中可能又会有项目。换句话说,除非有其他形式的同步,否则将项目入队的线程无法知道该项目是否包含在刷新中。刷新线程也无法知道是否有待处理的入队。因此,与原始版本相比,实际上应该没有任何区别。
因此,目前还不清楚实际目标是什么。像“单个时间点”这样的陈述是有问题的,因为没有全局排序。您可以依赖的唯一顺序是各种同步原语提供的顺序。是否
DoSomething
需要与同步Enqueue
?如果是,则需要在锁内运行。也许您需要一些外部同步来确保在刷新之前已完成所有入队?总的来说,我建议看一下BlockingCollection,我发现它比原始 ConcurrentQueues 更有用。DataFlow 和 Channels 可能是其他值得考虑的库。
一种简单的方法是在
Flush
方法开始时只读取队列中的项目数。然后,您无需读取队列中的所有项目,而只需读取开始时有多少项目即可。由于您在一个线程上进行所有读取,因此您不必担心任何冲突。生产者可以愉快地继续添加更多项目。话虽如此,根据您提供的信息很难判断,但我甚至怀疑这是否必要。为什么不直接读取所有项目?您担心这
DoSomething
很昂贵;但将项目读入列表并不昂贵。“读取计数,然后读取计数项目,然后处理它们”和“读取所有项目,然后处理它们”之间真的有功能差异吗?只需将“读取项目”的部分与“处理项目”的部分分开,您基本上就处于同一点。如果您想设置更严格的限制,则必须使用读写锁。发布者在推送时获取读锁,消费者获取写锁。写锁等待所有读锁被释放,然后直到写锁被释放,才能获取更多读锁。一旦您建立了“要处理的项目”列表,您就可以释放写锁,发布者可以继续推送。
但是从您所展示的内容来看,很难看出为什么这种方法比简单的方法更好 :) 无论您需要进行哪种同步,为什么“启动 Flush 方法”和“将所有项目读入缓冲区”之间存在差异?如果您依赖这种差异,那么您的代码实际上并不是线程安全的,因为您对如何
Flush
调用没有任何限制;运行时导致进入该方法需要等待 10 秒是完全合法的。如果这会破坏您的机制,您需要一个更好的机制。