我们有 Spring Boot 2.7 + Kafka,可以这样使用消息:
@KafkaListener(topics = "${kafka.topic.stuff}")
public void consume(@Payload String message) {
log.info("in kafka consumer");
// process event
}
但是,我们不知道这些是否正在同步处理(因此,如果我们花太长时间处理一条消息,它将延迟处理下一条消息)。
通常,我们可以从日志输出中看到它是否正在使用新线程进行处理,但由于某种原因,缺少以下信息:
2025-01-06T17:40:00,143Z INFO pool-2-thread-1 c.c.g.scheduler.SomemScheduler [correlationToken:ANP-06c003a1-d31c-42f6-9d2d-a9cdb5bfde96] => some event scheduler started..
2025-01-06T17:40:07,721Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 c.c.g.s.cdm.consumer.MyConsumer [] => in kafka consumer
2025-01-06T17:40:14,665Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 org.apache.kafka.clients.NetworkClient [] => [Consumer clientId=consumer-SWH-1, groupId=SWH] Node -1 disconnected.
在这里我们可以看到,我们的代码中记录的行显示了线程名称,但是 Kafka 监听器内部记录的行没有显示,所以我们看不到它是否正在创建新线程。
在我们的logback.xml中我们有:
<pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %thread %logger{42} [%X{correlationTokenKV}] => %msg%n</pattern>
问题是:
- Kafka 消费者是 aysnc 吗?
- 如果没有的话,我们该如何做呢?
- 为什么我们的标准日志格式没有被使用?
- 我们如何让 Kafka 使用我们的日志格式化程序,或者至少输出线程?
我们像这样包含 Kafka:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
您的假设和观察是正确的。
@KafkaListener
实现不是异步的。那里的逻辑是在同一线程中处理一个分区以保留记录偏移的顺序。CompletableFuture<Void>
如果将其用作返回类型,则可以使其异步@KafkaListener
。在文档中查看更多信息:https ://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/async-returns.html
答案在于 send() 方法。
根据官方文档:
正如您所看到的签名 - 它接受回调方法并返回 Future 对象。
调用此方法时,方法本身并不真正关心 acks 配置。它不会阻止调用,而是通过返回 Future 对象并接受回调将决定权留给调用方法。它会在收到消息时将其推送到缓冲区,其余部分留给这两个,即 Future 和回调。
在缓冲区级别,确认开始得到尊重,但这是并行完成的并且不会阻塞发送调用者。
当 acks=0 时,生产者将假定消息在发送时就已经写入。(又称为“发射后不管”)
当 acks=1 时,仅当领导者收到记录时,生产者才会认为写入成功,如果没有确认 - 它将根据您的配置重试并相应地使用回调。
使用 acks=all -> 这只会改变确认部分 - 即生产者将认为写入成功已成功写入所有副本,因为它将根据 min.insync.replicas 接收确认
Rest is as acks=1
。使用收到的未来,您可以稍后检查并继续发送消息或调用 get() 方法 - 这将导致阻塞。
或者您可以使用回调在收到确认时执行操作。
所以
TLDR
;如果 asks = 1 或 all,Kafka 生产者是否需要等待来自 broker 的 ack 响应而不能做任何事情?
取决于您是否立即使用该
Future.get()
方法 - 该方法将会阻塞。或者直接忽略返回并将操作委托给回调