我使用的是 spring boot 2.7 + kafka 3.0。
我的配置中有一个如下的 bean 来定义 KafkaListenerEndpointRegistry。
在此 bean 中,我需要使用方法 registerListenerContainer 注册我的侦听器。
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry(Map<String, RetryConfig> retryConfigMap) {
return new KafkaListenerEndpointRegistry() {
@Override
public void registerListenerContainer(
KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
// register original one
MethodKafkaListenerEndpoint<Object, Object> methodEndpoint = (MethodKafkaListenerEndpoint<Object, Object>) endpoint;
String originalTopic = new ArrayList<String>(endpoint.getTopics()).get(0);
super.registerListenerContainer(methodEndpoint, factory);
...
}
};
}
registerListenerContainer 方法有一个属性 startImmediately,默认情况下为 false,我应该将其设置为 true,从文档中可以看出:
startImmediately 标志确定容器是否应立即启动。
但这对我来说真的不清楚,不同的设置有什么区别?如果设置为 false 会有什么影响?
如果将其设置为
false
,则必须稍后手动启动该端点的容器。使用KafkaListenerEndpointRegistry.getListenerContainer(String id)
API 来调用其start()
. 否则,它会由 ApplicationContext 生命周期自动启动:或者在整个刷新之后,或者如果您在运行时注册该端点,则立即启动。KafkaListenerEndpointRegistry
但尚不清楚,如果在大多数情况下这就@KafkaListener
足够了,为什么要这样做呢?主题重试可以由 来处理RetryableTopic
。