我有一个遗留代码库,其中大量使用同步(阻塞)调用而不是 async/await,导致 ThreadPool 出现线程饥饿。我正在尝试创建一个后台任务:当应用以多个副本运行时,它通过不断续订 Kubernetes 租约(lease)来管理锁。这个任务应当以最高优先级运行。但我很难做到这一点,因为绕过 ThreadPool 似乎并不容易。我尝试过使用自定义的 TaskScheduler 和自己创建的线程,但没有成功。这有可能做到吗?
下面是我使用 CustomTaskScheduler 的实现。程序会在调用 HttpClient 的 SendAsync() 时挂起。
/// <summary>
/// Repro program: starves the thread pool with blocking work items and then
/// tries to run async HTTP calls through a <c>CustomTaskScheduler</c> that owns
/// a single dedicated worker thread.
/// </summary>
class Program
{
    /// <summary>
    /// Caps the thread pool, floods it with sleeping work items, then starts
    /// five async tasks on the custom scheduler, awaiting each one before
    /// starting the next.
    /// </summary>
    static async Task Main()
    {
        // Simulate thread pool starvation
        // SetMaxThreads reports rejection through its return value instead of
        // throwing; surface that so a non-reproducing run is not misread.
        if (!ThreadPool.SetMaxThreads(50, 1))
        {
            Console.Error.WriteLine("Warning: ThreadPool.SetMaxThreads(50, 1) was rejected; the thread pool limits are unchanged.");
        }

        // 60 blocking work items against a cap of 50 worker threads.
        for (int i = 0; i < 60; i++)
        {
            _ = Task.Run(() =>
            {
                Thread.Sleep(100000);
            });
        }

        using (var scheduler = new CustomTaskScheduler(workerCount: 1))
        {
            var factory = new TaskFactory(scheduler);
            for (int i = 0; i < 5; i++)
            {
                // Copy the loop variable so each lambda captures its own value.
                int taskNum = i;

                // StartNew with an async lambda yields Task<Task>; Unwrap() makes
                // the await cover the inner async work, not just its first
                // synchronous segment.
                // NOTE(review): the continuation of this await in Main is not tied
                // to `scheduler`; presumably it resumes on the thread pool - confirm.
                await factory.StartNew(async () =>
                {
                    Console.WriteLine($"Task {taskNum} is running on thread {Thread.CurrentThread.ManagedThreadId}");
                    await RunAsyncFunction(taskNum);
                }, CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
            }
        }

        Console.WriteLine("All tasks completed.");
        await Task.Delay(1000000);
    }

    /// <summary>
    /// Sends one GET request to the Kubernetes health endpoint and logs the
    /// managed thread id before and after the await.
    /// </summary>
    /// <param name="taskNum">Number used only to label the console output.</param>
    static async Task RunAsyncFunction(int taskNum)
    {
        Console.WriteLine($"Task {taskNum} started on thread {Thread.CurrentThread.ManagedThreadId}");

        // Client, request and response are all IDisposable; release them when
        // the method finishes.
        using var client = new HttpClient();
        using var request = new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz");
        using var response = await client.SendAsync(request);

        Console.WriteLine($"Task {taskNum} resumed on thread {Thread.CurrentThread.ManagedThreadId}");
    }
}
/// <summary>
/// A <see cref="TaskScheduler"/> that executes tasks on its own dedicated
/// background threads instead of the shared thread pool.
/// </summary>
/// <remarks>
/// Tasks are handed to the workers through a blocking queue. Disposing the
/// scheduler cancels the workers and waits for them to exit; tasks that are
/// still queued at that point are not executed.
/// </remarks>
public class CustomTaskScheduler : TaskScheduler, IDisposable
{
    private readonly System.Collections.Concurrent.BlockingCollection<Task> taskQueue = new();
    private readonly List<Thread> workerThreads = new();
    private readonly CancellationTokenSource cts = new();

    // 0 while live, 1 once Dispose has been entered; makes Dispose idempotent.
    private int disposed;

    /// <summary>
    /// Creates the scheduler and starts its worker threads.
    /// </summary>
    /// <param name="workerCount">Number of dedicated worker threads; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="workerCount"/> is zero or negative (no thread would ever run the queued tasks).
    /// </exception>
    public CustomTaskScheduler(int workerCount)
    {
        if (workerCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCount), workerCount, "At least one worker thread is required.");
        }

