我们目前正在开发一个 Spring Integration 流程,其中每条传入消息都需要触发对外部 HTTP 服务的调用。外部调用最多可能需要 20 秒才能完成,并且响应可能非常大——需要将其完全保存在内存中以便进一步处理。
为了提高可扩展性并减少阻塞的平台线程数量,我们希望利用 Java 的虚拟线程(Project Loom,Java 21+)。我们目前的方法是:
Executors.newVirtualThreadPerTaskExecutor()
作为集成流程中通道的执行者:
IntegrationFlow.from(adapter).channel(c -> c.executor(virtualThreadExecutor)).handle(processingService)
当消息到达时,我们已经可以在 processingService 中验证它们是由虚拟线程处理的。
我们想知道的是:
这是在 Spring Integration 中使用虚拟线程的正确方法吗?或者是否有更好的方法将虚拟线程集成到这种流程中?
在这种情况下使用虚拟线程时我们应该注意哪些潜在的陷阱、限制或性能问题?
Spring Integration 中是否存在某些特定组件(例如网关、服务激活器等)可能还不支持虚拟线程?
为了避免内存耗尽,我们考虑过限制并发外部请求的数量。有什么最佳实践吗?
我们非常期待大家分享最佳实践或实际经验。提前致谢!
我们目前正在使用 SpringBoot 3.4.4 和 Java 21。
是的,就是这样。只要你能注入一个
Executor
,它就可以用来做虚拟线程。这是一个关于虚拟线程本质的更普遍的问题,与 Spring 无关。因此,如果在调用过程中某个地方固定了平台线程,从 Spring 的角度来看,我们无能为力。我认为这篇文章很好地解释了哪些问题以及可能发生在哪里:https://medium.com/@phil_3582/java-virtual-threads-some-early-gotchas-to-look-out-for-f65df1bad0db
Spring 产品组合中的所有代码都经过了优化,以避免
synchronized
出现阻塞。我们保持依赖项的更新,但这并不意味着我们依赖的某些库对虚拟线程友好。例如,Apache Kafka(即使是当前版本)也为每个消费者实例4.0.0
使用常规的for 任务。请参阅及其用法。Thread
ApplicationEventHandler
AsyncKafkaConsumer
这可能就是我们所说的
rate limiting
。更多信息请参阅文档:https ://docs.spring.io/spring-integration/reference/handler-advice/classes.html#rate-limiter-advice