Atualmente estou implementando dentro de um projeto Quarkus um consumidor JMS que escuta uma fila anycast ("my.edu.queue") no ActiveMQ Artemis (versão 2.32.0). O que quero testar é como a mensagem é entregue na fila de mensagens mortas se, digamos, ocorrer algum erro de negócios. Simulei o erro de negócios para cada quinta mensagem (basta lançar exceção).
Pela análise do código dos exemplos do ActiveMQ Artemis ( exemplo de mensagens mortas ), o código para letras mortas parece simples. Implementei a funcionalidade da mesma forma que no exemplo (como pensei), exceto que fiz isso com a biblioteca "quarkus-artemis-jms". O problema com o código ou talvez com a configuração do meu corretor é que eu não recebo mensagens dentro da fila de mensagens mortas quando o faço Session.rollback()
. O código da minha JmsMessageConsumer
aparência segue se alguém conseguir adivinhar à primeira vista o que estou fazendo de errado. Também configurei o max-delivery-attempts
broker.xml interno como 0 para minha fila (adicionei um link para meu projeto onde todos os arquivos estão lá).
@ApplicationScoped
public class JmsMessageConsumer {
private static final Logger log = Logger.getLogger(JmsMessageConsumer.class.getName());
@Inject
ConnectionFactory connectionFactory;
private JMSContext context;
private Connection connection;
private Session session;
private MessageConsumer consumer;
private MessageProducer dlqProducer;
@ConfigProperty(name = "my.edu.queue.name", defaultValue = "my.edu.queue")
String queueName;
private AtomicInteger counter = new AtomicInteger(0);
void onStart(@Observes StartupEvent ev) throws JMSException {
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
consumer = session.createConsumer(queue);
Queue dlq = session.createQueue(queueName + ".dlq");
dlqProducer = session.createProducer(dlq);
connection.start();
receiveMessages();
}
void onStop(@Observes ShutdownEvent ev) throws JMSException {
connection.close();
}
private void receiveMessages() {
try {
consumer.setMessageListener(message -> {
try {
processMessage(message.getBody(String.class));
session.commit();
} catch (Exception e) {
log.severe("Error processing message: %s".formatted(e.getMessage()));
try {
// sendToDLQ(message);
session.rollback();
} catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
});
} catch (JMSException e) {
log.severe("Error setting message listener: %s".formatted(e.getMessage()));
}
}
private void processMessage(String text) {
counter.incrementAndGet();
if (counter.get() % 5 == 0) {
throw new RuntimeException("Error in business logic");
}
log.info("Processed message: " + text);
}
}
O projeto completo (que é bem simples) com todos os arquivos de configuração pode ser encontrado aqui . O meu broker.xml
pode ser encontrado dentro src/resources/broker_conf/broker.xml
.