AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / user-11638068

Gord's questions

Martin Hope
Gord
Asked: 2025-04-10 00:45:16 +0800 CST

Consumindo uma mensagem grande do ActiveMQ Artemis via STOMP + Python

  • 5

Estou tentando consumir uma mensagem grande de um servidor ActiveMQ Artemis embarcado com um cliente Python que está se conectando usando STOMP. O cliente cria uma fila temporária e envia uma mensagem para um endereço de "solicitação". O servidor deve gerar uma resposta e enviá-la de volta para a fila temporária. Nesse caso, a resposta tem aproximadamente 2,8 milhões de JSON.

Quando o servidor tenta responder, recebo esta mensagem:

info: 11:49:10 372 [Thread-1 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message   AMQ601500: User anonymous@invm:0 sent a message NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]], context: RoutingContextImpl(Address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6, routingType=null, PreviousAddress=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 previousRoute:null, reusable=true, version=-2147483639)
..................................................
***** durable queues /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
***** non durable for /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
- queueID=1139 address:/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 name:2ca84365-155a-11f0-b485-56de892bcc7e filter:null
..................................................
, transaction: null
info: 11:49:10 372 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message   AMQ601501: User [email protected]:64850 is consuming a message from 2ca84365-155a-11f0-b485-56de892bcc7e: Reference[1145]:NON-RELIABLE:NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]]
warning: 11:49:10 384 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.artemis.core.protocol.stomp   AMQ332071: Unable to send message to client: NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]] 
java.lang.IndexOutOfBoundsException null
    at io.netty.buffer.EmptyByteBuf.checkIndex(EmptyByteBuf.java:1042)
    at io.netty.buffer.EmptyByteBuf.writerIndex(EmptyByteBuf.java:150)
    at io.netty.buffer.WrappedByteBuf.writerIndex(WrappedByteBuf.java:132)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.writerIndex(ChannelBufferWrapper.java:635)
    at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.populateFrameBodyFromLargeMessage(VersionedStompFrameHandler.java:394)
    at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.createMessageFrame(VersionedStompFrameHandler.java:341)
    at org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12.createMessageFrame(StompFrameHandlerV12.java:57)
    at org.apache.activemq.artemis.core.protocol.stomp.StompConnection.createStompMessage(StompConnection.java:649)
    at org.apache.activemq.artemis.core.protocol.stomp.StompSession.sendMessage(StompSession.java:170)
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:1205)
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:522)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:3812)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:3058)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:4157)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:59)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:120)

Meu entendimento (provavelmente incorreto) é que o ActiveMQ Artemis deveria dividir isso em pedaços menores, mas ele está apenas me dando uma exceção.

Devo estar esquecendo alguma coisa (talvez uma configuração). Alguém pode me dizer o que é?

Estou usando o Artemis 2.40.0. Estou serializando meu objeto em JSON e enviando-o como uma mensagem de TEXTO:

Aqui está um pequeno programa que exibe esse comportamento:

public class Example {
    public static void main(String[] args) throws Exception {
        EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
        broker.setConfigResourcePath("file:example.xml");
        broker.start();

        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
        ClientSession session = clientSessionFactory.createSession();
        session.start();

        ClientConsumer consumer = session.createConsumer("json.requests");
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage message) {
                byte[] bytes = new byte[message.getBodySize()];
                message.getBodyBuffer().readBytes(bytes);
                JsonElement jsonElement = JsonParser.parseString(new String(bytes));
                String replyTo = jsonElement.getAsJsonObject().get("reply-to").getAsString();
                String s = generateLargeMessage();
                try {
                    ClientProducer producer = session.createProducer("/temp-queue/" + replyTo);
                    ClientMessage m = session.createMessage(TEXT_TYPE, false);
                    m.getBodyBuffer().writeNullableSimpleString(SimpleString.of(s));
                    producer.send(m);
                    session.commit();
                } catch (ActiveMQException e) {
                    System.err.println("Couldn't send message: " + e.getMessage());
                }
            }
        });
    }

    private static String generateLargeMessage() {
        StringBuilder sb = new StringBuilder();
        while (sb.length() < 2000000) {
            sb.append("abcdefghijklmnopqrstuvwxyz\n");
        }
        return sb.toString();
    }
}

