Tenho uma base de código legada que está deixando o ThreadPool faminto por causa de muitas chamadas de sincronização (em vez de usar async/await). Estou tentando criar um processo em segundo plano que prolongará o lease do kubernetes devido ao gerenciamento de bloqueios no caso de execução em várias réplicas. Este processo deve ser executado com a prioridade mais alta. Mas estou com dificuldades para fazer isso porque parece não ser tão fácil ignorar o ThreadPool. Tentei usar o TaskScheduler personalizado com threads próprios, mas sem sorte. Isso é possível?
Aqui está minha implementação usando CustomTaskScheduler. O programa trava em HttpClient SendAsync().
class Program
{
static async Task Main()
{
// Simulate thread pool starvation
ThreadPool.SetMaxThreads(50, 1);
for (int i = 0; i < 60; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
using (var scheduler = new CustomTaskScheduler(workerCount: 1))
{
var factory = new TaskFactory(scheduler);
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
int taskNum = i;
await factory.StartNew(async () =>
{
Console.WriteLine($"Task {taskNum} is running on thread {Thread.CurrentThread.ManagedThreadId}");
await RunAsyncFunction(taskNum);
}, CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
}
}
Console.WriteLine("All tasks completed.");
await Task.Delay(1000000);
}
static async Task RunAsyncFunction(int taskNum)
{
Console.WriteLine($"Task {taskNum} started on thread {Thread.CurrentThread.ManagedThreadId}");
var client = new HttpClient();
await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"));
Console.WriteLine($"Task {taskNum} resumed on thread {Thread.CurrentThread.ManagedThreadId}");
}
}
public class CustomTaskScheduler : TaskScheduler, IDisposable
{
private readonly System.Collections.Concurrent.BlockingCollection<Task> taskQueue = new();
private readonly List<Thread> workerThreads = new();
private readonly CancellationTokenSource cts = new();
public CustomTaskScheduler(int workerCount)
{
for (int i = 0; i < workerCount; i++)
{
var thread = new Thread(WorkerLoop)
{
IsBackground = true
};
workerThreads.Add(thread);
thread.Start();
}
}
protected override IEnumerable<Task> GetScheduledTasks() => taskQueue.ToArray();
protected override void QueueTask(Task task)
{
if (cts.IsCancellationRequested)
throw new InvalidOperationException("Scheduler is shutting down.");
taskQueue.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return TryExecuteTask(task);
}
private void WorkerLoop()
{
try
{
foreach (var task in taskQueue.GetConsumingEnumerable(cts.Token))
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
TryExecuteTask(task);
}
}
catch (OperationCanceledException) when (cts.IsCancellationRequested) { }
}
public void Dispose()
{
cts.Cancel();
taskQueue.CompleteAdding();
foreach (var worker in workerThreads)
{
worker.Join();
}
taskQueue.Dispose();
cts.Dispose();
}
}
EDIT: É interessante que o código trava mesmo se eu usar a versão síncrona do HttpClient Send() no meu próprio thread. Então ele ainda está usando ThreadPool. Aqui está a versão síncrona
ThreadPool.SetMaxThreads(12, 10000);
for (int i = 0; i < 20; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
var thread = new Thread(SendHttpRequest);
thread.IsBackground = true;
thread.Start();
Thread.Sleep(100000);
static void SendHttpRequest()
{
using (HttpClient client = new HttpClient())
using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"))
{
var response = client.Send(request);
// Never goes here if ThreadPool is exhausted
Console.WriteLine(response.Content.ReadAsStringAsync().Result);
}
}