AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / coding / Perguntas / 79564890
Accepted
Gord
Gord
Asked: 2025-04-10 00:45:16 +0800 CST2025-04-10 00:45:16 +0800 CST 2025-04-10 00:45:16 +0800 CST

Consumindo uma mensagem grande do ActiveMQ Artemis via STOMP + Python

  • 772

Estou tentando consumir uma mensagem grande de um servidor ActiveMQ Artemis embarcado com um cliente Python que está se conectando usando STOMP. O cliente cria uma fila temporária e envia uma mensagem para um endereço de "solicitação". O servidor deve gerar uma resposta e enviá-la de volta para a fila temporária. Nesse caso, a resposta tem aproximadamente 2,8 milhões de JSON.

Quando o servidor tenta responder, recebo esta mensagem:

info: 11:49:10 372 [Thread-1 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message   AMQ601500: User anonymous@invm:0 sent a message NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]], context: RoutingContextImpl(Address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6, routingType=null, PreviousAddress=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 previousRoute:null, reusable=true, version=-2147483639)
..................................................
***** durable queues /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
***** non durable for /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
- queueID=1139 address:/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 name:2ca84365-155a-11f0-b485-56de892bcc7e filter:null
..................................................
, transaction: null
info: 11:49:10 372 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message   AMQ601501: User [email protected]:64850 is consuming a message from 2ca84365-155a-11f0-b485-56de892bcc7e: Reference[1145]:NON-RELIABLE:NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]]
warning: 11:49:10 384 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.artemis.core.protocol.stomp   AMQ332071: Unable to send message to client: NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]] 
java.lang.IndexOutOfBoundsException null
    at io.netty.buffer.EmptyByteBuf.checkIndex(EmptyByteBuf.java:1042)
    at io.netty.buffer.EmptyByteBuf.writerIndex(EmptyByteBuf.java:150)
    at io.netty.buffer.WrappedByteBuf.writerIndex(WrappedByteBuf.java:132)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.writerIndex(ChannelBufferWrapper.java:635)
    at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.populateFrameBodyFromLargeMessage(VersionedStompFrameHandler.java:394)
    at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.createMessageFrame(VersionedStompFrameHandler.java:341)
    at org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12.createMessageFrame(StompFrameHandlerV12.java:57)
    at org.apache.activemq.artemis.core.protocol.stomp.StompConnection.createStompMessage(StompConnection.java:649)
    at org.apache.activemq.artemis.core.protocol.stomp.StompSession.sendMessage(StompSession.java:170)
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:1205)
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:522)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:3812)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:3058)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:4157)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:59)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:120)

Meu entendimento (provavelmente incorreto) é que o ActiveMQ Artemis deveria dividir isso em pedaços menores, mas ele está apenas me dando uma exceção.

Devo estar esquecendo alguma coisa (talvez uma configuração). Alguém pode me dizer o que é?

Estou usando o Artemis 2.40.0. Estou serializando meu objeto em JSON e enviando-o como uma mensagem de TEXTO:

Aqui está um pequeno programa que exibe esse comportamento:

public class Example {
    public static void main(String[] args) throws Exception {
        EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
        broker.setConfigResourcePath("file:example.xml");
        broker.start();

        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
        ClientSession session = clientSessionFactory.createSession();
        session.start();

        ClientConsumer consumer = session.createConsumer("json.requests");
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage message) {
                byte[] bytes = new byte[message.getBodySize()];
                message.getBodyBuffer().readBytes(bytes);
                JsonElement jsonElement = JsonParser.parseString(new String(bytes));
                String replyTo = jsonElement.getAsJsonObject().get("reply-to").getAsString();
                String s = generateLargeMessage();
                try {
                    ClientProducer producer = session.createProducer("/temp-queue/" + replyTo);
                    ClientMessage m = session.createMessage(TEXT_TYPE, false);
                    m.getBodyBuffer().writeNullableSimpleString(SimpleString.of(s));
                    producer.send(m);
                    session.commit();
                } catch (ActiveMQException e) {
                    System.err.println("Couldn't send message: " + e.getMessage());
                }
            }
        });
    }

    private static String generateLargeMessage() {
        StringBuilder sb = new StringBuilder();
        while (sb.length() < 2000000) {
            sb.append("abcdefghijklmnopqrstuvwxyz\n");
        }
        return sb.toString();
    }
}

Aqui está meu xml:

<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
            <acceptor name="in-tcp">tcp://localhost:61616</acceptor>
        </acceptors>

        <addresses>
            <address name="json.requests">
                <anycast>
                    <queue name="json.requests"/>
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>

