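I'm trying to handle a large message from an embedded ActiveMQ Artemis server using a Python client that connects over STOMP. The client creates a temporary queue, then sends a message to the "requests" address. The server should generate a reply and send it back to the temporary queue. In this case, the reply is about 2.8M of JSON data.
When the server attempts to reply, I get this message: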
info: 11:49:10 372 [Thread-1 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message AMQ601500: User anonymous@invm:0 sent a message NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]], context: RoutingContextImpl(Address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6, routingType=null, PreviousAddress=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 previousRoute:null, reusable=true, version=-2147483639)
..................................................
***** durable queues /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
***** non durable for /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
- queueID=1139 address:/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 name:2ca84365-155a-11f0-b485-56de892bcc7e filter:null
..................................................
, transaction: null
info: 11:49:10 372 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message AMQ601501: User [email protected]:64850 is consuming a message from 2ca84365-155a-11f0-b485-56de892bcc7e: Reference[1145]:NON-RELIABLE:NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]]
warning: 11:49:10 384 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.artemis.core.protocol.stomp AMQ332071: Unable to send message to client: NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]]
java.lang.IndexOutOfBoundsException null
at io.netty.buffer.EmptyByteBuf.checkIndex(EmptyByteBuf.java:1042)
at io.netty.buffer.EmptyByteBuf.writerIndex(EmptyByteBuf.java:150)
at io.netty.buffer.WrappedByteBuf.writerIndex(WrappedByteBuf.java:132)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.writerIndex(ChannelBufferWrapper.java:635)
at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.populateFrameBodyFromLargeMessage(VersionedStompFrameHandler.java:394)
at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.createMessageFrame(VersionedStompFrameHandler.java:341)
at org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12.createMessageFrame(StompFrameHandlerV12.java:57)
at org.apache.activemq.artemis.core.protocol.stomp.StompConnection.createStompMessage(StompConnection.java:649)
at org.apache.activemq.artemis.core.protocol.stomp.StompSession.sendMessage(StompSession.java:170)
at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:1205)
at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:522)
at org.apache.activemq.artemis.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:3812)
at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:3058)
at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:4157)
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:59)
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:120)
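My understanding (which may be incorrect) is that ActiveMQ Artemis should break the message into smaller chunks, but instead it just throws an exception.
I must be missing something (probably configuration). Can anyone tell me what it is?
I'm using Artemis 2.40.0. I serialize the object to JSON and then send it as a text message.
Here is a small program that exhibits the behavior: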
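import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;

public class Example {
    public static void main(String[] args) throws Exception {
        // Start the embedded broker using the XML configuration shown below.
        EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
        broker.setConfigResourcePath("file:example.xml");
        broker.start();

        // Connect to the broker in-VM with the core client.
        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
        ClientSession session = clientSessionFactory.createSession();
        session.start();

        ClientConsumer consumer = session.createConsumer("json.requests");
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage message) {
                // Read the request body and extract the reply-to queue suffix.
                byte[] bytes = new byte[message.getBodySize()];
                message.getBodyBuffer().readBytes(bytes);
                JsonElement jsonElement = JsonParser.parseString(new String(bytes));
                String replyTo = jsonElement.getAsJsonObject().get("reply-to").getAsString();
                String s = generateLargeMessage();
                try {
                    // Send the large reply to the client's temporary queue.
                    ClientProducer producer = session.createProducer("/temp-queue/" + replyTo);
                    ClientMessage m = session.createMessage(TEXT_TYPE, false);
                    m.getBodyBuffer().writeNullableSimpleString(SimpleString.of(s));
                    producer.send(m);
                    session.commit();
                } catch (ActiveMQException e) {
                    System.err.println("Couldn't send message: " + e.getMessage());
                }
            }
        });
    }

    // Builds a reply body of roughly 2,000,000 characters.
    private static String generateLargeMessage() {
        StringBuilder sb = new StringBuilder();
        while (sb.length() < 2000000) {
            sb.append("abcdefghijklmnopqrstuvwxyz\n");
        }
        return sb.toString();
    }
}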
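A side note on the sizes, as a rough sketch rather than anything taken from the Artemis source: the log reports _AMQ_LARGE_SIZE=5681703, which is roughly twice the 2.8M of JSON. That would be consistent with writeNullableSimpleString writing a 1-byte null marker, a 4-byte length, and 2 bytes per character, which is my assumption about the encoding. The 100 KiB threshold below is also an assumption (I believe it is the core client's default minLargeMessageSize), and the class and method names are made up for illustration.

```java
// Hypothetical helper, not part of Artemis: checks the size arithmetic only.
class EncodedSizeSketch {
    // Assumption: default large-message threshold of the core client (100 KiB).
    static final int ASSUMED_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;

    // Assumption: 1 marker byte + 4 length bytes + 2 bytes per character.
    static long nullableSimpleStringSize(long charCount) {
        return 1 + 4 + 2 * charCount;
    }

    // Inverse of the formula above: recover the character count from a body size.
    static long charCountFromEncodedSize(long encodedSize) {
        return (encodedSize - 5) / 2;
    }

    public static void main(String[] args) {
        long loggedSize = 5_681_703L; // _AMQ_LARGE_SIZE from the log above
        long chars = charCountFromEncodedSize(loggedSize);
        System.out.println("characters: " + chars);
        System.out.println("re-encoded size: " + nullableSimpleStringSize(chars));
        System.out.println("above assumed threshold: "
                + (loggedSize > ASSUMED_MIN_LARGE_MESSAGE_SIZE));
    }
}
```

If those assumptions hold, the logged size corresponds to 2,840,849 characters, which matches the size of the reply I'm sending and would explain why the broker treats it as a large message.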
Here is my XML configuration (example.xml):
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
            <acceptor name="in-tcp">tcp://localhost:61616</acceptor>
        </acceptors>
        <addresses>
            <address name="json.requests">
                <anycast>
                    <queue name="json.requests"/>
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>
Here is my Python test client:
#!venv/bin/python3
import time
import stomp
import json
import uuid
myuuid = uuid.uuid4()
print(f"myuuid = {myuuid}")
conn = stomp.Connection12([('127.0.0.1', 61616)], heartbeats=(5000,5000))
conn.set_listener('', stomp.PrintingListener())
conn.connect(None, None, wait=True)
print("subscribing...")
conn.subscribe(destination=f"/temp-queue/{myuuid}", id=1, ack='auto')
msg = json.dumps({'type':'DaoRequest', 'request': 'users', 'reply-to': f"{myuuid}" })
print(f'sending {msg} to json.requests')
resp = conn.send(body=msg, destination="json.requests")
time.sleep(60)