AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题

问题[activemq-artemis](coding)

Martin Hope
Gord
Asked: 2025-04-10 00:45:16 +0800 CST

通过 STOMP + Python 从 ActiveMQ Artemis 消费大消息

  • 5

我正在尝试使用一个使用 STOMP 连接的 Python 客户端来处理来自嵌入式 ActiveMQ Artemis 服务器的一条大消息。客户端创建一个临时队列,然后向“请求”地址发送一条消息。服务器应该生成一个回复,并将其发送回临时队列。在本例中,回复大约是 2.8M 的 JSON 数据。

当服务器尝试回复时,我收到此消息:

info: 11:49:10 372 [Thread-1 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message   AMQ601500: User anonymous@invm:0 sent a message NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]], context: RoutingContextImpl(Address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6, routingType=null, PreviousAddress=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 previousRoute:null, reusable=true, version=-2147483639)
..................................................
***** durable queues /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
***** non durable for /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
- queueID=1139 address:/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 name:2ca84365-155a-11f0-b485-56de892bcc7e filter:null
..................................................
, transaction: null
info: 11:49:10 372 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message   AMQ601501: User [email protected]:64850 is consuming a message from 2ca84365-155a-11f0-b485-56de892bcc7e: Reference[1145]:NON-RELIABLE:NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]]
warning: 11:49:10 384 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.artemis.core.protocol.stomp   AMQ332071: Unable to send message to client: NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]] 
java.lang.IndexOutOfBoundsException null
    at io.netty.buffer.EmptyByteBuf.checkIndex(EmptyByteBuf.java:1042)
    at io.netty.buffer.EmptyByteBuf.writerIndex(EmptyByteBuf.java:150)
    at io.netty.buffer.WrappedByteBuf.writerIndex(WrappedByteBuf.java:132)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.writerIndex(ChannelBufferWrapper.java:635)
    at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.populateFrameBodyFromLargeMessage(VersionedStompFrameHandler.java:394)
    at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.createMessageFrame(VersionedStompFrameHandler.java:341)
    at org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12.createMessageFrame(StompFrameHandlerV12.java:57)
    at org.apache.activemq.artemis.core.protocol.stomp.StompConnection.createStompMessage(StompConnection.java:649)
    at org.apache.activemq.artemis.core.protocol.stomp.StompSession.sendMessage(StompSession.java:170)
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:1205)
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:522)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:3812)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:3058)
    at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:4157)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:59)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:120)

我的理解(可能不正确)是 ActiveMQ Artemis 应该将其分解成更小的块,但它只是给我一个异常。

我肯定漏掉了什么(可能是配置)。有人能告诉我是什么吗?

我正在使用 Artemis 2.40.0。我将对象序列化为 JSON,然后以文本消息的形式发送:

下面是一个表现出这种行为的小程序:

public class Example {
    public static void main(String[] args) throws Exception {
        EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
        broker.setConfigResourcePath("file:example.xml");
        broker.start();

        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
        ClientSession session = clientSessionFactory.createSession();
        session.start();

        ClientConsumer consumer = session.createConsumer("json.requests");
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage message) {
                byte[] bytes = new byte[message.getBodySize()];
                message.getBodyBuffer().readBytes(bytes);
                JsonElement jsonElement = JsonParser.parseString(new String(bytes));
                String replyTo = jsonElement.getAsJsonObject().get("reply-to").getAsString();
                String s = generateLargeMessage();
                try {
                    ClientProducer producer = session.createProducer("/temp-queue/" + replyTo);
                    ClientMessage m = session.createMessage(TEXT_TYPE, false);
                    m.getBodyBuffer().writeNullableSimpleString(SimpleString.of(s));
                    producer.send(m);
                    session.commit();
                } catch (ActiveMQException e) {
                    System.err.println("Couldn't send message: " + e.getMessage());
                }
            }
        });
    }

    private static String generateLargeMessage() {
        StringBuilder sb = new StringBuilder();
        while (sb.length() < 2000000) {
            sb.append("abcdefghijklmnopqrstuvwxyz\n");
        }
        return sb.toString();
    }
}

这是我的 xml:

<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
            <acceptor name="in-tcp">tcp://localhost:61616</acceptor>
        </acceptors>

        <addresses>
            <address name="json.requests">
                <anycast>
                    <queue name="json.requests"/>
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>

这是我的测试python:

#!venv/bin/python3

import time
import stomp
import json
import uuid

myuuid = uuid.uuid4()
print(f"myuuid = {myuuid}")

conn = stomp.Connection12([('127.0.0.1', 61616)], heartbeats=(5000,5000))
conn.set_listener('', stomp.PrintingListener())

