我在 Google 上搜索,也查看了类似的问题,但就是找不到明确的答案。我的问题是,所有关于阻塞收集的示例似乎都没有考虑长时间运行或异步的情况。如果这个问题已经有人回答了,我很抱歉,但我没有找到任何可以解释的答案。
我会有一个队列,BlockingCollection
用于存放待处理的任务。这些任务的处理速度可能很快,也可能需要几分钟。但我需要一个能够阻塞队列BlockingCollection
并在整个进程生命周期内都保持活跃的机制。我一直在尝试改进我处理任务的方式,这导致我把这个Process
方法变成了异步的。后来我开始参与“StartNew Bad”的讨论,而我得到的反馈似乎并不完整。
这是我正在做的事情的一个例子。
private BlockingCollection<Job> _queue;
void Main()
{
var cts = new CancellationTokenSource();
var ct = cts.Token;
_queue = new BlockingCollection<Job>();
// Simulate the addition of Jobs
Task.Run(() => { while (!ct.IsCancellationRequested) { _queue.Add(new Job()); Thread.Sleep(1000); }});
// Start the processing ta... thr... whatever
Task.Factory.StartNew(() => ProcessJobs(ct), TaskCreationOptions.LongRunning);
Console.Read();
cts.Cancel();
_queue.CompleteAdding();
}
private async Task ProcessJobs(CancellationToken ct)
{
foreach (var job in _queue.GetConsumingEnumerable(ct))
{
try
{
var result = await job.Process();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
Console.WriteLine("Job Processed.");
}
}
public class Job
{
public async Task<bool> Process()
{
// await network communication
await Task.Run(() => Thread.Sleep(100));
return true;
}
}
我担心的是,我一直在阅读Task.Factory.StartNew
异步方法的奇怪行为,例如,也许你的代码的第一部分在它为其创建的任何内容上运行,但是一旦你点击 await,当你回来时,你只会在从中提取的任何内容上ThreadPool
。你应该使用Task.Run
。
所以我的问题是我应该如何执行ProcessJobs
?保留StartNew
???Task.Run
new Thread(ProcessJobs)
ProcessJobs
需要在应用的整个生命周期内持续执行。只有CompleteAdding()
在应用关闭时,它才会被取消并加入队列。此处理是应用的主要功能。虽然可能存在没有作业添加的时间段,但很可能存在 process 方法无需等待新项目添加的时间段。
我们目前仅限于 .net 4.7.2,如果有必要的话,我们可能会转向 4.8。
提前致谢。
更新:我不是在问异步阻塞集合;那个问题没有任何帮助。我问的Thread
是Task.Run
、 或Task.Factory.StartNew
。即使我使用“有没有类似异步 BlockingCollection<T> 的东西?”中的任何建议,我仍然需要一个可以在等待输入时被阻塞的Thread
或Task
。当我说这个处理是主要功能时,我并不是说它是唯一的功能,并且整个过程都可能被阻塞。
更新:好的,我想我现在明白了。我之前把新旧概念搞混了。我不明白为什么我问线程的时候,他们让我使用不同的队列机制。所以,这里修改了上面的代码,去掉了阻塞,直接创建Thread
或Task
。
private Channel<Job> _queue;
void Main()
{
var cts = new CancellationTokenSource();
var ct = cts.Token;
_queue = Channel.CreateBounded<Job>(100);
// Simulate the addition of Jobs
Task.Run(async () => { while (!ct.IsCancellationRequested) { await _queue.Writer.WriteAsync(new Job(), ct); Thread.Sleep(1000); }});
var processingTask = ProcessJobs(ct);
Console.Read();
cts.Cancel();
_queue.Writer.TryComplete();
// Consider checking the state of processingTask, waiting for it to finish?
}
private async Task ProcessJobs(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
var job = await _queue.Reader.ReadAsync(ct);
await job.Process();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
Console.WriteLine("Job Processed.");
}
}
public class Job
{
public async Task<bool> Process()
{
// await network communication
await Task.Run(() => Thread.Sleep(100));
return true;
}
}
再次感谢您的帮助。