Pesquisei no Google e dei uma olhada em perguntas semelhantes aqui, mas simplesmente não consigo encontrar uma resposta clara. Meu problema é que todos os exemplos de coleta de bloqueios parecem não considerar execução longa ou assíncrona. Desculpe se isso já foi respondido, mas não encontrei nada que me explique.
Terei uma BlockingCollection
lista de quais tarefas serão enfileiradas. Essas tarefas serão processadas e podem ser rápidas ou levar minutos. Mas preciso de algo que bloqueie isso BlockingCollection
e sobreviva durante todo o ciclo de vida do processo. Tenho tentado melhorar a forma como lido com tarefas, o que me levou a tornar o Process
método assíncrono. Então, comecei a entrar em discussões sobre "StartNew Bad" e o que me pareceu informações incompletas.
Aqui está um exemplo do que estou fazendo.
private BlockingCollection<Job> _queue;
void Main()
{
var cts = new CancellationTokenSource();
var ct = cts.Token;
_queue = new BlockingCollection<Job>();
// Simulate the addition of Jobs
Task.Run(() => { while (!ct.IsCancellationRequested) { _queue.Add(new Job()); Thread.Sleep(1000); }});
// Start the processing ta... thr... whatever
Task.Factory.StartNew(() => ProcessJobs(ct), TaskCreationOptions.LongRunning);
Console.Read();
cts.Cancel();
_queue.CompleteAdding();
}
private async Task ProcessJobs(CancellationToken ct)
{
foreach (var job in _queue.GetConsumingEnumerable(ct))
{
try
{
var result = await job.Process();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
Console.WriteLine("Job Processed.");
}
}
public class Job
{
public async Task<bool> Process()
{
// await network communication
await Task.Run(() => Thread.Sleep(100));
return true;
}
}
Minha preocupação é que tenho lido que Task.Factory.StartNew
há um comportamento estranho com métodos assíncronos, como talvez a primeira parte do seu código sendo executada em qualquer coisa que ele crie para ela, mas assim que você clica em um await, quando retorna, você estará apenas em qualquer coisa que for puxada do ThreadPool
. Que você deveria estar usando Task.Run
.
Então, minha pergunta é: como devo executar ProcessJobs
? Manter o StartNew
? Task.Run
? new Thread(ProcessJobs)
?
ProcessJobs
precisa continuar durante toda a vida útil do aplicativo. Ele só será cancelado e colocado na fila CompleteAdding()
quando o aplicativo estiver sendo descontinuado. Esse processamento é a função principal do aplicativo. Embora possa haver períodos em que nenhuma tarefa seja adicionada, provavelmente haverá períodos em que o método de processamento não precisará esperar pela adição de novos itens.
No momento, estamos restritos ao .net 4.7.2. Podemos ir para o 4.8, se isso for importante.
Desde já, obrigado.
Atualização: Não estou perguntando sobre uma coleção de bloqueios assíncronos; a outra pergunta não ajudou em nada. Estou perguntando Thread
, Task.Run
, ou Task.Factory.StartNew
. Mesmo se eu usasse qualquer uma das sugestões de Existe algo como BlockingCollection<T> assíncrono?, eu ainda precisaria de um Thread
ou Task
que pudesse ser bloqueado enquanto aguarda a entrada. Quando digo que esse processamento é a função primária, não quero dizer que era a única função e que todo o processo poderia ser bloqueado.
Atualização: Ok, acho que entendi agora. Eu estava misturando o antigo com o novo. Não entendi por que me indicaram um mecanismo de fila diferente quando perguntei sobre threads. Então, aqui está o código acima alterado para remover o bloqueio e a criação direta de Thread
or Task
.
private Channel<Job> _queue;
void Main()
{
var cts = new CancellationTokenSource();
var ct = cts.Token;
_queue = Channel.CreateBounded<Job>(100);
// Simulate the addition of Jobs
Task.Run(async () => { while (!ct.IsCancellationRequested) { await _queue.Writer.WriteAsync(new Job(), ct); Thread.Sleep(1000); }});
var processingTask = ProcessJobs(ct);
Console.Read();
cts.Cancel();
_queue.Writer.TryComplete();
// Consider checking the state of processingTask, waiting for it to finish?
}
private async Task ProcessJobs(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
var job = await _queue.Reader.ReadAsync(ct);
await job.Process();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
Console.WriteLine("Job Processed.");
}
}
public class Job
{
public async Task<bool> Process()
{
// await network communication
await Task.Run(() => Thread.Sleep(100));
return true;
}
}
Obrigado novamente pela ajuda.