用例是使用 Spring Integration 的 Kafka 延迟重新处理下游失败消息ConcurrentMessageListenerContainer
。假设最大重试次数应为 2,固定延迟为 5 分钟。
Spring Integration 框架中是否有现成的解决方案?
已经过去了DeadLetterPublishingRecoverer
,它只有助于转移到 DLT,除非我们用轮询器监听同一个 DLT,否则不会进行进一步处理。
用例是使用 Spring Integration 的 Kafka 延迟重新处理下游失败消息ConcurrentMessageListenerContainer
。假设最大重试次数应为 2,固定延迟为 5 分钟。
Spring Integration 框架中是否有现成的解决方案?
已经过去了DeadLetterPublishingRecoverer
,它只有助于转移到 DLT,除非我们用轮询器监听同一个 DLT,否则不会进行进一步处理。
如果您研究一下 Spring Integration 解决方案,那么
KafkaMessageDrivenChannelAdapter
使用上述内容ConcurrentMessageListenerContainer
可以配置RetryTemplate
:Spring Integration 的另一种方法是在处理下游消息的位置进行重试。为此,可以使用以下方式配置特定端点
RequestHandlerRetryAdvice
:https ://docs.spring.io/spring-integration/reference/handler-advice/classes.html#retry-advice更新
不,没有办法
@RetryableTopic
在 Spring Integration 中应用KafkaMessageDrivenChannelAdapter
。问题是这种非阻塞重试正是为声明式、基于注释的 POJO 设计的。其中通道适配器针对定期MessageListener
注入目标侦听器容器进行了优化。我建议改为研究
@KafkaListener
一下@RetryableTopic
,然后使用@MessagingGateway
它从该侦听器方法调用 Spring Integration 流:https://docs.spring.io/spring-integration/reference/gateway.html。您甚至可能只是期望Message<?>
在您的中有一个@KafkaListener
并将其发送到注入的MessageChannel
。