我有一个遗留的代码库,由于许多同步调用(而不是使用 async/await),ThreadPool 正在挨饿。我正在尝试创建后台进程,该进程将在多个副本中运行时管理锁,从而延长 kubernetes 租约。此进程应以最高优先级运行。但我很难做到这一点,因为绕过 ThreadPool 似乎并不容易。我尝试使用自定义 TaskScheduler 和自己的线程,但没有成功。这可能吗?
这是我使用 CustomTaskScheduler 的实现。该程序挂在 HttpClient SendAsync() 上。
class Program
{
static async Task Main()
{
// Simulate thread pool starvation
ThreadPool.SetMaxThreads(50, 1);
for (int i = 0; i < 60; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
using (var scheduler = new CustomTaskScheduler(workerCount: 1))
{
var factory = new TaskFactory(scheduler);
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
int taskNum = i;
await factory.StartNew(async () =>
{
Console.WriteLine($"Task {taskNum} is running on thread {Thread.CurrentThread.ManagedThreadId}");
await RunAsyncFunction(taskNum);
}, CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
}
}
Console.WriteLine("All tasks completed.");
await Task.Delay(1000000);
}
static async Task RunAsyncFunction(int taskNum)
{
Console.WriteLine($"Task {taskNum} started on thread {Thread.CurrentThread.ManagedThreadId}");
var client = new HttpClient();
await client.SendAsync(new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"));
Console.WriteLine($"Task {taskNum} resumed on thread {Thread.CurrentThread.ManagedThreadId}");
}
}
public class CustomTaskScheduler : TaskScheduler, IDisposable
{
private readonly System.Collections.Concurrent.BlockingCollection<Task> taskQueue = new();
private readonly List<Thread> workerThreads = new();
private readonly CancellationTokenSource cts = new();
public CustomTaskScheduler(int workerCount)
{
for (int i = 0; i < workerCount; i++)
{
var thread = new Thread(WorkerLoop)
{
IsBackground = true
};
workerThreads.Add(thread);
thread.Start();
}
}
protected override IEnumerable<Task> GetScheduledTasks() => taskQueue.ToArray();
protected override void QueueTask(Task task)
{
if (cts.IsCancellationRequested)
throw new InvalidOperationException("Scheduler is shutting down.");
taskQueue.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return TryExecuteTask(task);
}
private void WorkerLoop()
{
try
{
foreach (var task in taskQueue.GetConsumingEnumerable(cts.Token))
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
TryExecuteTask(task);
}
}
catch (OperationCanceledException) when (cts.IsCancellationRequested) { }
}
public void Dispose()
{
cts.Cancel();
taskQueue.CompleteAdding();
foreach (var worker in workerThreads)
{
worker.Join();
}
taskQueue.Dispose();
cts.Dispose();
}
}
编辑:有趣的是,即使我在自己的线程中使用同步版本的 HttpClient Send(),代码也会挂起。所以它仍然在使用 ThreadPool。这是同步版本
ThreadPool.SetMaxThreads(12, 10000);
for (int i = 0; i < 20; i++)
{
_ = Task.Run(() =>
{
Thread.Sleep(100000);
});
}
var thread = new Thread(SendHttpRequest);
thread.IsBackground = true;
thread.Start();
Thread.Sleep(100000);
static void SendHttpRequest()
{
using (HttpClient client = new HttpClient())
using (HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, "https://kubernetes/healthz"))
{
var response = client.Send(request);
// Never goes here if ThreadPool is exhausted
Console.WriteLine(response.Content.ReadAsStringAsync().Result);
}
}
您可以控制代码在 之后运行的位置
await
,但无法控制内置HttpClient.SendAsync
API 完成的位置。据我所知,它是在 上完成的ThreadPool
,您对此无能为力(除了HttpClient
从头开始重新实现 )。因此,只要ThreadPool
已饱和,HttpClient.SendAsync
就不会完成,因为没有ThreadPool
可用的线程来完成它。当有可用的线程时,它将立即继续重新安排到您的CustomTaskScheduler
,这样该线程将有极小的工作要做,并且可以立即执行其他待处理的ThreadPool
工作。这是一件好事,但很可能不会对您的情况有所帮助,因为还有其他东西使您的 饱和ThreadPool
,并且很快也会窃取这个线程。关于完成的位置,没有什么
HttpClient.SendAsync
特别之处。大多数内置的 .NET 异步 API 在 上完成ThreadPool
,例如包括Task.Delay
方法。因此,最好的办法是确保 是ThreadPool
健康的,并且应用程序中没有代码会严重耗尽它。导致 饱和的一个可能原因ThreadPool
可能是未配置 的Parallel.ForEach
循环调用,尤其是当循环的 是阻塞集合 时。MaxDegreeOfParallelism
source
await
将通过 来安排延续运行SynchronizationContext.Current
,不一定TaskScheduler.Current
。许多异步操作将明确使用同步上下文,而不是当前任务调度程序,或者更糟糕的是,明确使用默认调度程序/同步上下文而不是当前上下文。如果您在使用自定义线程池运行操作时创建自己的同步上下文并将其设置为当前上下文(并且不要刻薄,在完成后设置以前的当前上下文),那么这些地方将不会使用线程池。还值得注意的是,在您的示例中,您在自定义任务调度程序之外有等待,它们都将使用线程池而不是您的自定义调度程序来安排它们的延续。
但即使您做到了这一切,您仍会遇到一个更根本的问题,即
ConfigureAwait(false)
在大多数情况下,库的最佳做法都是使用它们,因此它们会故意确保它们的延续在默认线程池线程中运行,而不是在您的自定义上下文中运行,因为它们认为它们不需要它。您只会在阻塞默认线程池并尝试解决这个问题时遇到麻烦。不要用长时间运行的工作阻塞线程池,使其无法按预期使用,而是让该工作使用自定义线程池(或仅使用
LongRunning
)Task.Run
,以便所有想要使用线程池的事情都可以。