我是 Apache Flink 新手。我尝试使用 Flink 中的 KafkaSource 从 Apache Kafka 获取事件。到目前为止一切顺利,似乎运行良好。重新启动 flink 任务后,我再次收到相同的消息,尽管我已设置 GroupId。
下面是我从 Kafka 读取的代码片段:
KafkaSource<BestellungEvent> bestellungEventSource = KafkaSource.<BestellungEvent>builder()
.setBootstrapServers("localhost:9092")
.setTopics("bestellungen")
.setGroupId("bla2")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new BestellungEventKeyValueDeserializationSchema()))
.build();
有人能帮我解决这个问题吗?
此致
托马斯
您正在使用
.setStartingOffsets(OffsetsInitializer.earliest())
,从最早的偏移量开始。您应该使用.setStartingOffsets(OffsetsInitializer.committedOffsets())
从组的最后一个偏移量开始消费。您可以在文档中阅读有关各种选项的更多信息(我不确定我是否链接到正确版本的文档,因为您没有提到您正在使用哪个版本的 Flink Kafka Connector。)如果这个答案没有帮助,请确保您的消费者在消费来自 Kafka 的消息后提交偏移量。