Aqui está meu teste python:

#!venv/bin/python3

import time
import stomp
import json
import uuid

myuuid = uuid.uuid4()
print(f"myuuid = {myuuid}")

conn = stomp.Connection12([('127.0.0.1', 61616)], heartbeats=(5000,5000))
conn.set_listener('', stomp.PrintingListener())

conn.connect(None, None, wait=True)

print("subscribing...")
conn.subscribe(destination=f"/temp-queue/{myuuid}", id=1, ack='auto')

msg = json.dumps({'type':'DaoRequest', 'request': 'users', 'reply-to': f"{myuuid}" })
print(f'sending {msg} to json.requests')

resp = conn.send(body=msg, destination="json.requests")
time.sleep(60)
activemq-artemis
  • 1 1 respostas
  • 45 Views

1 respostas

  • Voted
  1. Best Answer
    Justin Bertram
    2025-04-10T03:54:29+08:002025-04-10T03:54:29+08:00

    O protocolo STOMP não oferece suporte a corpos de mensagens fragmentados como o protocolo Core, portanto, seu entendimento de que o broker dividirá a mensagem em fragmentos nesse contexto está incorreto.

    No entanto, é possível consumir mensagens "grandes" usando o protocolo STOMP. O broker simplesmente lerá todo o corpo da mensagem do disco para a memória antes de enviá-la pela rede para o consumidor. Existem vários testes no conjunto de testes que utilizam essa funcionalidade, e todos eles foram aprovados na versão 2.40.0. Dito isso, você está fazendo algo diferente do que está sendo feito nos testes. Você desabilitou a persistência no seu broker.xml, por exemplo:

    <persistence-enabled>false</persistence-enabled>
    

    Se você habilitar a persistência, seu código funcionará, por exemplo:

    <persistence-enabled>true</persistence-enabled>
    

    Outra maneira de contornar o problema é aumentar o tempo minLargeMessageSizepara o cliente enviar a resposta, para que o corretor não trate essas mensagens como "grandes", por exemplo:

    ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
    locator.setMinLargeMessageSize(6291456);
    

    Ainda estou investigando exatamente por que isso não funciona com a persistência desabilitada.

    • 1

relate perguntas

  • Segurando mensagens por 10 segundos no ActiveMQ Artemis

  • Simulando um transporte duplex Clássico com federação em Artemis?

  • Propriedades impressas do filtro de log de auditoria do ActiveMQ Artemis

  • Conexão do corretor ActiveMQ Artemis

  • Implementando ferramenta de monitoramento Grafana para ActiveMQ Artemis

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    Reformatar números, inserindo separadores em posições fixas

    • 6 respostas
  • Marko Smith

    Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não?

    • 2 respostas
  • Marko Smith

    Problema com extensão desinstalada automaticamente do VScode (tema Material)

    • 2 respostas
  • Marko Smith

    Vue 3: Erro na criação "Identificador esperado, mas encontrado 'import'" [duplicado]

    • 1 respostas
  • Marko Smith

    Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores?

    • 1 respostas
  • Marko Smith

    Como faço para corrigir um erro MODULE_NOT_FOUND para um módulo que não importei manualmente?

    • 6 respostas
  • Marko Smith

    `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso?

    • 3 respostas
  • Marko Smith

    Um programa vazio que não faz nada em C++ precisa de um heap de 204 KB, mas não em C

    • 1 respostas
  • Marko Smith

    PowerBI atualmente quebrado com BigQuery: problema de driver Simba com atualização do Windows

    • 2 respostas
  • Marko Smith

    AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos

    • 1 respostas
  • Martin Hope
    Fantastic Mr Fox Somente o tipo copiável não é aceito na implementação std::vector do MSVC 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant Encontre o próximo dia da semana usando o cronógrafo 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor O inicializador de membro do construtor pode incluir a inicialização de outro membro? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul O C++20 mudou para permitir a conversão de `type(&)[N]` de matriz de limites conhecidos para `type(&)[]` de matriz de limites desconhecidos? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann Como/por que {2,3,10} e {x,3,10} com x=2 são ordenados de forma diferente? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller O ponto e vírgula agora é opcional em condicionais bash com [[ .. ]] na versão 5.2? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench Por que um traço duplo (--) faz com que esta cláusula MariaDB seja avaliada como verdadeira? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng Por que `dict(id=1, **{'id': 2})` às vezes gera `KeyError: 'id'` em vez de um TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos 2024-03-20 03:12:31 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve