Temos o Spring Boot 2.7 + Kafka e podemos consumir mensagens desta forma:
@KafkaListener(topics = "${kafka.topic.stuff}")
public void consume(@Payload String message) {
log.info("in kafka consumer");
// process event
}
No entanto, não sabemos se elas estão sendo processadas de forma síncrona (portanto, se demorarmos muito para processar uma mensagem, isso atrasará o processamento da próxima).
Normalmente, podemos ver se ele está processando usando novos threads na saída do log, mas, por algum motivo, esta informação está faltando:
2025-01-06T17:40:00,143Z INFO pool-2-thread-1 c.c.g.scheduler.SomemScheduler [correlationToken:ANP-06c003a1-d31c-42f6-9d2d-a9cdb5bfde96] => some event scheduler started..
2025-01-06T17:40:07,721Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 c.c.g.s.cdm.consumer.MyConsumer [] => in kafka consumer
2025-01-06T17:40:14,665Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 org.apache.kafka.clients.NetworkClient [] => [Consumer clientId=consumer-SWH-1, groupId=SWH] Node -1 disconnected.
Aqui podemos ver que as linhas registradas em nosso código mostram o nome do thread, mas aquelas registradas dentro do ouvinte do Kafka não, então não podemos ver se ele está criando novos threads ou não.
Em nosso logback.xml temos:
<pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %thread %logger{42} [%X{correlationTokenKV}] => %msg%n</pattern>
A questão é:
- O Kafka é um serviço de atendimento ao consumidor?
- Se não, como fazemos isso?
- Por que nosso formato de log padrão não está sendo usado?
- Como fazemos o Kafka usar nosso formatador de log ou, pelo menos, gerar a saída do thread?
Incluímos Kafka assim:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Suas suposições e observações estão corretas. A
@KafkaListener
implementação não é assíncrona. A lógica ali é processar uma partição no mesmo thread para preservar uma ordem para deslocamentos de registro.Você pode tornar isso assíncrono se usar
CompletableFuture<Void>
como tipo de retorno do seu@KafkaListener
.Veja mais informações nos documentos: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/async-returns.html
A resposta está no método send().
De acordo com os documentos oficiais:
Como você pode ver na assinatura, ela aceita um método de retorno de chamada e também retorna um objeto Future.
Ao chamar esse método, o método em si não se importa realmente com a configuração acks. Ele não bloqueará a chamada, mas deixará a decisão para o método de chamada retornando um objeto Future e aceitando um retorno de chamada também. Ele enviará mensagens para o buffer conforme recebe, e deixará o resto para esses dois, ou seja, Future e retorno de chamada.
No nível do buffer, os acks começam a ser honrados, mas isso é feito em paralelo e não bloqueia o chamador de envio.
Com acks=0, o produtor assumirá que a mensagem foi escrita conforme é enviada. (também conhecido como Dispare e esqueça)
Com acks=1, o produtor considerará a gravação bem-sucedida somente quando o líder receber o registro. Caso não haja confirmação, ele tentará novamente com base na sua configuração e usará o retorno de chamada adequadamente.
Com acks=all -> Isso altera apenas a parte do reconhecimento, ou seja, o produtor considerará a gravação bem-sucedida em todas as réplicas, pois receberá o reconhecimento com base em min.insync.replicas
Rest is as acks=1
.Com o futuro que você recebeu, você pode verificá-lo mais tarde e continuar enviando mensagens ou chamar o método get() - o que causaria um bloqueio.
Ou você pode usar um retorno de chamada para executar a ação conforme a confirmação for recebida.
Então
TLDR
;Se perguntas = 1 ou todas, um produtor Kafka precisa esperar pela resposta de confirmação do broker e não pode fazer nada?
Depende se você usará o
Future.get()
método imediatamente - o que bloquearia.Ou simplesmente ignore o retorno e delegue ações para um retorno de chamada