AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题 / 79564890
Accepted
Gord
Gord
Asked: 2025-04-10 00:45:16 +0800 CST2025-04-10 00:45:16 +0800 CST 2025-04-10 00:45:16 +0800 CST

通过 STOMP + Python 从 ActiveMQ Artemis 消费大消息

  • 772

我正在尝试使用一个使用 STOMP 连接的 Python 客户端来处理来自嵌入式 ActiveMQ Artemis 服务器的一条大消息。客户端创建一个临时队列,然后向“请求”地址发送一条消息。服务器应该生成一个回复,并将其发送回临时队列。在本例中,回复大约是 2.8M 的 JSON 数据。

当服务器尝试回复时,我收到此消息:

info: 11:49:10 372 [Thread-1 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message   AMQ601500: User anonymous@invm:0 sent a message NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]], context: RoutingContextImpl(Address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6, routingType=null, PreviousAddress=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 previousRoute:null, reusable=true, version=-2147483639)
..................................................
***** durable queues /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
***** non durable for /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
- queueID=1139 address:/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 name:2ca84365-155a-11f0-b485-56de892bcc7e filter:null
..................................................
, transaction: null
info: 11:49:10 372 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message   AMQ601501: User [email protected]:64850 is consuming a message from 2ca84365-155a-11f0-b485-56de892bcc7e: Reference[1145]:NON-RELIABLE:NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]]
warning: 11:49:10 384 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.artemis.core.protocol.stomp   AMQ332071: Unable to send message to client: NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]] 
java.lang.IndexOutOfBoundsException null
    at io.netty.buffer.EmptyByteBuf.checkIndex(EmptyByteBuf.java:1042)
    at io.netty.buffer.EmptyByteBuf.writerIndex(EmptyByteBuf.java:150)
    at io.netty.buffer.WrappedByteBuf.writerIndex(WrappedByteBuf.java:132)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.writerIndex(ChannelBufferWrapper.java:635)
    at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.populateFrameBodyFromLargeMessage(VersionedStompFrameHandler.java:394)
    at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.createMessageFrame(VersionedStompFrameHandler.java:341)
    at org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12.createMessageFrame(StompFrameHandlerV12.java:57)
    at org.apache.activemq.artemis.core.protocol.stomp.StompConnection.createStompMessage(StompConnection.java:649)
    at org.apache.activemq.artemis.core.protocol.stomp.StompSession.sendMessage(StompSession.java:170)
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:1205)
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:522)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:3812)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:3058)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:4157)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:59)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:120)

我的理解(可能不正确)是 ActiveMQ Artemis 应该将其分解成更小的块,但它只是给我一个异常。

我肯定漏掉了什么(可能是配置)。有人能告诉我是什么吗?

我正在使用 Artemis 2.40.0。我将对象序列化为 JSON,然后以文本消息的形式发送:

下面是一个表现出这种行为的小程序:

public class Example {
    public static void main(String[] args) throws Exception {
        EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
        broker.setConfigResourcePath("file:example.xml");
        broker.start();

        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
        ClientSession session = clientSessionFactory.createSession();
        session.start();

        ClientConsumer consumer = session.createConsumer("json.requests");
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage message) {
                byte[] bytes = new byte[message.getBodySize()];
                message.getBodyBuffer().readBytes(bytes);
                JsonElement jsonElement = JsonParser.parseString(new String(bytes));
                String replyTo = jsonElement.getAsJsonObject().get("reply-to").getAsString();
                String s = generateLargeMessage();
                try {
                    ClientProducer producer = session.createProducer("/temp-queue/" + replyTo);
                    ClientMessage m = session.createMessage(TEXT_TYPE, false);
                    m.getBodyBuffer().writeNullableSimpleString(SimpleString.of(s));
                    producer.send(m);
                    session.commit();
                } catch (ActiveMQException e) {
                    System.err.println("Couldn't send message: " + e.getMessage());
                }
            }
        });
    }

    private static String generateLargeMessage() {
        StringBuilder sb = new StringBuilder();
        while (sb.length() < 2000000) {
            sb.append("abcdefghijklmnopqrstuvwxyz\n");
        }
        return sb.toString();
    }
}

这是我的 xml:

