Eu tenho um aplicativo de console (o consumidor). Quero consumir as mesmas mensagens novamente quando executar novamente meu aplicativo. Eu configurei: EnableAutoCommit = false mas cada vez que executo novamente meu aplicativo leva quase 1 minuto para consumir mensagens, por que tanto tempo?
using Confluent.Kafka;
var config = new ConsumerConfig
{
BootstrapServers = "...",
SaslUsername = "....",
SaslPassword = "....",
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
GroupId = "kafka-dotnet-getting-started",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false
};
const string topic = "purchases";
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
Console.WriteLine($"Application start time: {DateTime.Now}");
using (var consumer = new ConsumerBuilder<string, string>(config).Build())
{
consumer.Subscribe(topic);
try
{
while (true)
{
var cr = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed at {DateTime.Now} event from topic {topic}: key = {cr.Message.Key,-10} value = {cr.Message.Value}");
}
}
catch (OperationCanceledException)
{
// Ctrl-C was pressed.
}
finally
{
consumer.Close();
}
}
Certifique-se de usar corretamente consumer.close(). Ele é usado para fechar a conexão de rede e acionar o reequilíbrio imediatamente, o que ajuda o aplicativo a receber mensagens imediatamente após a reinicialização.