        // Create every worker before starting any, so workerThreads is fully
        // populated before it is read in TryExecuteTaskInline.
        for (int i = 0; i < workerCount; i++)
        {
            workerThreads.Add(new Thread(WorkerLoop)
            {
                IsBackground = true
            });
        }

        foreach (var thread in workerThreads)
        {
            thread.Start();
        }
    }

    /// <summary>Returns a snapshot of the tasks currently waiting in the queue.</summary>
    protected override IEnumerable<Task> GetScheduledTasks() => taskQueue.ToArray();

    /// <summary>Adds a task to the queue consumed by the worker threads.</summary>
    /// <exception cref="InvalidOperationException">The scheduler is shutting down.</exception>
    protected override void QueueTask(Task task)
    {
        if (cts.IsCancellationRequested)
            throw new InvalidOperationException("Scheduler is shutting down.");
        taskQueue.Add(task);
    }

    /// <summary>
    /// Runs the task on the calling thread only when that thread is one of this
    /// scheduler's own workers.
    /// </summary>
    /// <returns><c>true</c> if the task was executed inline; otherwise <c>false</c>.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Inlining on a foreign thread (e.g. a thread-pool thread completing an
        // awaited operation, or a caller blocked in Task.Wait) would move the
        // work off the dedicated workers, which is the opposite of what this
        // scheduler is for. Returning false makes the runtime queue it instead.
        if (!workerThreads.Contains(Thread.CurrentThread))
        {
            return false;
        }

        // A previously queued task cannot be removed from the BlockingCollection;
        // when a worker dequeues it later, TryExecuteTask returns false for the
        // already-executed task.
        return TryExecuteTask(task);
    }

    /// <summary>
    /// Worker thread body: takes tasks from the queue and executes them until
    /// the queue is completed or the scheduler is cancelled.
    /// </summary>
    private void WorkerLoop()
    {
        try
        {
            foreach (var task in taskQueue.GetConsumingEnumerable(cts.Token))
            {
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
                TryExecuteTask(task);
            }
        }
        // Cancellation requested by Dispose is the normal shutdown path.
        catch (OperationCanceledException) when (cts.IsCancellationRequested) { }
    }

    /// <summary>
    /// Stops accepting work, cancels the workers, waits for them to exit and
    /// releases the queue and cancellation source. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        // Second and later calls are no-ops rather than ObjectDisposedException.
        if (Interlocked.Exchange(ref disposed, 1) != 0)
        {
            return;
        }

        cts.Cancel();
        taskQueue.CompleteAdding();
        foreach (var worker in workerThreads)
        {
            worker.Join();
        }
        taskQueue.Dispose();
        cts.Dispose();
    }
}
编辑:有趣的是,即使我在自己创建的线程中调用同步版本的 HttpClient Send(),代码仍然会挂起,所以它仍然在使用 ThreadPool。下面是同步版本:
// Simulate thread pool starvation: cap the pool at 12 worker threads and park
// 20 blocking work items on it.
// SetMaxThreads reports rejection through its return value instead of throwing;
// surface that so a non-reproducing run is not misread.
if (!ThreadPool.SetMaxThreads(12, 10000))
{
    Console.Error.WriteLine("Warning: ThreadPool.SetMaxThreads(12, 10000) was rejected; the thread pool limits are unchanged.");
}

for (int i = 0; i < 20; i++)
{
    _ = Task.Run(() =>
    {
        Thread.Sleep(100000);
    });
}

// Issue the HTTP request from a manually created thread rather than a pool thread.
var thread = new Thread(SendHttpRequest);
thread.IsBackground = true;
thread.Start();

// The request thread is a background thread, so block here to keep the process alive.
Thread.Sleep(100000);
// Sends one synchronous GET request to the Kubernetes health endpoint and
// prints the response body. Client, request and response are all disposed
// when the call finishes.
static void SendHttpRequest()
{
    using (HttpClient client = new HttpClient())
    using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"))
    using (HttpResponseMessage response = client.Send(request))
    {
        // Never goes here if ThreadPool is exhausted
        // NOTE(review): .Result blocks this thread on an async read; presumably the
        // body is already buffered by Send at this point - confirm.
        Console.WriteLine(response.Content.ReadAsStringAsync().Result);
    }
}