我们有 Spring Boot 2.7 + Kafka,可以这样使用消息:
@KafkaListener(topics = "${kafka.topic.stuff}")
public void consume(@Payload String message) {
log.info("in kafka consumer");
// process event
}
但是,我们不知道这些是否正在同步处理(因此,如果我们花太长时间处理一条消息,它将延迟处理下一条消息)。
通常,我们可以从日志输出中看到它是否正在使用新线程进行处理,但由于某种原因,缺少以下信息:
2025-01-06T17:40:00,143Z INFO pool-2-thread-1 c.c.g.scheduler.SomemScheduler [correlationToken:ANP-06c003a1-d31c-42f6-9d2d-a9cdb5bfde96] => some event scheduler started..
2025-01-06T17:40:07,721Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 c.c.g.s.cdm.consumer.MyConsumer [] => in kafka consumer
2025-01-06T17:40:14,665Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 org.apache.kafka.clients.NetworkClient [] => [Consumer clientId=consumer-SWH-1, groupId=SWH] Node -1 disconnected.
在这里我们可以看到,我们的代码中记录的行显示了线程名称,但是 Kafka 监听器内部记录的行没有显示,所以我们看不到它是否正在创建新线程。
在我们的logback.xml中我们有:
<pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %thread %logger{42} [%X{correlationTokenKV}] => %msg%n</pattern>
问题是:
- Kafka 消费者是 aysnc 吗?
- 如果没有的话,我们该如何做呢?
- 为什么我们的标准日志格式没有被使用?
- 我们如何让 Kafka 使用我们的日志格式化程序,或者至少输出线程?
我们像这样包含 Kafka:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>