conn.connect(None, None, wait=True)

print("subscribing...")
conn.subscribe(destination=f"/temp-queue/{myuuid}", id=1, ack='auto')

msg = json.dumps({'type':'DaoRequest', 'request': 'users', 'reply-to': f"{myuuid}" })
print(f'sending {msg} to json.requests')

resp = conn.send(body=msg, destination="json.requests")
time.sleep(60)
activemq-artemis
  • 1 个回答
  • 45 Views
Martin Hope
Sergio Scaramuzzi
Asked: 2024-12-15 04:26:07 +0800 CST

无法使用带有 JMS 的管理 API 来更改地址设置

  • 6

我对 JMS 和 Artemis 的世界还很陌生,我正在努力利用管理 API 将 JMS 消息发送到我的独立代理。

我使用的是 Artemis 2.38

我想在不使用 broker.xml 的情况下更新地址设置,但我一直只得到 null 作为回复,而且当我去测试它们时设置显然没有改变。

我按照这里的类似帖子做了,并想出了这段代码。这里

try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
            //You MUST explicitly create this queue for management operations.
            Queue managementQueue = context.createQueue("activemq.management");

            Message message = context.createMessage();
            JMSManagementHelper.putOperationInvocation(
                message,
                ResourceNames.BROKER,    // Targetted Resource
                "addAddressSettings",    // Operation to invoke
                "ri.trips1",             // Address match
                null,                    // Dead letter address (DLA)
                null,                    // Expiry address
                -1L,                     // Expiry delay
                true,                    // Last value queue
                7,                       // Max delivery attempts
                -1L,                     // Max size bytes
                0,                       // Page size bytes
                -1,                      // Page max cache size
                1000L,                   // Redelivery delay
                1.0,                     // Redelivery multiplier
                -1L,                     // Max redelivery delay
                -1L,                     // Slow consumer threshold
                false,                   // Slow consumer policy
                null,                    // Slow consumer notification interval
                -1L,                     // Min large message size
                -1L,                     // Consumer window size
                null,                    // Auto-create queues
                true,                    // Auto-create addresses
                false,                   // Auto-delete queues
                false,                   // Auto-delete addresses
                true                     // Auto-delete created queues
            );
            context.createProducer().send(managementQueue, message);

            Message response = context.createConsumer(managementQueue).receive(5000);
            if (response != null && JMSManagementHelper.hasOperationSucceeded(response)) {
                System.out.println("Address settings applied successfully.");
            } else {
                System.err.println("Failed to apply address settings");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

此处的代码块始终解析为失败,因为响应始终为空。但是它可以正常创建地址“activemq.management”。

我使用的是 Artemis 2.38

任何帮助都将不胜感激!

activemq-artemis
  • 1 个回答
  • 16 Views
Martin Hope
d-man
Asked: 2024-12-02 04:03:50 +0800 CST

ActiveMQ Artemis JMS 主题消息不会传递给消费者

  • 6

我使用的是 JDK 17 和 Apache ActiveMQ Artemis 2.38.0。我有以下简单的示例用于将消息发送到主题,但消息从未到达该主题的消费者。ActiveMQ Artemis 或应用程序端没有错误或任何其他日志。

@Configuration
public class JmsConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true); // Enable Pub/Sub for Topics
        return factory;
    }
}

休息控制器

import com.codeforgeyt.artemisdemo.service.DispatcherService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    DispatcherService dispatcherService;

    @PostMapping(value = "/queue")
    public ResponseEntity<String> sendToQueue(@RequestBody String message){
        dispatcherService.sendMessage(message);
        return new ResponseEntity<>("Message sent: " + message, HttpStatus.OK);
    }

    @PostMapping(value = "/topic")
    public ResponseEntity<String> sendToTopic(@RequestBody String message){
        dispatcherService.sendTopic(message);
        return new ResponseEntity<>("Message sent to topic: " + message, HttpStatus.OK);
    }
}

我的服务

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

@Service
public class DispatcherService {

    @Autowired
    JmsTemplate jmsTemplate;

    @Value("${jms.queue}")
    private String jmsQueue;
    @Value("${jms.topic}")
    private String jmsTopic;
    Logger log = LoggerFactory.getLogger(this.getClass());

    public void sendMessage(String message){
        jmsTemplate.setPubSubDomain(false);
        jmsTemplate.convertAndSend(jmsQueue, message);
        log.info("Message sent To Queue.");
    }
    public void sendTopic(String message){
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend(jmsTopic, message);
        log.info("Message sent To Topic.");
    }
}

接收者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

@Service
public class ReceiverService {

    Logger log = LoggerFactory.getLogger(ReceiverService.class);

    @JmsListener(destination = "${jms.queue}", containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(String message){
        log.info("Received Queue message: " + message);
    }

    @JmsListener(destination = "${jms.consumer1}", containerFactory = "jmsListenerContainerFactory")
    public void receiveTopicA(String topic){
        log.info("Received topic Consumer.A Message: " + topic);
    }
    @JmsListener(destination = "${jms.consumer2}", containerFactory = "jmsListenerContainerFactory")
    public void receiveTopicB(String topic){
        log.info("Received topic Consumer.B Message: " + topic);
    }
}

Springboot 启动器

@SpringBootApplication
public class ArtemisDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ArtemisDemoApplication.class, args);
    }

}

以下是我的application.properties:

spring.artemis.mode=native
spring.artemis.host=localhost
spring.artemis.port=61616
spring.artemis.user=admin
spring.artemis.password=test

jms.queue=Q.Test
jms.topic=VirtualTopic.OrderUpdates
jms.consumer1=Consumer.ClientA.VirtualTopic.OrderUpdates
jms.consumer2=Consumer.ClientB.VirtualTopic.OrderUpdates
activemq-artemis
  • 1 个回答
  • 24 Views
Martin Hope
burki
Asked: 2024-11-28 19:12:31 +0800 CST

如何在 Kubernetes Operator 部署中删除 Artemis 的默认 DLQ,以避免 DLQ 中出现重复消息

  • 6

我已经配置了一个基于 Kubernetes 操作器的 Artemis 部署来

  1. 每个队列使用带有前缀“DLQ”的单独 DLQ。
  2. 自动创建 deadLetter 资源

这很好。如果消息超出了最大投递尝试次数,它将被移至其队列的 DLQ。

然而,在 Artemis 中有一个DLQ默认地址,其中包含一个DLQ默认队列。所有自动创建的特定 DLQ 都添加到此DLQ地址。因此它包含名为 的默认队列DLQ和任意数量的名为 的特定 DLQ DLQ.[queuename]。

特定的 DLQ 是使用过滤器自动创建的,该过滤器仅包含以特定队列为来源的消息,例如_AMQ_ORIG_ADDRESS = 'testqueue'。到目前为止一切顺利。

但我遇到的问题是,来自的失败消息testqueue不仅被移动到DLQ.testqueue,而且还被复制到DLQ。同一条消息在 2 个不同的 DLQ 中。这是合理的,因为消息被发送到不排除它的地址的每个队列,并且由于默认 DLQ 没有过滤器,它会接收所有消息。

一开始我以为这只是 2 个队列中可见的 1 条消息,但无论我删除DLQ或中的消息DLQ.testqueue,它都不会从另一个队列中删除。这实际上是 2 条消息。

现在我认为我必须删除在 broker.xml 中配置的 Artemis 的默认 DLQ。

但是我该怎么做呢?操作员将默认配置与我的配置合并,我看不出有什么明显的方法可以用任何东西“覆盖”默认地址定义。

我可以添加类型的 CRActiveMQArtemisAddress来创建地址和队列,但我不知道如何删除默认地址。

感谢您的帮助。

activemq-artemis
  • 1 个回答
  • 19 Views
Martin Hope
Gampa Sivakumar
Asked: 2024-10-29 18:57:26 +0800 CST

在主服务器关闭后,是否有任何方法可以在不更改 broker.xml 的情况下将主服务器作为备份服务器运行?

  • 5

我在 HA 配置中使用 2 个集群 ActiveMQ Artemis 节点。当主节点关闭时,备份节点已激活,并且主节点重新启动,我希望主节点充当备份节点。这可能吗?

我正在使用ha-policy false场景,但我不需要使用true场景。

      <ha-policy>
         <shared-store>
             <primary>
                <failover-on-shutdown>false</failover-on-shutdown>
             </primary>
         </shared-store>
      </ha-policy>
activemq-artemis
  • 1 个回答
  • 21 Views
Martin Hope
Frank
Asked: 2024-09-13 03:18:57 +0800 CST

ActiveMQ Artemis 2.37.0 消息和 New Relic

  • 4

从 Web 控制台浏览我的 ActiveMQ Artemis 2.37.0 消息时,我看到一个newrelic属性。知道这是怎么回事吗?

