我将 Spring Kafka 添加到我的 Spring 应用程序中并期望它能够工作,但是我得到了
INFO [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.4.1
INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8a516edc2755df89
INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1694915576434
INFO --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node -1 disconnected.
WARN --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established.
我将其添加到我的pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.9</version>
</dependency>
并配置它
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic checkInTopic() {
return new NewTopic(checkInTopicName, 1, (short) 1);
}
其中bootstrapAddress
通过以下方式配置application.yml
:
spring:
kafka:
bootstrap-servers: localhost:9092
我还有一个生产者配置:
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
));
}
怎么了?我应该以某种方式在本地主机端口上手动运行 Kafka 吗9092
?我假设它是由 Spring 框架自动完成的,就像我不必将它部署在 tomcat 上一样,因为它是捆绑的。它启动 Kafka 服务还是我应该自己启动?
是的,您必须在 localhost port 上手动运行 Kafka
9092
。Spring Kafka 模块未与 Kafka 二进制文件打包。它仅包含对 Kafka 主题、生产者和消费者进行操作所需的 Kafka API。
另一方面,您需要在系统上下载并设置 Apache Kafka。
有两种运行 Kafka 服务的选项:使用旧版 Zookeeper 和更新的 KRaft。
KRaft模式比ZooKeeper模式更具可扩展性,允许Kafka集群处理更多流量和数据。KRaft 模式比 ZooKeeper 模式更快,从而实现更低的延迟和更高的吞吐量。
初始步骤是相同的。
bin/
kafka 目录。克拉夫特
步骤继续:
./gradlew jar -PscalaVersion=2.13.10
KAFKA_CLUSTER_ID="$(./kafka-storage.sh random-uuid)"
./kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c ../config/kraft/server.properties
log.dirs
指向其他有效目录而不是/tmp/kraft-combined-logs
../kafka-server-start.sh ../config/kraft/server.properties
对于 Windows,有一个文件夹
windows
,其中包含与 Linux/Mac shell 脚本相对应的批处理脚本。动物园管理员
步骤继续:
启动动物园管理员。
./zookeeper-server-start.sh ../config/zookeeper.properties
windows\zookeeper-server-start.bat ..\..\config\zookeeper.properties
编辑server.properties
listeners=PLAINTEXT://localhost:9092
以使代理能够在此套接字上侦听。log.dirs
指向其他有效目录而不是/tmp/kafka-logs
.启动 Kafka 代理。
./kafka-server-start.sh ../config/server.properties
windows\kafka-server-start.bat ..\..\config\server.properties
这就是初始设置的全部内容。
请访问此处,了解有关主题创建以及与 Kafka 相关的其他内容的更多信息。
现在,Spring 应用程序将能够连接到 Kafka。