AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / user-24859246

JJJunior's questions

Martin Hope
JJJunior
Asked: 2025-04-09 08:12:11 +0800 CST

Tarefa ou thread para processamento de BlockingCollection

  • 5

Pesquisei no Google e dei uma olhada em perguntas semelhantes aqui, mas simplesmente não consigo encontrar uma resposta clara. Meu problema é que todos os exemplos de coleta de bloqueios parecem não considerar execução longa ou assíncrona. Desculpe se isso já foi respondido, mas não encontrei nada que me explique.

Terei uma BlockingCollectionlista de quais tarefas serão enfileiradas. Essas tarefas serão processadas e podem ser rápidas ou levar minutos. Mas preciso de algo que bloqueie isso BlockingCollectione sobreviva durante todo o ciclo de vida do processo. Tenho tentado melhorar a forma como lido com tarefas, o que me levou a tornar o Processmétodo assíncrono. Então, comecei a entrar em discussões sobre "StartNew Bad" e o que me pareceu informações incompletas.

Aqui está um exemplo do que estou fazendo.

private BlockingCollection<Job> _queue;

void Main()
{
    var cts = new CancellationTokenSource();
    var ct = cts.Token;
    _queue = new BlockingCollection<Job>();
    
    // Simulate the addition of Jobs
    Task.Run(() => { while (!ct.IsCancellationRequested) { _queue.Add(new Job()); Thread.Sleep(1000); }});
    
    // Start the processing ta... thr... whatever
    Task.Factory.StartNew(() => ProcessJobs(ct), TaskCreationOptions.LongRunning);
    
    Console.Read();
    cts.Cancel();
    _queue.CompleteAdding();
}


private async Task ProcessJobs(CancellationToken ct)
{
    foreach (var job in _queue.GetConsumingEnumerable(ct))
    {
        try 
        {
            var result = await job.Process();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        Console.WriteLine("Job Processed.");
    }
}

public class Job
{
    public async Task<bool> Process()
    {
        // await network communication
        await Task.Run(() => Thread.Sleep(100));
        return true;
    }
}

Minha preocupação é que tenho lido que Task.Factory.StartNewhá um comportamento estranho com métodos assíncronos, como talvez a primeira parte do seu código sendo executada em qualquer coisa que ele crie para ela, mas assim que você clica em um await, quando retorna, você estará apenas em qualquer coisa que for puxada do ThreadPool. Que você deveria estar usando Task.Run.

Então, minha pergunta é: como devo executar ProcessJobs? Manter o StartNew? Task.Run? new Thread(ProcessJobs)?

ProcessJobsprecisa continuar durante toda a vida útil do aplicativo. Ele só será cancelado e colocado na fila CompleteAdding()quando o aplicativo estiver sendo descontinuado. Esse processamento é a função principal do aplicativo. Embora possa haver períodos em que nenhuma tarefa seja adicionada, provavelmente haverá períodos em que o método de processamento não precisará esperar pela adição de novos itens.

No momento, estamos restritos ao .net 4.7.2. Podemos ir para o 4.8, se isso for importante.

Desde já, obrigado.


Atualização: Não estou perguntando sobre uma coleção de bloqueios assíncronos; a outra pergunta não ajudou em nada. Estou perguntando Thread, Task.Run, ou Task.Factory.StartNew. Mesmo se eu usasse qualquer uma das sugestões de Existe algo como BlockingCollection<T> assíncrono?, eu ainda precisaria de um Threadou Taskque pudesse ser bloqueado enquanto aguarda a entrada. Quando digo que esse processamento é a função primária, não quero dizer que era a única função e que todo o processo poderia ser bloqueado.

Atualização: Ok, acho que entendi agora. Eu estava misturando o antigo com o novo. Não entendi por que me indicaram um mecanismo de fila diferente quando perguntei sobre threads. Então, aqui está o código acima alterado para remover o bloqueio e a criação direta de Threador Task.

private Channel<Job> _queue;

void Main()
{
    var cts = new CancellationTokenSource();
    var ct = cts.Token;
    _queue = Channel.CreateBounded<Job>(100);
    
    // Simulate the addition of Jobs
    Task.Run(async () => { while (!ct.IsCancellationRequested) { await _queue.Writer.WriteAsync(new Job(), ct); Thread.Sleep(1000); }});
    
    var processingTask = ProcessJobs(ct);
    
    Console.Read();
    cts.Cancel();
    _queue.Writer.TryComplete();
    // Consider checking the state of processingTask, waiting for it to finish?
}

private async Task ProcessJobs(CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        try 
        {
            var job = await _queue.Reader.ReadAsync(ct);
            await job.Process();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        Console.WriteLine("Job Processed.");
    }
}

public class Job
{
    public async Task<bool> Process()
    {
        // await network communication
        await Task.Run(() => Thread.Sleep(100));
        return true;
    }
}

Obrigado novamente pela ajuda.

c#
  • 2 respostas
  • 71 Views

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    Reformatar números, inserindo separadores em posições fixas

    • 6 respostas
  • Marko Smith

    Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não?

    • 2 respostas
  • Marko Smith

    Problema com extensão desinstalada automaticamente do VScode (tema Material)

    • 2 respostas
  • Marko Smith

    Vue 3: Erro na criação "Identificador esperado, mas encontrado 'import'" [duplicado]

    • 1 respostas
  • Marko Smith

    Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores?

    • 1 respostas
  • Marko Smith

    Como faço para corrigir um erro MODULE_NOT_FOUND para um módulo que não importei manualmente?

    • 6 respostas
  • Marko Smith

    `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso?

    • 3 respostas
  • Marko Smith

    Um programa vazio que não faz nada em C++ precisa de um heap de 204 KB, mas não em C

    • 1 respostas
  • Marko Smith

    PowerBI atualmente quebrado com BigQuery: problema de driver Simba com atualização do Windows

    • 2 respostas
  • Marko Smith

    AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos

    • 1 respostas
  • Martin Hope
    Fantastic Mr Fox Somente o tipo copiável não é aceito na implementação std::vector do MSVC 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant Encontre o próximo dia da semana usando o cronógrafo 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor O inicializador de membro do construtor pode incluir a inicialização de outro membro? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul O C++20 mudou para permitir a conversão de `type(&)[N]` de matriz de limites conhecidos para `type(&)[]` de matriz de limites desconhecidos? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann Como/por que {2,3,10} e {x,3,10} com x=2 são ordenados de forma diferente? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller O ponto e vírgula agora é opcional em condicionais bash com [[ .. ]] na versão 5.2? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench Por que um traço duplo (--) faz com que esta cláusula MariaDB seja avaliada como verdadeira? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng Por que `dict(id=1, **{'id': 2})` às vezes gera `KeyError: 'id'` em vez de um TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos 2024-03-20 03:12:31 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve