Estou usando o spring boot 2.7 + kafka 3.0.
Eu tenho um bean na minha configuração conforme abaixo para definir o KafkaListenerEndpointRegistry.
Neste bean, preciso cadastrar meu listener com o método registerListenerContainer.
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry(Map<String, RetryConfig> retryConfigMap) {
return new KafkaListenerEndpointRegistry() {
@Override
public void registerListenerContainer(
KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
// register original one
MethodKafkaListenerEndpoint<Object, Object> methodEndpoint = (MethodKafkaListenerEndpoint<Object, Object>) endpoint;
String originalTopic = new ArrayList<String>(endpoint.getTopics()).get(0);
super.registerListenerContainer(methodEndpoint, factory);
...
}
};
}
O método registerListenerContainer tem uma propriedade startImmediately que por padrão é false, devo defini-la como true, do documento diz:
O sinalizador startImmediately determina se o contêiner deve ser iniciado imediatamente.
Mas isso realmente não está claro para mim, qual é a diferença da configuração diferente? e que impacto teria se definido como falso?
Se você defini-lo como
false
, precisará iniciar um contêiner para esse terminal manualmente mais tarde. UseKafkaListenerEndpointRegistry.getListenerContainer(String id)
a API para chamar seu arquivostart()
. Caso contrário, ele é iniciado automaticamente pelo ciclo de vida do ApplicationContext: ou após toda a atualização ou imediatamente se você registrar esse endpoint em tempo de execução.Não está claro, porém, por que alguém faria essa
KafkaListenerEndpointRegistry
substituição se, na maioria dos casos, isso@KafkaListener
é suficiente. A repetição do tópico pode ser tratada pelo arquivoRetryableTopic
.