当 Kafka 作为数据源时,我对 Apache Flink 如何处理提交偏移有点困惑。
我已经配置了消费者组,可以正常消费消息并提交消息(因为我在 flink 上启用了检查点)。但是当我在 k8s 上重新启动部署时,即使消费者的 LAG 为零,所有消息也会再次被消费。似乎 flink 只是忽略了消费者提交偏移量。
我读了一些相关内容,似乎需要在某处存储状态,但我不明白如果我已经提交了偏移量,为什么还需要它。
当 Kafka 作为数据源时,我对 Apache Flink 如何处理提交偏移有点困惑。
我已经配置了消费者组,可以正常消费消息并提交消息(因为我在 flink 上启用了检查点)。但是当我在 k8s 上重新启动部署时,即使消费者的 LAG 为零,所有消息也会再次被消费。似乎 flink 只是忽略了消费者提交偏移量。
我读了一些相关内容,似乎需要在某处存储状态,但我不明白如果我已经提交了偏移量,为什么还需要它。
我想在 docker-compose 中启动时创建和配置 kafka 主题。我可以借助附加服务来完成此操作(请参阅 init-kafka)。
是否可以在不使用额外服务的情况下创建 Kafka 主题?我尝试command: kafka-topics.sh --create --bootstrap-server kafka:9092 --topic tasks_topic --partitions 3
在 Kafka 服务中使用,但没有成功
该代码对我有用:
version: "3.8"
services:
zookeeper:
image: bitnami/zookeeper:latest
ports:
- 2181:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: bitnami/kafka:latest
ports:
- 9092:9092
- 9093:9093
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CLIENT://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,CLIENT://localhost:9093
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
init-kafka:
image: bitnami/kafka:latest
command: kafka-topics.sh --create --bootstrap-server kafka:9092 --topic tasks_topic --partitions 3
depends_on:
- kafka
更新:我尝试了这个代码(添加了KAFKA_CREATE_TOPICS)但它没有帮助
kafka:
image: bitnami/kafka:latest
ports:
- 9092:9092
- 9093:9093
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CLIENT://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,CLIENT://localhost:9093
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CREATE_TOPICS="my_tasks_topic1:5:1"
depends_on:
- zookeeper
我正在测试一个程序,以验证压缩是否有助于减少主题消息的大小。我的示例主题配置为“max.message.bytes=1024000”,大约为 1MB,在生产者端配置中,我将相同的值设置为“max.request.size”,然后我尝试发送一个大小为 1573015 的字符串,大约为 1.5MB,这引发了低于预期的错误。
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1573015 bytes when
serialized which is larger than 1048576, which is the value of the max.request.size configuration.
接下来,由于我希望在生产者级别负责压缩,我将生产者的压缩配置设置为“zstd”(我也尝试过 gzip),但生产者抛出了相同的错误。我期望压缩配置在发送消息之前将生产者的消息大小减小到 <1MB。
当我在主题级别或生产者级别测试“compression.type”或在主题和生产者上设置compression.type属性时,我也观察到了相同的行为(我想避免在代理级别设置此属性,因为我希望它只对特定主题或该主题的生产者生效)。
我想了解 compression.type 是否真的减少了从生产者发送到 Kafka 代理的消息大小,代理会解压并验证未压缩消息的大小并引发此错误? 或者是因为生产者可能存在配置错误,导致压缩首先没有发生?
如果有人可以阐明与 compression.type 有关的属性 max.request.size 的内部工作原理,我将不胜感激。
使用独立程序,我确实验证了我用于此测试的消息样本可以使用 gzip 和 zstd 压缩到 <1MB。我用于此测试的 kafka 版本是 Confluent Kafka Platform 8.0,它在 Ubuntu WSL 本地的单节点集群上运行。
我正在尝试使用 KRaft 创建一个包含 3 个代理和 3 个控制器的集群。但是每次一个代理发生故障,而之前发生故障的代理又恢复运行。
这是我的 docker 配置:
version: "3.7"
services:
controller-1:
image: confluentinc/cp-kafka:latest
hostname: controller-1
container_name: controller-1
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
CLUSTER_ID: KixvqJ76Qn-xLPPrSfBQSw
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER # Set your generated UUID her
volumes:
- controller_1_data:/var/lib/kafka/data
user: 1000:1000
networks:
- kafka-connector
restart: always
controller-2:
image: confluentinc/cp-kafka:latest
hostname: controller-2
container_name: controller-2
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: controller
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
CLUSTER_ID: KixvqJ76Qn-xLPPrSfBQSw
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
volumes:
- controller_2_data:/var/lib/kafka/data
networks:
- kafka-connector
restart: always
controller-3:
image: confluentinc/cp-kafka:latest
hostname: controller-3
container_name: controller-3
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: controller
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
CLUSTER_ID: KixvqJ76Qn-xLPPrSfBQSw
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
volumes:
- controller_3_data:/var/lib/kafka/data
networks:
- kafka-connector
restart: always
kafka-1:
image: confluentinc/cp-kafka:latest
hostname: kafka-1
container_name: kafka-1
ports:
- 29092:29092
- 9092:9092
environment:
KAFKA_BROKER_ID: 11
KAFKA_NODE_ID: 11
KAFKA_PROCESS_ROLES: broker
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.18.2:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: "2@controller-2:9093,3@controller-3:9093"
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
CLUSTER_ID: KixvqJ76Qn-xLPPrSfBQSw
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
volumes:
- kafka_1_data:/var/lib/kafka/data
networks:
- kafka-connector
restart: always
kafka-2:
image: confluentinc/cp-kafka:latest
hostname: kafka-2
container_name: kafka-2
ports:
- 29093:29092
- 9094:9092
environment:
KAFKA_BROKER_ID: 12
KAFKA_NODE_ID: 12
KAFKA_PROCESS_ROLES: broker
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.18.2:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,3@controller-3:9093"
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
CLUSTER_ID: KixvqJ76Qn-xLPPrSfBQSw
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
volumes:
- kafka_2_data:/var/lib/kafka/data
networks:
- kafka-connector
restart: always
kafka-3:
image: confluentinc/cp-kafka:latest
hostname: kafka-3
container_name: kafka-3
ports:
- 29094:29092
- 9096:9092
environment:
KAFKA_BROKER_ID: 13
KAFKA_NODE_ID: 13
KAFKA_PROCESS_ROLES: broker
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.18.2:9096
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093"
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
CLUSTER_ID: KixvqJ76Qn-xLPPrSfBQSw
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
volumes:
- kafka_3_data:/var/lib/kafka/data
networks:
- kafka-connector
restart: always
debezium:
image: debezium/connect:2.4
depends_on:
- kafka-1
- kafka-2
- kafka-3
environment:
- BOOTSTRAP_SERVERS=kafka-1:29092,kafka-2:29093,kafka-3:29094
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
ports:
- 8083:8083
networks:
- kafka-connector
restart: always
logging:
driver: "json-file"
options:
max-size: "1g"
max-file: "5"
kafdrop:
image: obsidiandynamics/kafdrop
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- 9000:9000
environment:
KAFKA_BROKER_CONNECT: kafka-1:9092,kafka-2:9094,kafka-3:9096
networks:
- kafka-connector
restart: always
logging:
driver: "json-file"
options:
max-size: "1g"
max-file: "5"
volumes:
controller_1_data: ~
controller_2_data: ~
controller_3_data: ~
kafka_1_data: ~
kafka_2_data: ~
kafka_3_data: ~
networks:
kafka-connector:
driver: bridge
目前不介意 Debezium。我只是试图创建集群并生成一些消息。
我从 broker-3 收到的错误是这样的:
[2024-10-18 11:39:12,347] INFO [MetadataLoader id=13] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
[2024-10-18 11:39:12,420] INFO [RaftManager id=13] Registered the listener org.apache.kafka.image.loader.MetadataLoader@1574972196 (org.apache.kafka.raft.KafkaRaftClient)
[2024-10-18 11:39:12,455] INFO [MetadataLoader id=13] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
[2024-10-18 11:39:12,559] INFO [MetadataLoader id=13] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
[2024-10-18 11:39:12,695] INFO [MetadataLoader id=13] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
[2024-10-18 11:39:12,801] INFO [MetadataLoader id=13] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
[2024-10-18 11:39:12,901] INFO [MetadataLoader id=13] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
[2024-10-18 11:39:13,002] INFO [MetadataLoader id=13] initializeNewPublishers: the loader is still catching up because we still don't know the high water mark yet. (org.apache.kafka.image.loader.MetadataLoader)
[2024-10-18 11:39:13,063] ERROR Encountered fatal fault: Unexpected error in raft IO thread (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.IllegalStateException: Cannot transition to Follower with leaderId=3 and epoch=80 since it is not one of the voters [1, 2]
at org.apache.kafka.raft.QuorumState.transitionToFollower(QuorumState.java:381)
at org.apache.kafka.raft.KafkaRaftClient.transitionToFollower(KafkaRaftClient.java:518)
at org.apache.kafka.raft.KafkaRaftClient.maybeTransition(KafkaRaftClient.java:1523)
at org.apache.kafka.raft.KafkaRaftClient.maybeHandleCommonResponse(KafkaRaftClient.java:1473)
at org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1071)
at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1550)
at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1676)
at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2251)
at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:64)
at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
如果 broker-2 关闭,那么唯一的区别就在于这部分:
[2024-10-18 11:43:54,404] ERROR Encountered fatal fault: Unexpected error in raft IO thread (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
java.lang.IllegalStateException: Cannot transition to Follower with leaderId=2 and epoch=123 since it is not one of the voters [1, 3]
我正在使用 Kafka,其主题有 4 个分区。Kafka 中消息的保留期 (TTL) 默认设置为 7 天。我正在运行一个非流式批处理作业来处理来自 Kafka 的数据,并在每次处理运行后手动存储 Kafka 偏移量。
以下是经过几天的处理后保存的偏移量的一个例子:
第 1 天(偏移量已保存):
{
"0": 100,
"1": 110,
"2": 90,
"3": 123
}
第 6 天(偏移量已保存):
{
"0": 20000,
"1": 21000,
"2": 11000,
"3": 17003
}
到第 7 天,Kafka 的保留策略将启动,所有超过 7 天的消息将被自动删除。
我的担忧:
当第 7 天之后新的数据被发送到 Kafka,并且旧消息被删除时,我想知道偏移量会发生什么。
我存储的最后处理的偏移量大约是 20000,我想确保第二天从偏移量 20001 开始读取将允许我正确读取新生成的消息,而不会遇到任何问题(例如 Kafka 重用旧偏移量)。
我计划写一篇关于根据条件搜索消息的博客。我觉得这个领域缺少工具/框架,而这对任何 Kafka 运营团队/开发团队来说都是一项常规活动。
这是我在 UI 中研究的第一个选项。大多数基于 UI 的 kafka 工具无法很好地搜索大型主题,至少我见过的是这样。
然后,如果我们可以使用基于 CLI 的工具,例如 kcat 或 kafka-*-consumer,它们可以在一定程度上扩展,但缺乏广泛的搜索功能。
这些促使我开始研究使用 kafka 连接器并添加过滤器 SMT 或可能使用 KSQL。或者用自己喜欢的语言编写完全原生的开发。
当然,我们可以将消息转储到存储桶或其他地方,然后在此基础上进行搜索。
我读到过 Conduktor 提供了一些使用 SQL 搜索的功能,但不确定它有多好?
向社区提问 - 您使用什么来搜索 Kafka 中的消息?我上面提到的任何一种工具...或者更好的工具。
任务是向 Kafka 主题发送具有具体价值的消息。
例如,我想以纯文本形式发送“someValue”。
为此我应该使用 PublishKafkaRecord_2_6 中的哪个 Kafka 属性?
例如,我将“FreeFormTextRecordSetWriter”视为一种服务,它将 RecordSet 的内容写入自由格式的文本。
但是在 PublishKafkaRecord_2_6 中我可以设置“someValue”以将其发布在目标主题中。
我清楚地了解将分区拆分为段的算法以及保留和清理策略。但仍然不清楚为什么 kafka 不在每个分区的一个文件中进行写入\读取?
我是 Apache Flink 新手。我尝试使用 Flink 中的 KafkaSource 从 Apache Kafka 获取事件。到目前为止一切顺利,似乎运行良好。重新启动 flink 任务后,我再次收到相同的消息,尽管我已设置 GroupId。
下面是我从 Kafka 读取的代码片段:
KafkaSource<BestellungEvent> bestellungEventSource = KafkaSource.<BestellungEvent>builder()
.setBootstrapServers("localhost:9092")
.setTopics("bestellungen")
.setGroupId("bla2")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new BestellungEventKeyValueDeserializationSchema()))
.build();
有人能帮我解决这个问题吗?
此致
托马斯
出于测试目的,我删除了目标主题,并预计应用程序会在一段时间后超时。然而,经过一些研究,我了解到 Kafka Streams 默认会重试消息,直到目标主题再次上线,并且为 Kafka Producer 设置 StreamsConfig 下的配置(task.timeout.ms、request.timeout.ms 和 retries = 0)。
任何人都可以向我保证这个结论是正确的吗?如果不是,您可以就我需要的配置提出建议,以强制 Kafka Streams 一段时间后停止重试。