我使用的是 JDK 17 和 Apache ActiveMQ Artemis 2.38.0。我有以下简单的示例用于将消息发送到主题,但消息从未到达该主题的消费者。ActiveMQ Artemis 或应用程序端没有错误或任何其他日志。
@Configuration
public class JmsConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true); // Enable Pub/Sub for Topics
return factory;
}
}
休息控制器
import com.codeforgeyt.artemisdemo.service.DispatcherService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
DispatcherService dispatcherService;
@PostMapping(value = "/queue")
public ResponseEntity<String> sendToQueue(@RequestBody String message){
dispatcherService.sendMessage(message);
return new ResponseEntity<>("Message sent: " + message, HttpStatus.OK);
}
@PostMapping(value = "/topic")
public ResponseEntity<String> sendToTopic(@RequestBody String message){
dispatcherService.sendTopic(message);
return new ResponseEntity<>("Message sent to topic: " + message, HttpStatus.OK);
}
}
我的服务
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
@Service
public class DispatcherService {
@Autowired
JmsTemplate jmsTemplate;
@Value("${jms.queue}")
private String jmsQueue;
@Value("${jms.topic}")
private String jmsTopic;
Logger log = LoggerFactory.getLogger(this.getClass());
public void sendMessage(String message){
jmsTemplate.setPubSubDomain(false);
jmsTemplate.convertAndSend(jmsQueue, message);
log.info("Message sent To Queue.");
}
public void sendTopic(String message){
jmsTemplate.setPubSubDomain(true);
jmsTemplate.convertAndSend(jmsTopic, message);
log.info("Message sent To Topic.");
}
}
接收者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
@Service
public class ReceiverService {
Logger log = LoggerFactory.getLogger(ReceiverService.class);
@JmsListener(destination = "${jms.queue}", containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(String message){
log.info("Received Queue message: " + message);
}
@JmsListener(destination = "${jms.consumer1}", containerFactory = "jmsListenerContainerFactory")
public void receiveTopicA(String topic){
log.info("Received topic Consumer.A Message: " + topic);
}
@JmsListener(destination = "${jms.consumer2}", containerFactory = "jmsListenerContainerFactory")
public void receiveTopicB(String topic){
log.info("Received topic Consumer.B Message: " + topic);
}
}
Springboot 启动器
@SpringBootApplication
public class ArtemisDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ArtemisDemoApplication.class, args);
}
}
以下是我的application.properties
:
spring.artemis.mode=native
spring.artemis.host=localhost
spring.artemis.port=61616
spring.artemis.user=admin
spring.artemis.password=test
jms.queue=Q.Test
jms.topic=VirtualTopic.OrderUpdates
jms.consumer1=Consumer.ClientA.VirtualTopic.OrderUpdates
jms.consumer2=Consumer.ClientB.VirtualTopic.OrderUpdates