[{"newrelic":"eyJkIjp7ImFjIjoiMTIxMjEwMiIsInByIjoxLjQ3MTk0LCJ0","x-opt-delivery-time":1726171200241,"x-opt-jms-dest":0,"address":"scheduled","creationTime":1726168127157,"x-opt-jms-msg-type":3,"messageID":515,"priority":4,"userID":"ID:ID:eb421cec-9cfa-49b3-b730-cf95485863da:1:1:1-1",,"durable":true,"tracestate":"1212102@nr=0-0-1212102-398731796-08b44ca926d53eec-b3094d0d0d351a97,","traceparent":"00-3ea7d1cdf9df58945f523cf2acc6f21f","expiration":0,"to":"scheduled","contentType":"application/octet-stream","timestamp":1726168127157}]
activemq-artemis
  • 1 个回答
  • 17 Views
Martin Hope
Albaasith
Asked: 2024-06-27 22:41:58 +0800 CST

在 ActiveMQ Artemis 中保留消息 10 秒

  • 5

我使用 WebSphere 作为 Web 服务器和应用服务器。Web 服务器使用 JMS 连接到 ActiveMQ Artemis,而应用服务器使用资源适配器连接到 ActiveMQ Artemis。请求进入 Web 服务器后,会发送到 ActiveMQ Artemis 队列,然后请求会发送给消费者。但是,我希望将该消息保留 10 秒钟,然后再将其发送给消费者。

是否有可能在 ActiveMQ Artemis 中实现这一点?

以下是我的一些broker.xml:

  <address-settings>
    <address-setting match="#">
      <dead-letter-address>DLQ</dead-letter-address>
      <expiry-address>ExpiryQueue</expiry-address>
      <redelivery-delay>0</redelivery-delay>
      <auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
      <max-size-bytes>-1</max-size-bytes>
      <message-counter-history-day-limit>10</message-counter-history-day-limit>
      <address-full-policy>PAGE</address-full-policy>
      <auto-create-queues>true</auto-create-queues>
      <auto-create-addresses>true</auto-create-addresses>
      <auto-create-jms-queues>true</auto-create-jms-queues>
      <auto-create-jms-topics>true</auto-create-jms-topics>
      <auto-delete-queues>false</auto-delete-queues>
      <auto-delete-addresses>false</auto-delete-addresses>
      <auto-delete-jms-queues>false</auto-delete-jms-queues>
      <auto-delete-jms-topics>false</auto-delete-jms-topics>
      <expiry-delay>60000</expiry-delay>
    </address-setting>
activemq-artemis
  • 1 个回答
  • 27 Views
Martin Hope
rb612
Asked: 2024-03-02 00:21:00 +0800 CST

在 Artemis 中模拟经典双工传输?

  • 5

我正在尝试从 ActiveMQ Classic 迁移到 ActiveMQ Artemis,并且我在 Classic 中设置了双工 TCP 连接器。

我尝试设置联合配置,并且发现多播地址已正确中继。我没有看到任播地址被发送副本。我可以理解为什么考虑到选播的定义,这可能是预期的,但我希望能够重现与经典中相同的语义(其中我有一个转发其所有消息的经典队列)。

为了更清楚起见,我有两个代理,A 和 B,并且代理 B(辅助)配置了 A 的上游,以及 A 的下游和 B 的上游引用。我有一个匹配所有地址的地址策略这。代理 A 没有联合配置。

我观察到在 A 上创建的多播地址似乎在 B 上复制,这表明从 B 到 A 的上游似乎有效。但是任播(创建的 JMS 队列而不是主题)似乎没有在 B 上创建。我希望这能像 Classic 中匹配所有队列/主题的双工一样工作。

我的期望是,发送到代理 A 队列的每条消息都会被代理 B 上按名称订阅同一队列的人接收。换句话说,我希望通过在 A 上生成并在 B 上消费,我将获得相同的消息,就好像我在 A 上消费。我实际上希望 A 上发布的任何消息都转发到 B,反之亦然。

我看到文档指出:

联合地址可以将从上游地址发布的消息复制到本地地址。nb 仅多播地址支持此功能。

activemq-artemis
  • 1 个回答
  • 31 Views
Martin Hope
la00
Asked: 2024-02-23 00:48:25 +0800 CST

ActiveMQ Artemis 审核日志过滤器打印属性

  • 5

我已按照文档中的说明激活审核日志。

由于我有一些详细的属性,我想知道是否有一种方法可以在打印到审核日志时仅过滤选定的属性。

我查看了源代码,似乎使用了CoreMessage 的toString方法,但似乎不可能。

activemq-artemis
  • 1 个回答
  • 13 Views
Martin Hope
Frank
Asked: 2024-01-16 05:01:50 +0800 CST

ActiveMQ Artemis 代理连接

  • 5

我有一个非常简单的 Spring Boot 2.7.6 ActiveMQ Artemis 应用程序,用于侦听消息。

