From 615a12af00beb0e7d966d70d3cadd6eb32db1ad6 Mon Sep 17 00:00:00 2001 From: guochaojie Date: Wed, 26 Jun 2024 21:12:40 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=20=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/cc/yunxi/config/MqttConfig.java | 12 ++--- .../cc/yunxi/controller/DeviceController.java | 15 ++---- .../yunxi/controller/WxMessageController.java | 2 +- .../yunxi/service/IRecycleDeviceService.java | 2 + .../impl/RecycleDeviceServiceImpl.java | 10 ++++ .../cc/yunxi/utils/CustomerMqttClient.java | 50 +++++++++---------- .../java/cc/yunxi/utils/MyMqttCallback.java | 29 +++++++++-- .../java/cc/yunxi/utils/TopicHandler.java | 38 ++++++++++++-- .../src/main/resources/application-dev.yml | 3 +- 9 files changed, 108 insertions(+), 53 deletions(-) diff --git a/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java b/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java index 5fef676..29538fc 100644 --- a/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java +++ b/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java @@ -7,23 +7,21 @@ import org.eclipse.paho.mqttv5.common.MqttException; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; - import javax.annotation.Resource; -import java.io.UnsupportedEncodingException; @Slf4j -@Configuration -@EnableConfigurationProperties(MqttProperties.class) +//@Configuration +//@EnableConfigurationProperties(MqttProperties.class) public class MqttConfig { - @Resource +// @Resource private MqttProperties properties; - @Bean +// @Bean public CustomerMqttClient customerMqttClient() throws MqttException { CustomerMqttClient client = new CustomerMqttClient(); - client.setMqttProperties(properties); +// client.setMqttProperties(properties); client.connect(); log.info("===========> MQTT连接成功 <==========="); return client; diff --git a/nxhs-service/src/main/java/cc/yunxi/controller/DeviceController.java b/nxhs-service/src/main/java/cc/yunxi/controller/DeviceController.java index ad87487..fa2262a 100644 --- a/nxhs-service/src/main/java/cc/yunxi/controller/DeviceController.java +++ b/nxhs-service/src/main/java/cc/yunxi/controller/DeviceController.java @@ -299,17 +299,12 @@ public class DeviceController { @PostMapping("/login") @ApiOperation("清运员手机号登录") - public CommonResult login(@RequestBody LoginReqVO loginReqVO) { + public CommonResult login(@RequestBody LoginReqVO loginReqVO) { RecycleDevice device = deviceService.getByDeviceCode(loginReqVO.getDeviceCode()); if (device == null) { throw new BizIllegalException("登录失败:未查询到对应设备信息!"); } Recycler recycler = recyclerService.getRecyclerByPhoneNumber(loginReqVO.getPhone()); - CommandVO command = new CommandVO(); - command.setDeviceCode(loginReqVO.getDeviceCode()); - command.setCmd(CMDEnum.login); - command.setOptTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); - command.setRemark("登录成功"); if (null != recycler) { RecycleStaffDevice staffDevice = staffDeviceService.getByDeviceCode(loginReqVO.getDeviceCode(), loginReqVO.getPhone()); if (null == staffDevice) { @@ -333,9 +328,8 @@ public class DeviceController { respVO.setPhone(client.getMobilePhone()); respVO.setTimeExpire(userInfo.getTimeExpire()); respVO.setRole(1); - command.setData(respVO); - mqttClient.publish(loginReqVO.getDeviceCode() + "/command", JSONUtil.toJsonStr(command)); - return CommonResult.success("登录成功!", "success"); + + return CommonResult.success(respVO, "success"); } //登录成功 UserDTO userDTO = commonService.loginDeviceByRecycler(recycler); @@ -349,8 +343,7 @@ public class DeviceController { respVO.setBalance(BigDecimal.ZERO); respVO.setTimeExpire(userDTO.getTimeExpire()); respVO.setRole(2); - mqttClient.publish(loginReqVO.getDeviceCode() + "/command", JSONUtil.toJsonStr(respVO)); - return CommonResult.success("登录成功!", "success"); + return CommonResult.success(respVO, "success"); } diff --git a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java index 26ac9b4..e5e21b5 100644 --- a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java +++ b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java @@ -166,7 +166,7 @@ public class WxMessageController { public CommonResult test() { client.subscribe("testtopc/1",1); client.publish("testtopc/1","{\"msg\": \"hello\"}"); - client.unsubscribe("testtopc/1"); +// client.unsubscribe("testtopc/1"); return CommonResult.success("success"); } diff --git a/nxhs-service/src/main/java/cc/yunxi/service/IRecycleDeviceService.java b/nxhs-service/src/main/java/cc/yunxi/service/IRecycleDeviceService.java index d3701f9..a627be9 100644 --- a/nxhs-service/src/main/java/cc/yunxi/service/IRecycleDeviceService.java +++ b/nxhs-service/src/main/java/cc/yunxi/service/IRecycleDeviceService.java @@ -15,5 +15,7 @@ public interface IRecycleDeviceService extends IService { int updateDevice(DeviceStatus status); + boolean offline(String deviceCode); + boolean isChanged(DeviceStatus old, DeviceStatus now); } diff --git a/nxhs-service/src/main/java/cc/yunxi/service/impl/RecycleDeviceServiceImpl.java b/nxhs-service/src/main/java/cc/yunxi/service/impl/RecycleDeviceServiceImpl.java index 75eb52e..78805a8 100644 --- a/nxhs-service/src/main/java/cc/yunxi/service/impl/RecycleDeviceServiceImpl.java +++ b/nxhs-service/src/main/java/cc/yunxi/service/impl/RecycleDeviceServiceImpl.java @@ -42,6 +42,16 @@ public class RecycleDeviceServiceImpl extends ServiceImpl wrapper = new UpdateWrapper<>(); + wrapper.lambda() + .set(RecycleDevice::getStatus, 0) + .eq(RecycleDevice::getDeviceCode, deviceCode); + return update(wrapper); + + } + @Override public boolean isChanged(DeviceStatus oldStatus, DeviceStatus newStatus) { if (!oldStatus.getIp().equals(newStatus.getIp())) return true; diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java index 1a29998..b272253 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java @@ -10,21 +10,31 @@ import org.eclipse.paho.mqttv5.client.MqttTopic; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; - +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.Map; @Slf4j +@Component +@EnableConfigurationProperties(MqttProperties.class) public class CustomerMqttClient { - private static MqttClient client; + @Resource + private MyMqttCallback callback; + private MqttClient client; + @Resource private MqttProperties properties; - public static MqttClient getClient() { - return client; - } - - public static void setClient(MqttClient client) { - CustomerMqttClient.client = client; + @PostConstruct + public void init() { + try { + connect(); + } catch (MqttException e) { + log.error("连接MQTT服务失败", e); + throw new BizIllegalException("连接MQTT服务失败"); + } } public void connect() throws MqttException { @@ -40,8 +50,9 @@ public class CustomerMqttClient { options.setKeepAliveInterval(properties.getKeepAlive()); options.setConnectionTimeout(properties.getTimeout()); options.setAutomaticReconnect(true); + options.setAutomaticReconnectDelay(1, 3); options.setSessionExpiryInterval(0L);//会话过期时间 - client.setCallback(new MyMqttCallback(client)); + client.setCallback(callback); if (!client.isConnected()) { client.connect(options); log.info("连接MQTT服务成功 success"); @@ -50,6 +61,7 @@ public class CustomerMqttClient { client.connect(options); log.info("重新连接MQTT服务成功 success"); } + callback.registerClient(client); initSubscribe(); } @@ -83,19 +95,6 @@ public class CustomerMqttClient { e.printStackTrace(); throw new BizIllegalException("mqtt 发布主题失败,发布主题:" + topic + ",发布消息:" + msg); } -// MqttDeliveryToken token;//Delivery:配送 -// synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 -// try { -// token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 -// token.waitForCompletion(1000L); -// } catch (MqttPersistenceException e) { -// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos); -// e.printStackTrace(); -// } catch (MqttException e) { -// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos); -// e.printStackTrace(); -// } -// } } public void subscribe(String topicFilter, int qos) { @@ -129,6 +128,9 @@ public class CustomerMqttClient { public void initSubscribe() { if (client != null && client.isConnected()) { Map topics = properties.getTopics(); + if(topics == null || topics.size() == 0) { + log.info("未配置初始化订阅主题!!!"); + } for (Map.Entry entry : topics.entrySet()) { String topic = entry.getKey(); int qos = entry.getValue(); @@ -139,8 +141,4 @@ public class CustomerMqttClient { log.error("客户端未连接,初始化订阅失败!!!"); } } - - public void setMqttProperties(MqttProperties mqttProperties) { - this.properties = mqttProperties; - } } diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java index d01d785..f69f2a0 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java @@ -1,34 +1,52 @@ package cc.yunxi.utils; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.mqttv5.client.*; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttCallback; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.nio.charset.StandardCharsets; @Slf4j +@Component public class MyMqttCallback implements MqttCallback { private MqttClient mqttClient; - + @Resource private TopicHandler topicHandler; - public MyMqttCallback(MqttClient mqttClient) { + public void registerClient(MqttClient mqttClient) { this.mqttClient = mqttClient; } - @Override public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { log.error("MQTT 断开连接,原因:{}", mqttDisconnectResponse.getReasonString()); + + try { + mqttClient.reconnect(); + } catch (MqttException e) { + log.error("MQTT 重新连接失败,错误信息:{}", e.getMessage()); + throw new RuntimeException(e); + } mqttDisconnectResponse.getException().printStackTrace(); } @Override public void mqttErrorOccurred(MqttException e) { log.error("MQTT 错误,错误信息:{}", e.getMessage()); + try { + mqttClient.reconnect(); + } catch (MqttException ex) { + log.error("MQTT 重新连接失败,错误信息:{}", ex.getMessage()); + throw new RuntimeException(ex); + } e.printStackTrace(); } @@ -57,4 +75,7 @@ public class MyMqttCallback implements MqttCallback { public void authPacketArrived(int i, MqttProperties mqttProperties) { log.info("MQTT 认证包到达,原因码:{},原因:{}", i, mqttProperties.getReasonString()); } + + + } diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java b/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java index e8be440..4f0fbf7 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java @@ -1,13 +1,45 @@ package cc.yunxi.utils; +import cc.yunxi.service.IRecycleDeviceService; +import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.springframework.stereotype.Component; +import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; + @Component +@Slf4j public class TopicHandler { + @Resource + private IRecycleDeviceService deviceService; + public void handleTopic(String topic, MqttMessage message) { - // 处理订阅的主题和消息 - System.out.println("Received message on topic: " + topic + ", message: " + message.getPayload()); + String[] split = topic.split("/"); + String deviceCode = split[0]; + String action = split[1]; + byte[] payload = message.getPayload(); + String msg = new String(payload, StandardCharsets.UTF_8); + switch (action) { + case "Dead": + Dead(deviceCode, msg); + break; + default: + break; + } + log.info("消息已处理 :topic:{},msg:{}", topic, msg); + } + + /** + * 处理遗嘱消息 1 离线 0 在线 + * + * @param deviceCode + * @param msg + */ + private void Dead(String deviceCode, String msg) { + if ("1".equals(msg)) { + deviceService.offline(deviceCode); + } } -} +} \ No newline at end of file diff --git a/nxhs-service/src/main/resources/application-dev.yml b/nxhs-service/src/main/resources/application-dev.yml index 7e22117..24497b3 100644 --- a/nxhs-service/src/main/resources/application-dev.yml +++ b/nxhs-service/src/main/resources/application-dev.yml @@ -38,4 +38,5 @@ mqtt: timeout: 30 qos: 1 keepAlive: 30 - topics: {} \ No newline at end of file + topics: + "[+/Dead]": 1 \ No newline at end of file