diff --git a/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java b/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java index 349717b..29f4fff 100644 --- a/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java +++ b/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java @@ -10,12 +10,11 @@ import java.util.Map; public class MqttProperties { private String host; private String clientId; - private String username; - private String password; + private String userName; + private String passWord; private Map topics; private Boolean cleanSession; private Integer timeout; private Integer qos; private Integer keepAlive; - } diff --git a/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java b/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java index 6ec49fd..53da3a3 100644 --- a/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java +++ b/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java @@ -117,7 +117,7 @@ public class RecycleOrderController { message.setMessageType(MessageTypeEnum.BOOKING);//消息类型 message.setAppType(AppTypeEnum.recycler);//发送到哪里 - message.setReceiveUserId("");//接收人 +// message.setReceiveUserId("");//接收人 message.setOrderNo(orderId);//订单号 message.setOrderStatus(OrderStatusEnum.PENDING); diff --git a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java index 2fb0a76..5bf79ed 100644 --- a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java +++ b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java @@ -32,8 +32,8 @@ public class WxMessageController { @Resource private VerifyUtil verifyUtil; -// @Resource - private MqttTool mqttTool; + @Resource + private CustomerMqttClient client; /** * 预约端对接微信推送消息验证 @@ -164,12 +164,31 @@ public class WxMessageController { @PostMapping("/test") public CommonResult test() { - MqttMessage msg = new MqttMessage(); - msg.setQos(1); - msg.setPayload("test".getBytes()); - mqttTool.publish("/test", msg); - System.out.println(mqttTool.isConnected()); - mqttTool.subscribe("/testtopic/#", 1); + + client.publish("testtopic/1", "这是一个测试"); + try { + Thread.sleep(1000); + }catch (Exception e){ + e.printStackTrace(); + } + client.publish("/testtopic/2", "这是一个测试"); + try { + Thread.sleep(1000); + }catch (Exception e){ + e.printStackTrace(); + } + client.subscribe("/testtopic/#", 1); + try { + Thread.sleep(1000); + }catch (Exception e){ + e.printStackTrace(); + } + client.unsubscribe("/testtopic/#"); + try { + Thread.sleep(1000); + }catch (Exception e){ + e.printStackTrace(); + } return CommonResult.success("success"); } diff --git a/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java b/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java index 07d7eea..8888d48 100644 --- a/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java +++ b/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java @@ -38,10 +38,10 @@ public class WsServiceImpl implements IWsService { } if (userSession != null && userSession.isOpen()) { userSession.sendMessage(new TextMessage(text)); + log.info("发送成功,接收人:{},消息:{}", userId, text); } else { // 用户不在线,添加到缓存队列 - log.info("用户不在线,添加到缓存队列,上线后重发!"); - log.info("接收人:{},消息:{}", userId, text); + log.info("用户不在线,添加到缓存队列,上线后重发 接收人:{},消息:{}", userId, text); sessionManager.addCacheMessage(message); } } diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java index 640e492..944e171 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java @@ -10,66 +10,66 @@ import java.util.Map; @Slf4j public class CustomerMqttClient { - private static MqttAsyncClient client; + private static MqttClient client; private MqttProperties properties; - public static MqttAsyncClient getClient() { + public static MqttClient getClient() { return client; } - public static void setClient(MqttAsyncClient client) { + public static void setClient(MqttClient client) { CustomerMqttClient.client = client; } public void connect() throws MqttException { - client = new MqttAsyncClient(properties.getHost(), properties.getClientId(), new MemoryPersistence()); + client = new MqttClient(properties.getHost(), properties.getClientId(), new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); - if (StrUtil.isNotEmpty(properties.getUsername())) { - options.setUserName(properties.getUsername()); + if (StrUtil.isNotEmpty(properties.getUserName())) { + options.setUserName(properties.getUserName()); } - if (StrUtil.isNotEmpty(properties.getPassword())) { - options.setPassword(properties.getPassword().toCharArray()); + if (StrUtil.isNotEmpty(properties.getPassWord())) { + options.setPassword(properties.getPassWord().toCharArray()); } options.setCleanSession(properties.getCleanSession()); options.setKeepAliveInterval(properties.getKeepAlive()); options.setConnectionTimeout(properties.getTimeout()); + options.setAutomaticReconnect(true); if (!client.isConnected()) { + client.setCallback(new MyMqttCallback(client)); client.connect(options); - client.setCallback(new MyMqttCallback(this)); log.info("连接MQTT服务成功 success"); } else { client.disconnect(); client.connect(options); log.info("重新连接MQTT服务成功 success"); } + initSubscribe(); } public void disconnect() throws MqttException { client.disconnect(); } - public void publish(String pushMessage, String topic) { - publish(pushMessage, topic, properties.getQos(), false); + public void publish(String msg, String topic) { + publish(msg, topic, properties.getQos(), false); } - public void publish(String pushMessage, String topic, int qos, boolean retained) { + public void publish(String msg, String topic, int qos, boolean retained) { MqttMessage message = new MqttMessage(); - message.setPayload(pushMessage.getBytes()); + message.setPayload(msg.getBytes()); message.setQos(qos); message.setRetained(retained); - MqttAsyncClient asyncClient = CustomerMqttClient.getClient(); -// MqttTopic mqttTopic = asyncClient..getTopic(topic); -// if (null == mqttTopic) { -// log.error("发布失败,主题不存在!!topic: {} , qos: {} ", topic, qos); -// } + MqttTopic mqttTopic = client.getTopic(topic); + if (null == mqttTopic) { + log.error("发布失败,主题不存在!!topic: {} , qos: {} ", topic, qos); + } try { - IMqttDeliveryToken deliveryToken = asyncClient.publish(topic, message); - MqttMessage reMessage = deliveryToken.getMessage(); - log.info("发布成功 topic: {} , qos: {} , message: {},返回消息:{}", topic, qos, pushMessage,reMessage.getPayload()); + client.publish(topic, message); + log.info("发布成功 topic: {} , qos: {} , message: {}", topic, qos, msg); } catch (MqttException e) { - log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, pushMessage); + log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, msg); e.printStackTrace(); throw new RuntimeException(e); } @@ -88,17 +88,17 @@ public class CustomerMqttClient { // } } - public void subscribe(String topic, int qos) { + public void subscribe(String topicFilter, int qos) { if (client != null && client.isConnected()) { try { - client.subscribe(topic, qos); - log.info("订阅主题成功,topic: {} , qos: {} ", topic, qos); + client.subscribe(topicFilter, qos); + log.info("订阅主题成功,topic: {} , qos: {} ", topicFilter, qos); } catch (MqttException e) { - log.error("订阅主题失败 topic:{}, qos: {} ", topic, qos); + log.error("订阅主题失败 topic:{}, qos: {} ", topicFilter, qos); e.printStackTrace(); } } else { - log.error("客户端连接状态已断开,订阅主题失败 topic:{}, qos: {} ", topic, qos); + log.error("客户端连接状态已断开,订阅主题失败 topic:{}, qos: {} ", topicFilter, qos); } } @@ -122,16 +122,11 @@ public class CustomerMqttClient { for (Map.Entry entry : topics.entrySet()) { String topic = entry.getKey(); int qos = entry.getValue(); - try { - client.subscribe(topic, qos); - log.info("初始化订阅主题成功,topic: {} , qos: {} ", topic, qos); - } catch (MqttException e) { - log.error("初始化订阅主题失败 topic:{}, qos: {} ", topic, qos); - e.printStackTrace(); - } + log.info("初始化订阅主题 topic: {} , qos: {} ", topic, qos); + subscribe(topic, qos); } } else { - log.error("client is not connected,fail to init subscribe topics!!!"); + log.error("客户端未连接,初始化订阅失败!!!"); } } diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/HeartbeatTask.java b/nxhs-service/src/main/java/cc/yunxi/utils/HeartbeatTask.java new file mode 100644 index 0000000..537dbcd --- /dev/null +++ b/nxhs-service/src/main/java/cc/yunxi/utils/HeartbeatTask.java @@ -0,0 +1,12 @@ +package cc.yunxi.utils; + +import org.eclipse.paho.client.mqttv3.MqttClient; + +public class HeartbeatTask implements Runnable { + private MqttClient mqttClient; + + @Override + public void run() { +// mqttClient.publish("heartbeat", "alive".getBytes()); + } +} diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java index da018d8..0fd826c 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java @@ -10,9 +10,9 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j public class MyMqttCallback implements MqttCallbackExtended { - private CustomerMqttClient mqttClient; + private MqttClient mqttClient; - public MyMqttCallback(CustomerMqttClient mqttClient) { + public MyMqttCallback(MqttClient mqttClient) { this.mqttClient = mqttClient; } @@ -20,27 +20,32 @@ public class MyMqttCallback implements MqttCallbackExtended { @Override public void connectionLost(Throwable throwable) { log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage()); - AtomicLong reconnectTimes = new AtomicLong(1); - while (true) { - try { - if (mqttClient.getClient().isConnected()) { - //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择 - log.warn("mqtt 重新连接成功,重新开始初始化订阅主题..."); - return; - } - reconnectTimes.incrementAndGet(); - log.warn("mqtt 重连次数{}, 重新连接时间 {}", reconnectTimes, LocalDateTime.now()); - mqttClient.getClient().reconnect(); - } catch (MqttException e) { - log.error("mqtt断连异常", e.getMessage()); - e.printStackTrace(); - } - try { - this.wait(5000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + try { + mqttClient.reconnect(); + } catch (MqttException e) { + throw new RuntimeException(e); } +// AtomicLong reconnectTimes = new AtomicLong(1); +// while (true) { +// try { +// if (mqttClient.getClient().isConnected()) { +// //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择 +// log.warn("mqtt 重新连接成功,重新开始初始化订阅主题..."); +// return; +// } +// reconnectTimes.incrementAndGet(); +// log.warn("mqtt 重连次数{}, 重新连接时间 {}", reconnectTimes, LocalDateTime.now()); +// mqttClient.connect(); +// } catch (MqttException e) { +// log.error("mqtt断连异常", e.getMessage()); +// e.printStackTrace(); +// } +// try { +// this.wait(5000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// } } // 接受成功 @@ -68,6 +73,5 @@ public class MyMqttCallback implements MqttCallbackExtended { @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连"); - mqttClient.initSubscribe(); } } diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/WsSessionManager.java b/nxhs-service/src/main/java/cc/yunxi/utils/WsSessionManager.java index a7a3f75..64a7407 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/WsSessionManager.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/WsSessionManager.java @@ -39,28 +39,46 @@ public class WsSessionManager { } public void addClientSession(String userId, WebSocketSession session) { - //获取旧session - WebSocketSession oldSession = CLIENT_SESSION.get(userId); - if (null == oldSession //没有 - || !oldSession.getId().equals(session.getId()) //id不同 - || !oldSession.isOpen()) { //旧session已关闭 + + WebSocketSession old = CLIENT_SESSION.remove(userId); + if (null != old && old.isOpen()) { + try { + old.close(); + log.info("关闭散户端旧连接:{},{}", userId, session.isOpen()); + } catch (IOException e) { + e.printStackTrace(); + } + } + if (session.isOpen()) { CLIENT_SESSION.put(userId, session); //则将新的session添加到池中 - log.info("添加散户端连接:{},{}", userId, session.isOpen()); + log.info("添加散户端新连接:{},{}", userId, session.isOpen()); //判断缓存中是否有未发送的消息 - sendCacheMessage(userId); + sendCacheMessage(userId, AppTypeEnum.client); + } else { + log.error("散户端新连接添加失败,连接已关闭:{},{}", userId, session.isOpen()); } + } public void addRecyclerSession(String userId, WebSocketSession session) { //获取旧session - WebSocketSession oldSession = RECYCLER_SESSION.get(userId); - if (null == oldSession //没有 - || !oldSession.getId().equals(session.getId()) //id不同 - || !oldSession.isOpen()) { //旧session已关闭 + WebSocketSession old = RECYCLER_SESSION.remove(userId); + if (null != old && old.isOpen()) { + try { + old.close(); + log.info("关闭回收员端旧连接:{},{}", userId, session.isOpen()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + if (session.isOpen()) { RECYCLER_SESSION.put(userId, session); //则将新的session添加到池中 log.info("添加回收员端连接:{},{}", userId, session.isOpen()); //判断缓存中是否有未发送的消息 - sendCacheMessage(userId); + sendCacheMessage(userId, AppTypeEnum.recycler); + } else { + log.error("回收员端新连接添加失败,连接已关闭:{},{}", userId, session.isOpen()); } } @@ -78,14 +96,14 @@ public class WsSessionManager { } //处理未发送的消息 - private void sendCacheMessage(String userId) { + private void sendCacheMessage(String userId, AppTypeEnum appType) { // 遍历缓存队列 Iterator iterator = MESSAGE_CACHE.iterator(); - if (iterator.hasNext()) { + while (iterator.hasNext()) { SocketMessage message = iterator.next(); LocalDateTime sendTime = message.getSendTime(); int minute = message.getCacheMinute(); - AppTypeEnum appType = message.getAppType(); + String receiveUserId = message.getReceiveUserId(); WebSocketSession session = null; switch (appType) { case client: @@ -93,24 +111,30 @@ public class WsSessionManager { break; case recycler: session = RECYCLER_SESSION.get(userId); + break; default: + log.error("appType错误,消息被移除:{},{},{}", userId, appType, JSONUtil.toJsonStr(message)); iterator.remove();//移除 break; } - if (userId.equals(message.getReceiveUserId()) + + if (userId.equals(receiveUserId) + && appType.equals(message.getAppType()) && session != null && session.isOpen() && sendTime.plusMinutes(minute).isAfter(LocalDateTime.now())) { // 发送消息 try { session.sendMessage(new TextMessage(JSONUtil.toJsonStr(message))); + log.error("消息已发送,消息被移除:{},{},{}", userId, appType, JSONUtil.toJsonStr(message)); + iterator.remove();//移除 } catch (IOException e) { e.printStackTrace(); } - iterator.remove();//移除 } //过期的消息直接移除 if (sendTime.plusMinutes(minute).isBefore(LocalDateTime.now())) { + log.error("消息已过期,消息被移除:{},{},{}", userId, appType, JSONUtil.toJsonStr(message)); iterator.remove(); } } diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java b/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java index 6d92efa..7fa5158 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java @@ -32,10 +32,10 @@ public class WsSocketHandler extends AbstractWebSocketHandler { @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - log.info("发送文本消息"); +// log.info("发送文本消息"); // 获得客户端传来的消息 String payload = message.getPayload(); - log.info("server 接收到消息 " + payload); + log.info("server 接收到注册消息 " + payload); JSONObject json = JSONUtil.parseObj(payload); String userId = json.getStr("userId"); String appType = json.getStr("appType");