有一个Spring Integration
由多个组成的应用程序integration flows
,并且该应用程序被部署为容器Kubernetes
。容器根据负载自动扩展。
在这些多个集成流中,有一个这样的流会重试来自 DB 的失败记录,并且在任何时间点都只能运行它的一个实例(类似于Quartz
中的集成Spring Batch
)。
这种行为如何实现Spring Integration
?
有一个Spring Integration
由多个组成的应用程序integration flows
,并且该应用程序被部署为容器Kubernetes
。容器根据负载自动扩展。
在这些多个集成流中,有一个这样的流会重试来自 DB 的失败记录,并且在任何时间点都只能运行它的一个实例(类似于Quartz
中的集成Spring Batch
)。
这种行为如何实现Spring Integration
?
用例是使用 Spring Integration 的 Kafka 延迟重新处理下游失败消息ConcurrentMessageListenerContainer
。假设最大重试次数应为 2,固定延迟为 5 分钟。
Spring Integration 框架中是否有现成的解决方案?
已经过去了DeadLetterPublishingRecoverer
,它只有助于转移到 DLT,除非我们用轮询器监听同一个 DLT,否则不会进行进一步处理。
此问题是 SO 问题的扩展,Spring Integration 将 ReactorContext 恢复为命令式处理
通过 WebFlux 调用后抛出的任何异常WebFluxRequestExecutingMessageHandler
都缺少Trace
信息。
描述该问题的示例可在https://github.com/syedyusufh/simple-handle-reactive.git上找到
这就是正在发生的事情。
WebFluxRequestExecutingMessageHandler
首先抛出的这个异常会通过上下文丢失的地方自动记录例外:
2m2024-06-24T20:27:02.445+04:00 INFO 1436 --- [ scheduling-1] [66799e5608c967d694f450b82077c38c-fadc3f15d16b6b11] c.i.sample.config.TracingConfig : Request Headers: [Content-Type:"application/json", traceparent:"00-66799e5608c967d694f450b82077c38c-fadc3f15d16b6b11-00"]
2024-06-24T20:27:03.997+04:00 INFO 1436 --- [ctor-http-nio-3] [66799e5608c967d694f450b82077c38c-fadc3f15d16b6b11] c.i.sample.config.TracingConfig : Response Headers: [Access-Control-Allow-Origin:"*", Alt-Svc:"h3=":443"; ma=2592000", Content-Type:"application/json", Date:"Mon, 24 Jun 2024 16:27:05 GMT", Server:"Caddy", Vary:"Accept-Encoding", Transfer-Encoding:"chunked"]
2024-06-24T20:27:04.089+04:00 ERROR 1436 --- [oundedElastic-1] [ ] .o.WebFluxRequestExecutingMessageHandler : Failed to send async reply: org.springframework.integration.support.MessageBuilder@5e1691b0