Aqui está meu xml:

<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
            <acceptor name="in-tcp">tcp://localhost:61616</acceptor>
        </acceptors>

        <addresses>
            <address name="json.requests">
                <anycast>
                    <queue name="json.requests"/>
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>

Aqui está meu teste python:

#!venv/bin/python3

import time
import stomp
import json
import uuid

myuuid = uuid.uuid4()
print(f"myuuid = {myuuid}")

conn = stomp.Connection12([('127.0.0.1', 61616)], heartbeats=(5000,5000))
conn.set_listener('', stomp.PrintingListener())

conn.connect(None, None, wait=True)

print("subscribing...")
conn.subscribe(destination=f"/temp-queue/{myuuid}", id=1, ack='auto')

msg = json.dumps({'type':'DaoRequest', 'request': 'users', 'reply-to': f"{myuuid}" })
print(f'sending {msg} to json.requests')

resp = conn.send(body=msg, destination="json.requests")
time.sleep(60)
activemq-artemis
  • 1 respostas
  • 45 Views
Martin Hope
Gord
Asked: 2024-10-29 03:06:32 +0800 CST

Codificação diferente do ActiveMQ por conexão

  • 5

Sou novo no ActiveMQ. Estou tentando executar um broker incorporado com vários conectores. Imagino que cada conector tenha sua própria codificação (ou seja, um conector que lida com POJOs (ObjectMessage), um conector diferente que lida com json, outro com yaml, outro com xml, etc.). Imagino ainda poder enviar uma mensagem para um tópico e, então, ter essa mensagem distribuída para vários clientes, cada um codificado pela forma como eles se conectaram.

Isso é possível?

Essa visão é a maneira correta de organizar as coisas?

Tentei usar um MessageTransformer, mas ele parece ser aplicado no nível da sessão.

Uma pergunta um pouco não relacionada: devo usar o ActiveMQ clássico ou o Artemis?

activemq-classic
  • 1 respostas
  • 28 Views

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    Reformatar números, inserindo separadores em posições fixas

    • 6 respostas
  • Marko Smith

    Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não?

    • 2 respostas
  • Marko Smith

    Problema com extensão desinstalada automaticamente do VScode (tema Material)

    • 2 respostas
  • Marko Smith

    Vue 3: Erro na criação "Identificador esperado, mas encontrado 'import'" [duplicado]

    • 1 respostas
  • Marko Smith

    Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores?

    • 1 respostas
  • Marko Smith

    Como faço para corrigir um erro MODULE_NOT_FOUND para um módulo que não importei manualmente?

    • 6 respostas
  • Marko Smith

    `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso?

    • 3 respostas
  • Marko Smith

    Um programa vazio que não faz nada em C++ precisa de um heap de 204 KB, mas não em C

    • 1 respostas
  • Marko Smith

    PowerBI atualmente quebrado com BigQuery: problema de driver Simba com atualização do Windows

    • 2 respostas
  • Marko Smith

    AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos

    • 1 respostas
  • Martin Hope
    Fantastic Mr Fox Somente o tipo copiável não é aceito na implementação std::vector do MSVC 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant Encontre o próximo dia da semana usando o cronógrafo 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor O inicializador de membro do construtor pode incluir a inicialização de outro membro? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul O C++20 mudou para permitir a conversão de `type(&)[N]` de matriz de limites conhecidos para `type(&)[]` de matriz de limites desconhecidos? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann Como/por que {2,3,10} e {x,3,10} com x=2 são ordenados de forma diferente? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller O ponto e vírgula agora é opcional em condicionais bash com [[ .. ]] na versão 5.2? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench Por que um traço duplo (--) faz com que esta cláusula MariaDB seja avaliada como verdadeira? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng Por que `dict(id=1, **{'id': 2})` às vezes gera `KeyError: 'id'` em vez de um TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos 2024-03-20 03:12:31 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve