Tenho uma base de código legada que está deixando o ThreadPool faminto por causa de muitas chamadas de sincronização (em vez de usar async/await). Estou tentando criar um processo em segundo plano que prolongará o lease do kubernetes devido ao gerenciamento de bloqueios no caso de execução em várias réplicas. Este processo deve ser executado com a prioridade mais alta. Mas estou com dificuldades para fazer isso porque parece não ser tão fácil ignorar o ThreadPool. Tentei usar o TaskScheduler personalizado com threads próprios, mas sem sorte. Isso é possível?
Aqui está minha implementação usando CustomTaskScheduler. O programa trava em HttpClient SendAsync().
class Program
{
static async Task Main()
{
// Simulate thread pool starvation
ThreadPool.SetMaxThreads(50, 1);
for (int i = 0; i < 60; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
using (var scheduler = new CustomTaskScheduler(workerCount: 1))
{
var factory = new TaskFactory(scheduler);
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
int taskNum = i;
await factory.StartNew(async () =>
{
Console.WriteLine($"Task {taskNum} is running on thread {Thread.CurrentThread.ManagedThreadId}");
await RunAsyncFunction(taskNum);
}, CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
}
}
Console.WriteLine("All tasks completed.");
await Task.Delay(1000000);
}
static async Task RunAsyncFunction(int taskNum)
{
Console.WriteLine($"Task {taskNum} started on thread {Thread.CurrentThread.ManagedThreadId}");
var client = new HttpClient();
await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"));
Console.WriteLine($"Task {taskNum} resumed on thread {Thread.CurrentThread.ManagedThreadId}");
}
}
public class CustomTaskScheduler : TaskScheduler, IDisposable
{
private readonly System.Collections.Concurrent.BlockingCollection<Task> taskQueue = new();
private readonly List<Thread> workerThreads = new();
private readonly CancellationTokenSource cts = new();
public CustomTaskScheduler(int workerCount)
{
for (int i = 0; i < workerCount; i++)
{
var thread = new Thread(WorkerLoop)
{
IsBackground = true
};
workerThreads.Add(thread);
thread.Start();
}
}
protected override IEnumerable<Task> GetScheduledTasks() => taskQueue.ToArray();
protected override void QueueTask(Task task)
{
if (cts.IsCancellationRequested)
throw new InvalidOperationException("Scheduler is shutting down.");
taskQueue.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return TryExecuteTask(task);
}
private void WorkerLoop()
{
try
{
foreach (var task in taskQueue.GetConsumingEnumerable(cts.Token))
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
TryExecuteTask(task);
}
}
catch (OperationCanceledException) when (cts.IsCancellationRequested) { }
}
public void Dispose()
{
cts.Cancel();
taskQueue.CompleteAdding();
foreach (var worker in workerThreads)
{
worker.Join();
}
taskQueue.Dispose();
cts.Dispose();
}
}
EDIT: É interessante que o código trava mesmo se eu usar a versão síncrona do HttpClient Send() no meu próprio thread. Então ele ainda está usando ThreadPool. Aqui está a versão síncrona
ThreadPool.SetMaxThreads(12, 10000);
for (int i = 0; i < 20; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
var thread = new Thread(SendHttpRequest);
thread.IsBackground = true;
thread.Start();
Thread.Sleep(100000);
static void SendHttpRequest()
{
using (HttpClient client = new HttpClient())
using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"))
{
var response = client.Send(request);
// Never goes here if ThreadPool is exhausted
Console.WriteLine(response.Content.ReadAsStringAsync().Result);
}
}
Você pode controlar onde o código é executado após o
await
, mas não pode controlar onde a API internaHttpClient.SendAsync
é concluída. AFAIK, ele é implementado para ser concluído noThreadPool
, e você não pode fazer nada sobre isso (além de reimplementar oHttpClient
do zero). Então, enquanto oThreadPool
estiver saturado, oHttpClient.SendAsync
não será concluído, porque não háThreadPool
thread disponível para concluí-lo. Quando um estiver disponível, ele reprogramará imediatamente a continuação para o seuCustomTaskScheduler
, então esse thread terá um trabalho minúsculo a fazer e estará imediatamente disponível para fazer outroThreadPool
trabalho pendente. O que é alguma coisa, mas provavelmente não ajudará seu caso porque há outra coisa que satura seuThreadPool
, que logo roubará esse thread também.O
HttpClient.SendAsync
não é especial em relação a onde ele é concluído. A maioria das APIs assíncronas .NET integradas são concluídas noThreadPool
, incluindo, por exemplo, oTask.Delay
método . Então, sua melhor aposta é certificar-se de que oThreadPool
esteja saudável e que não haja nenhum código em seu aplicativo que o deixe agressivamente faminto. Uma possível causa para um saturadoThreadPool
pode ser umaParallel.ForEach
chamada de loop com unconfiguredMaxDegreeOfParallelism
, especialmente se osource
do loop for uma coleção de bloqueio .await
vai agendar a continuação para ser executada viaSynchronizationContext.Current
, não necessariamenteTaskScheduler.Current
. E muitas operações assíncronas vão usar explicitamente o contexto de sincronização, em vez do agendador de tarefas atual, ou pior no seu caso, usar explicitamente o agendador/contexto de sincronização padrão em vez do atual.Se você criar seu próprio contexto de sincronização e defini-lo como o contexto atual ao executar operações usando seu pool de threads personalizado (e para não ser maldoso, defina o que era anteriormente o contexto atual quando terminar), esses lugares não usarão o pool de threads. Também vale a pena notar que, em seu exemplo, você tem awaits fora do agendador de tarefas personalizado, que agendarão todas as suas continuações usando o pool de threads, não seu agendador personalizado.
Mas mesmo se você fizer tudo isso, você tem um problema mais fundamental que são as melhores práticas para bibliotecas usarem
ConfigureAwait(false)
na maioria dos contextos, então elas vão intencionalmente garantir que suas continuações rodem no thread do pool de threads padrão em vez de no seu contexto personalizado, já que elas não acham que precisam disso. Você simplesmente vai ter um mau momento bloqueando o thread pool padrão e tentando contornar isso.Em vez de bloquear o pool de threads com trabalho de longa duração que o impede de ser usado conforme o esperado, faça com que esse trabalho use um pool de threads personalizado (ou apenas use
LongRunning
onTask.Run
) para que todas as coisas que desejam usar o pool de threads possam.