diff --git a/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java b/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java index 01dafb5..15cce3c 100644 --- a/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java +++ b/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java @@ -13,7 +13,7 @@ import javax.annotation.Resource; @Slf4j //@Configuration -@EnableConfigurationProperties(MqttProperties.class) +//@EnableConfigurationProperties(MqttProperties.class) public class MqttConfig { @Resource diff --git a/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java b/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java index 29f4fff..b7b4234 100644 --- a/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java +++ b/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java @@ -6,7 +6,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import java.util.Map; @Data -@ConfigurationProperties(prefix = "mqtt") +//@ConfigurationProperties(prefix = "mqtt") public class MqttProperties { private String host; private String clientId; diff --git a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java index 5bf79ed..b1abeb6 100644 --- a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java +++ b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java @@ -23,8 +23,6 @@ import java.io.IOException; @RequestMapping("/wx-message") @Slf4j public class WxMessageController { - - @Resource private WeChatMessageUtil weChatMessageUtil; @Resource @@ -32,8 +30,8 @@ public class WxMessageController { @Resource private VerifyUtil verifyUtil; - @Resource - private CustomerMqttClient client; +// @Resource +// private CustomerMqttClient client; /** * 预约端对接微信推送消息验证 @@ -164,31 +162,6 @@ public class WxMessageController { @PostMapping("/test") public CommonResult test() { - - client.publish("testtopic/1", "这是一个测试"); - try { - Thread.sleep(1000); - }catch (Exception e){ - e.printStackTrace(); - } - client.publish("/testtopic/2", "这是一个测试"); - try { - Thread.sleep(1000); - }catch (Exception e){ - e.printStackTrace(); - } - client.subscribe("/testtopic/#", 1); - try { - Thread.sleep(1000); - }catch (Exception e){ - e.printStackTrace(); - } - client.unsubscribe("/testtopic/#"); - try { - Thread.sleep(1000); - }catch (Exception e){ - e.printStackTrace(); - } return CommonResult.success("success"); } diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java index 944e171..93bcbdc 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java @@ -34,9 +34,8 @@ public class CustomerMqttClient { options.setKeepAliveInterval(properties.getKeepAlive()); options.setConnectionTimeout(properties.getTimeout()); options.setAutomaticReconnect(true); - + client.setCallback(new MyMqttCallback(client)); if (!client.isConnected()) { - client.setCallback(new MyMqttCallback(client)); client.connect(options); log.info("连接MQTT服务成功 success"); } else { @@ -51,11 +50,11 @@ public class CustomerMqttClient { client.disconnect(); } - public void publish(String msg, String topic) { - publish(msg, topic, properties.getQos(), false); + public void publish(String topic, String msg) { + publish(topic, msg, properties.getQos(), false); } - public void publish(String msg, String topic, int qos, boolean retained) { + public void publish(String topic, String msg, int qos, boolean retained) { MqttMessage message = new MqttMessage(); message.setPayload(msg.getBytes()); message.setQos(qos); @@ -64,6 +63,7 @@ public class CustomerMqttClient { if (null == mqttTopic) { log.error("发布失败,主题不存在!!topic: {} , qos: {} ", topic, qos); } + boolean connected = client.isConnected(); try { client.publish(topic, message); diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java index 0fd826c..e4963e8 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java @@ -1,11 +1,9 @@ package cc.yunxi.utils; -import cc.yunxi.domain.po.Client; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; -import java.time.LocalDateTime; -import java.util.concurrent.atomic.AtomicLong; +import java.nio.charset.StandardCharsets; @Slf4j public class MyMqttCallback implements MqttCallbackExtended { @@ -20,9 +18,11 @@ public class MyMqttCallback implements MqttCallbackExtended { @Override public void connectionLost(Throwable throwable) { log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage()); + throwable.printStackTrace(); try { mqttClient.reconnect(); } catch (MqttException e) { + e.printStackTrace(); throw new RuntimeException(e); } // AtomicLong reconnectTimes = new AtomicLong(1); @@ -51,7 +51,9 @@ public class MyMqttCallback implements MqttCallbackExtended { // 接受成功 @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - log.info("接收到MQTT消息,主题:{},消息体:{}", topic, mqttMessage.getPayload().toString()); + byte[] payload = mqttMessage.getPayload(); + String message = new String(payload, StandardCharsets.UTF_8); + log.info("接收到MQTT消息,主题:{},消息体:{}", topic, message); // topicHandler.handleTopic(topic, mqttMessage); } @@ -61,7 +63,12 @@ public class MyMqttCallback implements MqttCallbackExtended { log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete()); log.info("消息发送完成,消息ID:{}", iMqttDeliveryToken.getMessageId()); try { - log.info("消息发送完成,消息内容:{}", iMqttDeliveryToken.getMessage().getPayload().toString()); + MqttMessage message = iMqttDeliveryToken.getMessage(); + if(null != message){ + byte[] payload = message.getPayload(); + String content = new String(payload, StandardCharsets.UTF_8); + log.info("消息发送完成,返回消息内容:{}", content); + } } catch (MqttException e) { log.error("获取消息内容失败,原因:{}", e.getMessage()); e.printStackTrace();