From f6b5f12eef869cb4e181a505e3c59eab762c19e1 Mon Sep 17 00:00:00 2001 From: guochaojie Date: Wed, 12 Jun 2024 09:13:56 +0800 Subject: [PATCH] =?UTF-8?q?Socket=E5=8F=91=E9=80=81=E6=B6=88=E6=81=AF=20mq?= =?UTF-8?q?tt=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nxhs-service/pom.xml | 29 ++++ .../main/java/cc/yunxi/config/MqttConfig.java | 31 ++++ .../cc/yunxi/config/props/MqttProperties.java | 21 +++ .../controller/RecycleOrderController.java | 14 +- .../yunxi/controller/WxMessageController.java | 21 ++- .../yunxi/domain/vo/socket/OrderCancel.java | 4 +- .../yunxi/domain/vo/socket/OrderFinish.java | 4 +- .../cc/yunxi/domain/vo/socket/OrderTaken.java | 2 +- .../cc/yunxi/service/impl/WsServiceImpl.java | 1 + .../cc/yunxi/utils/CustomerMqttClient.java | 141 ++++++++++++++++++ .../main/java/cc/yunxi/utils/MqttTool.java | 57 +++++++ .../java/cc/yunxi/utils/MyMqttCallback.java | 73 +++++++++ .../java/cc/yunxi/utils/TopicHandler.java | 13 ++ .../java/cc/yunxi/utils/WsSocketHandler.java | 6 +- 14 files changed, 400 insertions(+), 17 deletions(-) create mode 100644 nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java create mode 100644 nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java create mode 100644 nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java create mode 100644 nxhs-service/src/main/java/cc/yunxi/utils/MqttTool.java create mode 100644 nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java create mode 100644 nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java diff --git a/nxhs-service/pom.xml b/nxhs-service/pom.xml index f45ea95..325f9be 100644 --- a/nxhs-service/pom.xml +++ b/nxhs-service/pom.xml @@ -62,6 +62,35 @@ org.springframework.boot spring-boot-starter-websocket + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-stream + + + org.springframework.integration + spring-integration-mqtt + + + + + + + + + + + + + + + + + diff --git a/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java b/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java new file mode 100644 index 0000000..01dafb5 --- /dev/null +++ b/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java @@ -0,0 +1,31 @@ +package cc.yunxi.config; + +import cc.yunxi.config.props.MqttProperties; +import cc.yunxi.utils.CustomerMqttClient; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.Resource; + + +@Slf4j +//@Configuration +@EnableConfigurationProperties(MqttProperties.class) +public class MqttConfig { + + @Resource + private MqttProperties properties; + + @Bean + public CustomerMqttClient customerMqttClient() throws MqttException { + CustomerMqttClient client = new CustomerMqttClient(); + client.setMqttProperties(properties); + client.connect(); + log.info("===========> MQTT连接成功 <==========="); + return client; + } +} + diff --git a/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java b/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java new file mode 100644 index 0000000..349717b --- /dev/null +++ b/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java @@ -0,0 +1,21 @@ +package cc.yunxi.config.props; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.util.Map; + +@Data +@ConfigurationProperties(prefix = "mqtt") +public class MqttProperties { + private String host; + private String clientId; + private String username; + private String password; + private Map topics; + private Boolean cleanSession; + private Integer timeout; + private Integer qos; + private Integer keepAlive; + +} diff --git a/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java b/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java index 05bc80d..3ca6ae0 100644 --- a/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java +++ b/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java @@ -43,6 +43,7 @@ import org.springframework.web.bind.annotation.*; import java.io.IOException; import java.math.BigDecimal; import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; @@ -220,7 +221,7 @@ public class RecycleOrderController { cancel.setOrderNo(order.getOrderNumber()); cancel.setOrderStatus(order.getOrderStatus()); - cancel.setCancelOrderTime(LocalDateTime.now()); + cancel.setCancelOrderTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); cancel.setClientPhone(order.getClientMobile()); cancel.setClientAddress(addressInfo.getReceiveStreet()); @@ -252,9 +253,14 @@ public class RecycleOrderController { taken.setOrderNo(order.getOrderNumber()); taken.setOrderStatus(order.getOrderStatus()); - taken.setTakeOrderUser(order.getStaffsName()); + taken.setTakeOrderUser(recycler.getStaffsName()); taken.setTakeUserPhone(recycler.getMobilePhone()); - taken.setTakeOrderTime(LocalDateTime.now()); + taken.setTakeOrderTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); + try { + wsService.sendMsgToUser(taken); + } catch (IOException e) { + e.printStackTrace(); + } return CommonResult.success(true); } @@ -289,10 +295,12 @@ public class RecycleOrderController { finish.setOrderNo(order.getOrderNumber()); finish.setOrderStatus(order.getOrderStatus()); +// finish.setRealWeight(order.getPredictWeight().toString()); finish.setRealMoney(order.getOrderAmount()); finish.setTakeOrderUser(recycler.getStaffsName()); finish.setTakeUserPhone(recycler.getMobilePhone()); + try { wsService.sendMsgToUser(finish); } catch (IOException e) { diff --git a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java index 9609bac..2fb0a76 100644 --- a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java +++ b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java @@ -7,15 +7,12 @@ import cc.yunxi.domain.vo.socket.MessageTypeEnum; import cc.yunxi.domain.vo.socket.SocketMessage; import cc.yunxi.domain.vo.vxmessage.*; import cc.yunxi.service.IWsService; -import cc.yunxi.utils.UserContext; -import cc.yunxi.utils.VerifyUtil; -import cc.yunxi.utils.WeChatMessageUtil; -import cc.yunxi.utils.WeChatUtil; +import cc.yunxi.utils.*; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import io.swagger.annotations.Api; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; +import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; @@ -35,6 +32,9 @@ public class WxMessageController { @Resource private VerifyUtil verifyUtil; +// @Resource + private MqttTool mqttTool; + /** * 预约端对接微信推送消息验证 * @@ -162,5 +162,16 @@ public class WxMessageController { return CommonResult.success(result); } + @PostMapping("/test") + public CommonResult test() { + MqttMessage msg = new MqttMessage(); + msg.setQos(1); + msg.setPayload("test".getBytes()); + mqttTool.publish("/test", msg); + System.out.println(mqttTool.isConnected()); + mqttTool.subscribe("/testtopic/#", 1); + return CommonResult.success("success"); + } + } diff --git a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderCancel.java b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderCancel.java index f30394b..da47542 100644 --- a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderCancel.java +++ b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderCancel.java @@ -8,14 +8,14 @@ import lombok.Data; import java.time.LocalDateTime; @Data -@ApiModel("新订单") +@ApiModel("订单取消") public class OrderCancel extends SocketMessage{ @ApiModelProperty("订单号") private String orderNo; @ApiModelProperty("订单状态") private OrderStatusEnum orderStatus; @ApiModelProperty("取消时间") - private LocalDateTime cancelOrderTime; + private String cancelOrderTime; @ApiModelProperty("预约人号码") private String clientPhone; @ApiModelProperty("预约地址") diff --git a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderFinish.java b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderFinish.java index 09ee0ed..f65185b 100644 --- a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderFinish.java +++ b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderFinish.java @@ -7,15 +7,13 @@ import lombok.Data; import java.math.BigDecimal; -@ApiModel("新订单") +@ApiModel("订单完成") @Data public class OrderFinish extends SocketMessage{ @ApiModelProperty("订单号") private String orderNo; @ApiModelProperty("订单状态") private OrderStatusEnum orderStatus; - @ApiModelProperty("实际重量") - private String realWeight; @ApiModelProperty("实际金额") private BigDecimal realMoney; @ApiModelProperty("接单人") diff --git a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderTaken.java b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderTaken.java index 147036b..28c0696 100644 --- a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderTaken.java +++ b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderTaken.java @@ -19,5 +19,5 @@ public class OrderTaken extends SocketMessage { @ApiModelProperty("接单人电话") private String takeUserPhone; @ApiModelProperty("接单时间") - private LocalDateTime takeOrderTime; + private String takeOrderTime; } diff --git a/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java b/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java index 90c7903..ecbc004 100644 --- a/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java +++ b/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java @@ -41,6 +41,7 @@ public class WsServiceImpl implements IWsService { } else { // 用户不在线,添加到缓存队列 log.info("用户不在线,添加到缓存队列,上线后重发!"); + log.info("接收人:{},消息:{}", userId, text); sessionManager.addCacheMessage(message); } } diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java new file mode 100644 index 0000000..640e492 --- /dev/null +++ b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java @@ -0,0 +1,141 @@ +package cc.yunxi.utils; + +import cc.yunxi.config.props.MqttProperties; +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.util.Map; + +@Slf4j +public class CustomerMqttClient { + private static MqttAsyncClient client; + private MqttProperties properties; + + public static MqttAsyncClient getClient() { + return client; + } + + public static void setClient(MqttAsyncClient client) { + CustomerMqttClient.client = client; + } + + public void connect() throws MqttException { + client = new MqttAsyncClient(properties.getHost(), properties.getClientId(), new MemoryPersistence()); + MqttConnectOptions options = new MqttConnectOptions(); + if (StrUtil.isNotEmpty(properties.getUsername())) { + options.setUserName(properties.getUsername()); + } + if (StrUtil.isNotEmpty(properties.getPassword())) { + options.setPassword(properties.getPassword().toCharArray()); + } + options.setCleanSession(properties.getCleanSession()); + options.setKeepAliveInterval(properties.getKeepAlive()); + options.setConnectionTimeout(properties.getTimeout()); + + if (!client.isConnected()) { + client.connect(options); + client.setCallback(new MyMqttCallback(this)); + log.info("连接MQTT服务成功 success"); + } else { + client.disconnect(); + client.connect(options); + log.info("重新连接MQTT服务成功 success"); + } + } + + public void disconnect() throws MqttException { + client.disconnect(); + } + + public void publish(String pushMessage, String topic) { + publish(pushMessage, topic, properties.getQos(), false); + } + + public void publish(String pushMessage, String topic, int qos, boolean retained) { + MqttMessage message = new MqttMessage(); + message.setPayload(pushMessage.getBytes()); + message.setQos(qos); + message.setRetained(retained); + MqttAsyncClient asyncClient = CustomerMqttClient.getClient(); +// MqttTopic mqttTopic = asyncClient..getTopic(topic); +// if (null == mqttTopic) { +// log.error("发布失败,主题不存在!!topic: {} , qos: {} ", topic, qos); +// } + + try { + IMqttDeliveryToken deliveryToken = asyncClient.publish(topic, message); + MqttMessage reMessage = deliveryToken.getMessage(); + log.info("发布成功 topic: {} , qos: {} , message: {},返回消息:{}", topic, qos, pushMessage,reMessage.getPayload()); + } catch (MqttException e) { + log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, pushMessage); + e.printStackTrace(); + throw new RuntimeException(e); + } +// MqttDeliveryToken token;//Delivery:配送 +// synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 +// try { +// token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 +// token.waitForCompletion(1000L); +// } catch (MqttPersistenceException e) { +// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos); +// e.printStackTrace(); +// } catch (MqttException e) { +// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos); +// e.printStackTrace(); +// } +// } + } + + public void subscribe(String topic, int qos) { + if (client != null && client.isConnected()) { + try { + client.subscribe(topic, qos); + log.info("订阅主题成功,topic: {} , qos: {} ", topic, qos); + } catch (MqttException e) { + log.error("订阅主题失败 topic:{}, qos: {} ", topic, qos); + e.printStackTrace(); + } + } else { + log.error("客户端连接状态已断开,订阅主题失败 topic:{}, qos: {} ", topic, qos); + } + } + + public void unsubscribe(String topic) { + if (client != null && client.isConnected()) { + try { + client.unsubscribe(topic); + log.info("取消订阅成功 topic success:{}", topic); + } catch (MqttException e) { + log.error("取消订阅失败 topic:{}", topic); + e.printStackTrace(); + } + } else { + log.error("客户端连接状态已断开,取消订阅失败 topic:{}", topic); + } + } + + public void initSubscribe() { + if (client != null && client.isConnected()) { + Map topics = properties.getTopics(); + for (Map.Entry entry : topics.entrySet()) { + String topic = entry.getKey(); + int qos = entry.getValue(); + try { + client.subscribe(topic, qos); + log.info("初始化订阅主题成功,topic: {} , qos: {} ", topic, qos); + } catch (MqttException e) { + log.error("初始化订阅主题失败 topic:{}, qos: {} ", topic, qos); + e.printStackTrace(); + } + } + } else { + log.error("client is not connected,fail to init subscribe topics!!!"); + } + } + + public void setMqttProperties(MqttProperties mqttProperties) { + this.properties = mqttProperties; + } +} diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/MqttTool.java b/nxhs-service/src/main/java/cc/yunxi/utils/MqttTool.java new file mode 100644 index 0000000..3945684 --- /dev/null +++ b/nxhs-service/src/main/java/cc/yunxi/utils/MqttTool.java @@ -0,0 +1,57 @@ +package cc.yunxi.utils; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +//@Component +@Slf4j +public class MqttTool { + +// @Resource + private MqttClient client; + + public boolean publish(String topic, MqttMessage message) { + try { + client.publish(topic, message); + log.warn("发布成功,topic:{},message:{}", topic, message.getPayload()); + return true; + } catch (MqttException e) { + log.error("发布失败,topic:{},message:{}", topic, message.getPayload(), e); + e.printStackTrace(); + return false; + } + } + + public boolean subscribe(String topic, int qos) { + try { + if (client.isConnected() == false) { + log.warn("链接已断开,topic:{},qos:{}", topic, qos); + } + client.subscribe(topic, qos); + log.warn("订阅成功,topic:{},qos:{}", topic, qos); + return true; + } catch (MqttException e) { + log.warn("订阅主题失败,topic:{},qos:{}", topic, qos); + e.printStackTrace(); + return false; + } + } + + public boolean isConnected() { + return client.isConnected(); + } + + public void setWill() throws MqttException { + // 设置遗嘱 + String willTopic = "willTopic"; + String willMessage = "willMessage"; + int willQos = 2; + boolean willRetain = false; +// (willTopic, willMessage.getBytes(), willQos, willRetain); + } +} diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java new file mode 100644 index 0000000..da018d8 --- /dev/null +++ b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java @@ -0,0 +1,73 @@ +package cc.yunxi.utils; + +import cc.yunxi.domain.po.Client; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; + +import java.time.LocalDateTime; +import java.util.concurrent.atomic.AtomicLong; + +@Slf4j +public class MyMqttCallback implements MqttCallbackExtended { + + private CustomerMqttClient mqttClient; + + public MyMqttCallback(CustomerMqttClient mqttClient) { + this.mqttClient = mqttClient; + } + + //链接断开 + @Override + public void connectionLost(Throwable throwable) { + log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage()); + AtomicLong reconnectTimes = new AtomicLong(1); + while (true) { + try { + if (mqttClient.getClient().isConnected()) { + //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择 + log.warn("mqtt 重新连接成功,重新开始初始化订阅主题..."); + return; + } + reconnectTimes.incrementAndGet(); + log.warn("mqtt 重连次数{}, 重新连接时间 {}", reconnectTimes, LocalDateTime.now()); + mqttClient.getClient().reconnect(); + } catch (MqttException e) { + log.error("mqtt断连异常", e.getMessage()); + e.printStackTrace(); + } + try { + this.wait(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + // 接受成功 + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + log.info("接收到MQTT消息,主题:{},消息体:{}", topic, mqttMessage.getPayload().toString()); +// topicHandler.handleTopic(topic, mqttMessage); + } + + //发送完成 + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete()); + log.info("消息发送完成,消息ID:{}", iMqttDeliveryToken.getMessageId()); + try { + log.info("消息发送完成,消息内容:{}", iMqttDeliveryToken.getMessage().getPayload().toString()); + } catch (MqttException e) { + log.error("获取消息内容失败,原因:{}", e.getMessage()); + e.printStackTrace(); + } + + } + + //连接完成后 + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连"); + mqttClient.initSubscribe(); + } +} diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java b/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java new file mode 100644 index 0000000..b0f5a53 --- /dev/null +++ b/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java @@ -0,0 +1,13 @@ +package cc.yunxi.utils; + +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; + +@Component +public class TopicHandler { + + public void handleTopic(String topic, MqttMessage message) { + // 处理订阅的主题和消息 + System.out.println("Received message on topic: " + topic + ", message: " + message.getPayload()); + } +} diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java b/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java index 038c9ca..6d92efa 100644 --- a/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java +++ b/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java @@ -41,12 +41,12 @@ public class WsSocketHandler extends AbstractWebSocketHandler { String appType = json.getStr("appType"); if (AppTypeEnum.client.name().equals(appType)) { SESSION_MANAGER.addClientSession(userId, session); - session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString())); +// session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString())); } else if (AppTypeEnum.recycler.name().equals(appType)) { SESSION_MANAGER.addRecyclerSession(userId, session); - session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString())); +// session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString())); } else { - session.sendMessage(new TextMessage("server 发送给的消息 ws发送参数不正确,连接即将关闭!!! 发送时间:" + LocalDateTime.now().toString())); +// session.sendMessage(new TextMessage("server 发送给的消息 ws发送参数不正确,连接即将关闭!!! 发送时间:" + LocalDateTime.now().toString())); SESSION_MANAGER.removeAndClose(session.getId()); log.error("ws发送参数不正确,连接关闭!!!"); }