我使用Apache ActiveMQ Artemis 2.40.0作为代理,使用Qpid Proton Python 0.39.0 ( python-qpid-proton
) 作为 AMQP 1.0 客户端。
目标是max-delivery-attempts
当消费者明确不确认(NACK) 消息时,使用release(delivered=True)
Proton 的方法重新传递消息,直至达到配置的限制 () MessagingHandler
。
我broker.xml
在 Artemis 中配置了以下<address-setting>
块:
<address-setting match="#">
<expiry-address>ExpiryQueue</expiry-address>
<dead-letter-address>DLQ</dead-letter-address>
<auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
<dead-letter-queue-prefix></dead-letter-queue-prefix>
<dead-letter-queue-suffix>.DLQ</dead-letter-queue-suffix>
<max-delivery-attempts>5</max-delivery-attempts>
<redelivery-delay>5000</redelivery-delay>
</address-setting>
Python代码:
on_message
以下是我的 Qpid Proton 消费者中方法的片段:
def on_message(self, event):
message = event.message
status = send_message_callback(self.enrollment["target_url"], message.body)
if 200 <= status < 300:
self.accept(event.delivery)
else:
# Explicitly NACK the message
self.release(event.delivery, delivered=True)
尽管使用,Artemis 似乎会无限期地release(delivered=True)
重新传递消息(忽略),除非我切换到,它会在第一次故障时立即将消息发送到 DLQ。<max-delivery-attempts>
reject(event.delivery)
我也尝试过将标签添加<persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery>
到broker.xml
,但没有成功。
预期行为:
之后
release(delivered=True)
,Artemis 应该:- 计算一次重新投递尝试
max-delivery-attempts
根据和重试消息redelivery-delay
- 5 次尝试失败后将其发送到 DLQ
release(delivered=True)
应表现为软 NACK,如 Proton文档中所述
问题:
Artemis不断重复发送消息,从不计入max-delivery-attempts
。看来 Proton 并未像预期的那样持久化或确认发送次数。
我尝试过的:
- 显式
release(delivery, delivered=True)
- 使用
reject()
(有效,但不是我想要的行为) - 添加
<persist-delivery-count-before-delivery>
到代理配置(破坏 Artemis 2.40.0 中的模式) - 已验证 Proton 的
on_released
、on_settled
、on_rejected
未在release(delivered=True)
问题:
- Artemis 2.40.0是否
release(delivered=True)
支持增加交付次数? - 在 Proton Python 中是否有正确/更好的方法来 NACK,以便 Artemis 跟踪重新交付?
- 是否有替代代理或 Proton 配置来支持此重试逻辑?
提前致谢!
解决:
在发布消息之前,我没有更新本地状态对象。因此,为了使用在 broker.xml 中定义的max-delivery-attempts
策略,它必须完成以下本地属性:DLQ
def on_message(self, event):
message = event.message
status = send_message_callback(self.enrollment["target_url"], message.body)
if 200 <= status < 300:
self.accept(event.delivery)
else:
# Explicitly NACK the message
local_state = event.delivery.local
local_state.failed = True
local_state.undeliverable = False
event.delivery.update(local_state.type)
self.settle(event.delivery, event.delivery.MODIFIED)
您需要花时间阅读规范并了解目标和源头的各种配置的含义。
“已释放”表示源应将消息视为尚未处理并将其返回到初始状态,“已拒绝”表示消息被视为无效并应进入终止状态。记录失败尝试的正确结果是“已修改”配置,它提供了一个传递失败标志,表示传递计数应该增加,这是服务器在失败尝试次数足够多后最终 DLQ 消息时要使用的。