O caso de uso é reprocessar as mensagens com falha downstream com um atraso usando o Kafka do Spring Integration ConcurrentMessageListenerContainer
. Digamos que o máximo de tentativas de repetição deve ser 2 com um atraso fixo de 5 minutos.
Existe uma solução pronta para uso disponível no framework Spring Integration?
Já revisei DeadLetterPublishingRecoverer
, isso ajuda apenas na mudança para um DLT, sem processamento adicional, a menos que ouçamos o mesmo DLT com um poller.
Se você analisar uma solução de integração Spring, então
KafkaMessageDrivenChannelAdapter
qual usa o mencionadoConcurrentMessageListenerContainer
poderia ser configurado comRetryTemplate
:Outra maneira com Spring Integration é tentar novamente exatamente no lugar onde você manipula a mensagem downstream. Para isso, um endpoint específico pode ser configurado com um
RequestHandlerRetryAdvice
: https://docs.spring.io/spring-integration/reference/handler-advice/classes.html#retry-adviceATUALIZAR
Não, não há como aplicar o
@RetryableTopic
no Spring Integration'sKafkaMessageDrivenChannelAdapter
. O problema é que essa nova tentativa não bloqueante é projetada exatamente para POJOs declarativos baseados em anotações. Onde o adaptador de canal é otimizado paraMessageListener
injeção regular no contêiner do listener de destino.Eu sugiro olhar para um
@KafkaListener
com isso@RetryableTopic
em vez disso e usar@MessagingGateway
para chamar o fluxo de integração do Spring a partir desse método ouvinte: https://docs.spring.io/spring-integration/reference/gateway.html . Você pode até mesmo esperar umMessage<?>
em seu@KafkaListener
e enviá-lo para umMessageChannel
.