AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / user-5737088

Draaksward's questions

Martin Hope
Draaksward
Asked: 2025-02-19 22:47:25 +0800 CST

Spring Integrations。向 MQTT 发送消息

  • 6

作为学习曲线的一部分,我正在尝试使用 Spring Integrations 向 Mosquitto MQTT 发布一条消息。我已经设置了一个配置(主要来自我通过谷歌搜索找到的部分,但 Listener 部分运行良好):

@Configuration
@RequiredArgsConstructor
@IntegrationComponentScan
public class MqttConfig {
private final MqttProperties properties;
    private final ObjectMapper objectMapper;

    private List<String> sensorTopics;

    @PostConstruct
    void init() {
        sensorTopics = properties.getSensors().stream()
                .map(MqttProperties.Sensor::getTopic)
                .toList();
    }

    @Bean
    public IntegrationFlow sensorDataFlow(HumilityAndTempSensorMessageHandler humilityAndTempSensorMessageHandler) {
        return IntegrationFlow.from(
                        new MqttPahoMessageDrivenChannelAdapter(
                                properties.getUrl(),
                                "iotBridge",
                                "#")
                )
                .filter(getSensorTopicFilter())
                .transform(getTempSensorTransformer())
                .handle(humilityAndTempSensorMessageHandler::handleMessage)
                .get();
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }


    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{properties.getUrl()});
        options.setUserName(properties.getUsername());
        options.setPassword(properties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("exoluse", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("topic");
        return messageHandler;
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface PublishGateway {
        void send(String message, @Header(MqttHeaders.TOPIC) String topic);
    }

    private GenericTransformer<String, TempSensorReading> getTempSensorTransformer() {
        return source -> {
            try {
                return objectMapper.readValue(source, TempSensorReading.class);
            } catch (Exception e) {
                System.out.println("failed to resolve input");
            }
            return null;
        };
    }

    private GenericHandler<String> getSensorTopicFilter() {
        return (payload, headers) -> {
            String topic = Optional.ofNullable(headers)
                    .map(item -> item.get("mqtt_receivedTopic"))
                    .map(String::valueOf)
                    .orElse(null);
            return sensorTopics.contains(topic);
        };
    }

我正在尝试传达一条信息:

@Service
//@RequiredArgsConstructor
@Slf4j
public class SocketGateway {

    @Autowired
    private MqttConfig.PublishGateway publishGateway;

    @PostConstruct
    void t() {
        // ${SHELLY_ID}/command/switch:0 -m toggle
        String topic = "xxx/command/switch:0";
        System.out.println(">>>>>> "+publishGateway);
        publishGateway.send("toggle", topic);
    }
}

我遇到了Dispatcher has no subscribers异常。

依赖版本:

    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
      <version>6.4.1</version>
    </dependency>

我做错什么了?

UPD:完整 pom

 <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.2</version>
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>15</maven.compiler.source>
    <maven.compiler.target>15</maven.compiler.target>
    <javafx.version>17.0.1</javafx.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.28</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
      <version>6.4.1</version>
    </dependency>

  </dependencies>
</project>

错误的完整堆栈跟踪:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'socketGateway': Invocation of init method failed
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:222) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1762) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:942) ~[spring-context-6.0.11.jar:6.0.11]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608) ~[spring-context-6.0.11.jar:6.0.11]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:436) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295) ~[spring-boot-3.1.2.jar:3.1.2]
    at org.draakz.iotbridge.Application.main(Application.java:9) ~[classes/:na]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.mqttOutboundChannel'.
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:327) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:466) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:669) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:584) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:550) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:540) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.11.jar:6.0.11]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:244) ~[spring-aop-6.0.11.jar:6.0.11]
    at jdk.proxy2/jdk.proxy2.$Proxy83.send(Unknown Source) ~[na:na]
    at org.draakz.iotbridge.mqtt.publisher.SocketGateway.t(SocketGateway.java:26) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMethod.invoke(InitDestroyAnnotationBeanPostProcessor.java:457) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:401) ~[spring-beans-6.0.11.jar:6.0.11]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:219) ~[spring-beans-6.0.11.jar:6.0.11]
    ... 18 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.1.2.jar:6.1.2]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.1.2.jar:6.1.2]
    ... 42 common frames omitted
spring
  • 1 个回答
  • 29 Views

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    重新格式化数字,在固定位置插入分隔符

    • 6 个回答
  • Marko Smith

    为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会?

    • 2 个回答
  • Marko Smith

    VScode 自动卸载扩展的问题(Material 主题)

    • 2 个回答
  • Marko Smith

    Vue 3:创建时出错“预期标识符但发现‘导入’”[重复]

    • 1 个回答
  • Marko Smith

    具有指定基础类型但没有枚举器的“枚举类”的用途是什么?

    • 1 个回答
  • Marko Smith

    如何修复未手动导入的模块的 MODULE_NOT_FOUND 错误?

    • 6 个回答
  • Marko Smith

    `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它?

    • 3 个回答
  • Marko Smith

    在 C++ 中,一个不执行任何操作的空程序需要 204KB 的堆,但在 C 中则不需要

    • 1 个回答
  • Marko Smith

    PowerBI 目前与 BigQuery 不兼容:Simba 驱动程序与 Windows 更新有关

    • 2 个回答
  • Marko Smith

    AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String”

    • 1 个回答
  • Martin Hope
    Fantastic Mr Fox msvc std::vector 实现中仅不接受可复制类型 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant 使用 chrono 查找下一个工作日 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor 构造函数的成员初始化程序可以包含另一个成员的初始化吗? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský 为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul C++20 是否进行了更改,允许从已知绑定数组“type(&)[N]”转换为未知绑定数组“type(&)[]”? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann 为什么 {2,3,10} 和 {x,3,10} (x=2) 的顺序不同? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller 在 5.2 版中,bash 条件语句中的 [[ .. ]] 中的分号现在是可选的吗? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench 为什么双破折号 (--) 会导致此 MariaDB 子句评估为 true? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng 为什么 `dict(id=1, **{'id': 2})` 有时会引发 `KeyError: 'id'` 而不是 TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String” 2024-03-20 03:12:31 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve