Temos o Spring Boot 2.7 + Kafka e podemos consumir mensagens desta forma:
@KafkaListener(topics = "${kafka.topic.stuff}")
public void consume(@Payload String message) {
log.info("in kafka consumer");
// process event
}
No entanto, não sabemos se elas estão sendo processadas de forma síncrona (portanto, se demorarmos muito para processar uma mensagem, isso atrasará o processamento da próxima).
Normalmente, podemos ver se ele está processando usando novos threads na saída do log, mas, por algum motivo, esta informação está faltando:
2025-01-06T17:40:00,143Z INFO pool-2-thread-1 c.c.g.scheduler.SomemScheduler [correlationToken:ANP-06c003a1-d31c-42f6-9d2d-a9cdb5bfde96] => some event scheduler started..
2025-01-06T17:40:07,721Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 c.c.g.s.cdm.consumer.MyConsumer [] => in kafka consumer
2025-01-06T17:40:14,665Z INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 org.apache.kafka.clients.NetworkClient [] => [Consumer clientId=consumer-SWH-1, groupId=SWH] Node -1 disconnected.
Aqui podemos ver que as linhas registradas em nosso código mostram o nome do thread, mas aquelas registradas dentro do ouvinte do Kafka não, então não podemos ver se ele está criando novos threads ou não.
Em nosso logback.xml temos:
<pattern>%date{"yyyy-MM-dd'T'HH:mm:ss,SSSXXX", UTC} %-5level %thread %logger{42} [%X{correlationTokenKV}] => %msg%n</pattern>
A questão é:
- O Kafka é um serviço de atendimento ao consumidor?
- Se não, como fazemos isso?
- Por que nosso formato de log padrão não está sendo usado?
- Como fazemos o Kafka usar nosso formatador de log ou, pelo menos, gerar a saída do thread?
Incluímos Kafka assim:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>