<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
            <acceptor name="in-tcp">tcp://localhost:61616</acceptor>
        </acceptors>

        <addresses>
            <address name="json.requests">
                <anycast>
                    <queue name="json.requests"/>
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>

这是我的测试python:

#!venv/bin/python3

import time
import stomp
import json
import uuid

myuuid = uuid.uuid4()
print(f"myuuid = {myuuid}")

conn = stomp.Connection12([('127.0.0.1', 61616)], heartbeats=(5000,5000))
conn.set_listener('', stomp.PrintingListener())

conn.connect(None, None, wait=True)

print("subscribing...")
conn.subscribe(destination=f"/temp-queue/{myuuid}", id=1, ack='auto')

msg = json.dumps({'type':'DaoRequest', 'request': 'users', 'reply-to': f"{myuuid}" })
print(f'sending {msg} to json.requests')

resp = conn.send(body=msg, destination="json.requests")
time.sleep(60)
activemq-artemis
  • 1 1 个回答
  • 45 Views

1 个回答

  • Voted
  1. Best Answer
    Justin Bertram
    2025-04-10T03:54:29+08:002025-04-10T03:54:29+08:00

    STOMP 协议不像核心协议那样支持分块消息体,因此您认为代理会在此上下文中将消息分成块的理解是不正确的。

    但是,可以使用 STOMP 协议消费“大”消息。代理会将整个消息主体从磁盘读取到内存中,然后再通过网络发送给消费者。测试套件中有几个测试测试了此功能,所有这些测试都在 2.40.0 版本上通过了。也就是说,您执行的操作与测试中执行的操作不同。您已在 中禁用了持久化功能broker.xml,例如:

    <persistence-enabled>false</persistence-enabled>
    

    如果您启用持久性,那么您的代码就可以工作,例如:

    <persistence-enabled>true</persistence-enabled>
    

    解决这个问题的另一种方法是增加minLargeMessageSize发送响应的客户端,以便代理不会将这些消息视为“大消息”,例如:

    ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
    locator.setMinLargeMessageSize(6291456);
    

    我仍在调查为什么在禁用持久性的情况下这不起作用。

    • 1

相关问题

  • 在 ActiveMQ Artemis 中保留消息 10 秒

  • 在 Artemis 中模拟经典双工传输?

  • ActiveMQ Artemis 审核日志过滤器打印属性

  • ActiveMQ Artemis 代理连接

  • 为ActiveMQ Artemis实现Grafana监控工具

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    重新格式化数字,在固定位置插入分隔符

    • 6 个回答
  • Marko Smith

    为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会?

    • 2 个回答
  • Marko Smith

    VScode 自动卸载扩展的问题(Material 主题)

    • 2 个回答
  • Marko Smith

    Vue 3:创建时出错“预期标识符但发现‘导入’”[重复]

    • 1 个回答
  • Marko Smith

    具有指定基础类型但没有枚举器的“枚举类”的用途是什么?

    • 1 个回答
  • Marko Smith

    如何修复未手动导入的模块的 MODULE_NOT_FOUND 错误?

    • 6 个回答
  • Marko Smith

    `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它?

    • 3 个回答
  • Marko Smith

    在 C++ 中,一个不执行任何操作的空程序需要 204KB 的堆,但在 C 中则不需要

    • 1 个回答
  • Marko Smith

    PowerBI 目前与 BigQuery 不兼容:Simba 驱动程序与 Windows 更新有关

    • 2 个回答
  • Marko Smith

    AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String”

    • 1 个回答
  • Martin Hope
    Fantastic Mr Fox msvc std::vector 实现中仅不接受可复制类型 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant 使用 chrono 查找下一个工作日 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor 构造函数的成员初始化程序可以包含另一个成员的初始化吗? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský 为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul C++20 是否进行了更改,允许从已知绑定数组“type(&)[N]”转换为未知绑定数组“type(&)[]”? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann 为什么 {2,3,10} 和 {x,3,10} (x=2) 的顺序不同? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller 在 5.2 版中,bash 条件语句中的 [[ .. ]] 中的分号现在是可选的吗? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench 为什么双破折号 (--) 会导致此 MariaDB 子句评估为 true? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng 为什么 `dict(id=1, **{'id': 2})` 有时会引发 `KeyError: 'id'` 而不是 TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String” 2024-03-20 03:12:31 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve