Tenho esta imagem do Kafka rodando em um contêiner:
# Compose service for a single Kafka broker (apache/kafka 3.7.1).
# Nesting restored: every key below belongs to the service-kafka entry.
#
# NOTE(review): no KAFKA_ADVERTISED_LISTENERS (or any other KAFKA_* variable)
# is set here, and the consumer log in this post shows the client being handed
# localhost:9092 as the coordinator address after the initial bootstrap.
# Clients connecting through the published port probably need the broker to
# advertise pepito:8484 instead - confirm against the apache/kafka image docs,
# which may require the full listener/KRaft variable set once one is overridden.
service-kafka:
  container_name: pepito-kafka-server
  # Hostname inside the compose network; the healthcheck below relies on it.
  hostname: pepito
  image: apache/kafka:3.7.1
  networks:
    - pepito-network
  ports:
    # host port 8484 -> container port 9092
    - 8484:9092
  extra_hosts:
    - "docker.internal:127.0.0.1"
  healthcheck:
    # Runs inside the container, so it targets the in-container port 9092.
    test: /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server pepito:9092 || exit 1
    interval: 1s
    timeout: 60s
    retries: 60
Como você pode ver, tenho uma porta personalizada para o Kafka.
Agora, no meu aplicativo Spring Boot, estou tentando usá-la assim:
/**
 * Spring configuration that registers the {@link KafkaAdmin} bean.
 */
@Configuration
public class KafkaTopicConfig {

    /** Value of the {@code spring.kafka.bootstrap-servers} property. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    /**
     * Creates a {@link KafkaAdmin} whose only explicit setting is the bootstrap server list.
     *
     * @return an admin configured with the injected bootstrap address
     */
    @Bean
    KafkaAdmin kafkaAdmin() {
        Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(adminProps);
    }
}
e,
/**
 * Spring configuration for the String/String Kafka producer and its template.
 */
@Configuration
public class KafkaProducerConfig {

    /** Value of the {@code spring.kafka.bootstrap-servers} property. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    /**
     * Builds the producer factory with the injected bootstrap address and
     * {@link StringSerializer} for both keys and values.
     *
     * @return producer factory for String keys and String values
     */
    @Bean
    ProducerFactory<String, String> producerFactory() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Exposes a {@link KafkaTemplate} backed by {@link #producerFactory()}.
     *
     * @return template for sending String messages
     */
    @Bean
    KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
e,
/**
 * Spring configuration for the String/String Kafka consumer, its listener
 * container factory, and the message listener itself.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Value of the {@code pepito.group-id} property. */
    @Value("${pepito.group-id}")
    private String groupId;

    /** Value of the {@code spring.kafka.bootstrap-servers} property. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    /**
     * Builds the consumer factory with the injected bootstrap address and group id,
     * using {@link StringDeserializer} for both keys and values.
     *
     * @return consumer factory for String keys and String values
     */
    @Bean
    ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    /**
     * Creates the listener container factory and wires it to {@link #consumerFactory()}.
     *
     * @return container factory used by {@code @KafkaListener} methods
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }

    /**
     * Listener for the topic named by {@code spring.application.name}; prints each
     * received message together with the configured group id.
     *
     * @param message the received record value
     */
    @KafkaListener(topics = "${spring.application.name}", groupId = "${pepito.group-id}")
    public void listenGroupFoo(String message) {
        System.out.println("Received Message in group " + groupId + ": " + message);
    }
}
Tenho um produtor e um consumidor porque a ideia deste aplicativo é enviar, receber e processar mensagens.
Agora estou recebendo estes erros:
org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Cluster ID: 5L6g3nShT-eMCtK--X85sw
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] (Re-)joining group
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 2147483646 disconnected.
2025-04-21T15:43:36.068-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: null. isDisconnected: true. Rediscovery will be attempted.
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Request joining group due to: rebalance failed due to 'null' (DisconnectException)
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.087-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.199-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.302-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.548-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:36.998-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
2025-04-21T15:43:37.800-06:00  WARN 4088 --- [pepito-app] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
Ele está tentando se conectar a 127.0.0.1:9092, o host/porta padrão do Spring Boot:
Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
e eu tenho em meu application.properties
# Nesting restored so the key resolves as spring.kafka.bootstrap-servers,
# the placeholder read by the @Value fields in the configuration classes.
# NOTE(review): this is YAML syntax; it would normally live in
# application.yml / application.yaml rather than application.properties - confirm the file name.
spring:
  kafka:
    bootstrap-servers: "pepito:8484" # in my hosts pepito is pointing to where docker is running
Onde mais na minha configuração devo indicar a conexão SOMENTE ao pepito:8484, e nenhum outro IP/porta? Além disso, não alterei nada na minha imagem de contêiner do Kafka. Preciso definir algo lá? Qualquer ajuda é bem-vinda.