我目前正在 Quarkus 项目中实现一个 JMS 消费者,它监听 ActiveMQ Artemis(版本 2.32.0)上的任播队列(“my.edu.queue”)。我想测试的是,如果发生某些业务错误,消息将如何传递到死信队列。我模拟了每五条消息的业务错误(只是抛出异常)。
从 ActiveMQ Artemis 示例(死信示例)的代码来看,死信的代码似乎很简单。我已经按照与示例中相同的方式实现了该功能(正如我所想的那样),只是我使用“quarkus-artemis-jms”库来实现。代码或可能是我的代理设置的问题在于,当我这样做时,我没有在死信队列中收到消息。如果有人能从第一眼就猜出我做错了什么,Session.rollback()
我的代码如下。我还将队列的内部 broker.xml 设置为 0(我已将链接添加到我的项目,每个文件都在那里)。JmsMessageConsumer
max-delivery-attempts
@ApplicationScoped
public class JmsMessageConsumer {
private static final Logger log = Logger.getLogger(JmsMessageConsumer.class.getName());
@Inject
ConnectionFactory connectionFactory;
private JMSContext context;
private Connection connection;
private Session session;
private MessageConsumer consumer;
private MessageProducer dlqProducer;
@ConfigProperty(name = "my.edu.queue.name", defaultValue = "my.edu.queue")
String queueName;
private AtomicInteger counter = new AtomicInteger(0);
void onStart(@Observes StartupEvent ev) throws JMSException {
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
consumer = session.createConsumer(queue);
Queue dlq = session.createQueue(queueName + ".dlq");
dlqProducer = session.createProducer(dlq);
connection.start();
receiveMessages();
}
void onStop(@Observes ShutdownEvent ev) throws JMSException {
connection.close();
}
private void receiveMessages() {
try {
consumer.setMessageListener(message -> {
try {
processMessage(message.getBody(String.class));
session.commit();
} catch (Exception e) {
log.severe("Error processing message: %s".formatted(e.getMessage()));
try {
// sendToDLQ(message);
session.rollback();
} catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
});
} catch (JMSException e) {
log.severe("Error setting message listener: %s".formatted(e.getMessage()));
}
}
private void processMessage(String text) {
counter.incrementAndGet();
if (counter.get() % 5 == 0) {
throw new RuntimeException("Error in business logic");
}
log.info("Processed message: " + text);
}
}
整个项目(非常简单)及其所有配置文件可在此处找到。我的broker.xml
可在里面找到src/resources/broker_conf/broker.xml
。
由于 max-delivery-attempts 为 0,因此消息不会发送到死信地址。您必须将 max-delivery-attempts 设置为正值,才能将未送达的消息发送到死信地址,请参阅QueueImpl。
要在第一次回滚后将消息发送到死信地址,请将 max-delivery-attempts 设置为 1,即