我想使用它KinesisMessageDrivenChannelAdapter
从 Kinesis 流中读取记录。首次启动消费者应用程序时,我希望它接收流中已存在的所有记录。但在后续启动时,应用程序应继续从来自 DynamoDb 的最新检查点序列号读取。
我的假设是否正确,adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon())
导致了这种行为?
我想使用它KinesisMessageDrivenChannelAdapter
从 Kinesis 流中读取记录。首次启动消费者应用程序时,我希望它接收流中已存在的所有记录。但在后续启动时,应用程序应继续从来自 DynamoDb 的最新检查点序列号读取。
我的假设是否正确,adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon())
导致了这种行为?
这仅适用于组中的新消费者。如果此消费者组和该分片已有检查点,则我们这样做:
因此,它将从存储的检查点开始消耗。
如果您想使用它
trimHorizon
,请拨打KinesisMessageDrivenChannelAdapter.resetCheckpoints()
。