package broker.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.apache.activemq.broker.BrokerService;
import org.springframework.jms.annotation.JmsListener;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public BrokerService broker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.addConnector("tcp://localhost:61616");
        broker.setPersistent(false);
        broker.start();
        return broker;
    }

    @JmsListener(destination = "foo")
    public void listen(String in) {
        System.out.println(in);
    }
}

然后我有另一个 Spring Boot 应用程序,它生成消息并将它们发送到代理地址。

package broker.producer;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.JndiDestinationResolver;
import org.springframework.stereotype.Service;

@Service
public class JmsProducer {

  @Value("${spring.artemis.broker-url}")
  private String brokerUrl;

  @Value("${spring.jms.template.default-destination}")
  private String defaultDestination;

  Logger log = LoggerFactory.getLogger(JmsProducer.class);

  @Bean
  public ActiveMQConnectionFactory activeMQConnectionFactory() {
      log.info("BrokerUrl: {}", brokerUrl);
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);

      return activeMQConnectionFactory;
  }

  @Bean
  public JndiDestinationResolver jndiDestinationResolver() {
    return new JndiDestinationResolver();
  }

  @Bean
  public JmsTemplate jmsTemplate() {
    JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(activeMQConnectionFactory());
    template.setPubSubDomain(false); // false for a Queue, true for a Topic
    template.setDefaultDestinationName(defaultDestination);

    return template;
  }


  public void send(String message) {
    JmsTemplate jmsTemplate = jmsTemplate();

    log.info("Sending message='{}'", message);
    jmsTemplate.convertAndSend(message);
    log.info("Sent message='{}'", message);
  }
}

这application.properties对于两个应用程序来说都很简单。

spring.artemis.mode=EMBEDDED
spring.artemis.broker-url=tcp://localhost:61616
spring.artemis.user=admin
spring.artemis.password=secret
spring.artemis.embedded.enabled=true

spring.jms.template.default-destination=my-queue-1

然后我启动每个应用程序并尝试调用send方法,但我不断从生产者应用程序收到此错误。

2024-01-15 15:50:28.462 ERROR 1012 --- [-netty-threads)] org.apache.activemq.artemis.core.client : AMQ214013: Failed to decode packet
java.lang.IllegalArgumentException: AMQ219032: Invalid type: 1
        at org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder.decode(PacketDecoder.java:499) ~[artemis-core-client-2.19.1.jar:2.19.1]

消费者应用程序显示此错误消息:

2024-01-15 15:50:28.464  WARN 986 --- [0.1:57714@61616] o.a.a.b.TransportConnection.Transport : Transport Connection to: tcp://127.0.0.1:57714 failed: Unknown data type: 77
activemq-artemis
  • 1 个回答
  • 30 Views

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    重新格式化数字,在固定位置插入分隔符

    • 6 个回答
  • Marko Smith

    为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会?

    • 2 个回答
  • Marko Smith

    VScode 自动卸载扩展的问题(Material 主题)

    • 2 个回答
  • Marko Smith

    Vue 3:创建时出错“预期标识符但发现‘导入’”[重复]

    • 1 个回答
  • Marko Smith

    具有指定基础类型但没有枚举器的“枚举类”的用途是什么?

    • 1 个回答
  • Marko Smith

    如何修复未手动导入的模块的 MODULE_NOT_FOUND 错误?

    • 6 个回答
  • Marko Smith

    `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它?

    • 3 个回答
  • Marko Smith

    在 C++ 中,一个不执行任何操作的空程序需要 204KB 的堆,但在 C 中则不需要

    • 1 个回答
  • Marko Smith

    PowerBI 目前与 BigQuery 不兼容:Simba 驱动程序与 Windows 更新有关

    • 2 个回答
  • Marko Smith

    AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String”

    • 1 个回答
  • Martin Hope
    Fantastic Mr Fox msvc std::vector 实现中仅不接受可复制类型 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant 使用 chrono 查找下一个工作日 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor 构造函数的成员初始化程序可以包含另一个成员的初始化吗? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský 为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul C++20 是否进行了更改,允许从已知绑定数组“type(&)[N]”转换为未知绑定数组“type(&)[]”? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann 为什么 {2,3,10} 和 {x,3,10} (x=2) 的顺序不同? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller 在 5.2 版中,bash 条件语句中的 [[ .. ]] 中的分号现在是可选的吗? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench 为什么双破折号 (--) 会导致此 MariaDB 子句评估为 true? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng 为什么 `dict(id=1, **{'id': 2})` 有时会引发 `KeyError: 'id'` 而不是 TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String” 2024-03-20 03:12:31 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve