diff --git a/nxhs-service/pom.xml b/nxhs-service/pom.xml
index f45ea95..325f9be 100644
--- a/nxhs-service/pom.xml
+++ b/nxhs-service/pom.xml
@@ -62,6 +62,35 @@
org.springframework.boot
spring-boot-starter-websocket
+
+
+ org.springframework.boot
+ spring-boot-starter-integration
+
+
+ org.springframework.integration
+ spring-integration-stream
+
+
+ org.springframework.integration
+ spring-integration-mqtt
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java b/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java
new file mode 100644
index 0000000..01dafb5
--- /dev/null
+++ b/nxhs-service/src/main/java/cc/yunxi/config/MqttConfig.java
@@ -0,0 +1,31 @@
+package cc.yunxi.config;
+
+import cc.yunxi.config.props.MqttProperties;
+import cc.yunxi.utils.CustomerMqttClient;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.Resource;
+
+
+@Slf4j
+//@Configuration
+@EnableConfigurationProperties(MqttProperties.class)
+public class MqttConfig {
+
+ @Resource
+ private MqttProperties properties;
+
+ @Bean
+ public CustomerMqttClient customerMqttClient() throws MqttException {
+ CustomerMqttClient client = new CustomerMqttClient();
+ client.setMqttProperties(properties);
+ client.connect();
+ log.info("===========> MQTT连接成功 <===========");
+ return client;
+ }
+}
+
diff --git a/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java b/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java
new file mode 100644
index 0000000..349717b
--- /dev/null
+++ b/nxhs-service/src/main/java/cc/yunxi/config/props/MqttProperties.java
@@ -0,0 +1,21 @@
+package cc.yunxi.config.props;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+import java.util.Map;
+
+@Data
+@ConfigurationProperties(prefix = "mqtt")
+public class MqttProperties {
+ private String host;
+ private String clientId;
+ private String username;
+ private String password;
+ private Map topics;
+ private Boolean cleanSession;
+ private Integer timeout;
+ private Integer qos;
+ private Integer keepAlive;
+
+}
diff --git a/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java b/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java
index 05bc80d..3ca6ae0 100644
--- a/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java
+++ b/nxhs-service/src/main/java/cc/yunxi/controller/RecycleOrderController.java
@@ -43,6 +43,7 @@ import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@@ -220,7 +221,7 @@ public class RecycleOrderController {
cancel.setOrderNo(order.getOrderNumber());
cancel.setOrderStatus(order.getOrderStatus());
- cancel.setCancelOrderTime(LocalDateTime.now());
+ cancel.setCancelOrderTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
cancel.setClientPhone(order.getClientMobile());
cancel.setClientAddress(addressInfo.getReceiveStreet());
@@ -252,9 +253,14 @@ public class RecycleOrderController {
taken.setOrderNo(order.getOrderNumber());
taken.setOrderStatus(order.getOrderStatus());
- taken.setTakeOrderUser(order.getStaffsName());
+ taken.setTakeOrderUser(recycler.getStaffsName());
taken.setTakeUserPhone(recycler.getMobilePhone());
- taken.setTakeOrderTime(LocalDateTime.now());
+ taken.setTakeOrderTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+ try {
+ wsService.sendMsgToUser(taken);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
return CommonResult.success(true);
}
@@ -289,10 +295,12 @@ public class RecycleOrderController {
finish.setOrderNo(order.getOrderNumber());
finish.setOrderStatus(order.getOrderStatus());
+// finish.setRealWeight(order.getPredictWeight().toString());
finish.setRealMoney(order.getOrderAmount());
finish.setTakeOrderUser(recycler.getStaffsName());
finish.setTakeUserPhone(recycler.getMobilePhone());
+
try {
wsService.sendMsgToUser(finish);
} catch (IOException e) {
diff --git a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java
index 9609bac..2fb0a76 100644
--- a/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java
+++ b/nxhs-service/src/main/java/cc/yunxi/controller/WxMessageController.java
@@ -7,15 +7,12 @@ import cc.yunxi.domain.vo.socket.MessageTypeEnum;
import cc.yunxi.domain.vo.socket.SocketMessage;
import cc.yunxi.domain.vo.vxmessage.*;
import cc.yunxi.service.IWsService;
-import cc.yunxi.utils.UserContext;
-import cc.yunxi.utils.VerifyUtil;
-import cc.yunxi.utils.WeChatMessageUtil;
-import cc.yunxi.utils.WeChatUtil;
+import cc.yunxi.utils.*;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@@ -35,6 +32,9 @@ public class WxMessageController {
@Resource
private VerifyUtil verifyUtil;
+// @Resource
+ private MqttTool mqttTool;
+
/**
* 预约端对接微信推送消息验证
*
@@ -162,5 +162,16 @@ public class WxMessageController {
return CommonResult.success(result);
}
+ @PostMapping("/test")
+ public CommonResult test() {
+ MqttMessage msg = new MqttMessage();
+ msg.setQos(1);
+ msg.setPayload("test".getBytes());
+ mqttTool.publish("/test", msg);
+ System.out.println(mqttTool.isConnected());
+ mqttTool.subscribe("/testtopic/#", 1);
+ return CommonResult.success("success");
+ }
+
}
diff --git a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderCancel.java b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderCancel.java
index f30394b..da47542 100644
--- a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderCancel.java
+++ b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderCancel.java
@@ -8,14 +8,14 @@ import lombok.Data;
import java.time.LocalDateTime;
@Data
-@ApiModel("新订单")
+@ApiModel("订单取消")
public class OrderCancel extends SocketMessage{
@ApiModelProperty("订单号")
private String orderNo;
@ApiModelProperty("订单状态")
private OrderStatusEnum orderStatus;
@ApiModelProperty("取消时间")
- private LocalDateTime cancelOrderTime;
+ private String cancelOrderTime;
@ApiModelProperty("预约人号码")
private String clientPhone;
@ApiModelProperty("预约地址")
diff --git a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderFinish.java b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderFinish.java
index 09ee0ed..f65185b 100644
--- a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderFinish.java
+++ b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderFinish.java
@@ -7,15 +7,13 @@ import lombok.Data;
import java.math.BigDecimal;
-@ApiModel("新订单")
+@ApiModel("订单完成")
@Data
public class OrderFinish extends SocketMessage{
@ApiModelProperty("订单号")
private String orderNo;
@ApiModelProperty("订单状态")
private OrderStatusEnum orderStatus;
- @ApiModelProperty("实际重量")
- private String realWeight;
@ApiModelProperty("实际金额")
private BigDecimal realMoney;
@ApiModelProperty("接单人")
diff --git a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderTaken.java b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderTaken.java
index 147036b..28c0696 100644
--- a/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderTaken.java
+++ b/nxhs-service/src/main/java/cc/yunxi/domain/vo/socket/OrderTaken.java
@@ -19,5 +19,5 @@ public class OrderTaken extends SocketMessage {
@ApiModelProperty("接单人电话")
private String takeUserPhone;
@ApiModelProperty("接单时间")
- private LocalDateTime takeOrderTime;
+ private String takeOrderTime;
}
diff --git a/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java b/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java
index 90c7903..ecbc004 100644
--- a/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java
+++ b/nxhs-service/src/main/java/cc/yunxi/service/impl/WsServiceImpl.java
@@ -41,6 +41,7 @@ public class WsServiceImpl implements IWsService {
} else {
// 用户不在线,添加到缓存队列
log.info("用户不在线,添加到缓存队列,上线后重发!");
+ log.info("接收人:{},消息:{}", userId, text);
sessionManager.addCacheMessage(message);
}
}
diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java
new file mode 100644
index 0000000..640e492
--- /dev/null
+++ b/nxhs-service/src/main/java/cc/yunxi/utils/CustomerMqttClient.java
@@ -0,0 +1,141 @@
+package cc.yunxi.utils;
+
+import cc.yunxi.config.props.MqttProperties;
+import cn.hutool.core.util.StrUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.Map;
+
+@Slf4j
+public class CustomerMqttClient {
+ private static MqttAsyncClient client;
+ private MqttProperties properties;
+
+ public static MqttAsyncClient getClient() {
+ return client;
+ }
+
+ public static void setClient(MqttAsyncClient client) {
+ CustomerMqttClient.client = client;
+ }
+
+ public void connect() throws MqttException {
+ client = new MqttAsyncClient(properties.getHost(), properties.getClientId(), new MemoryPersistence());
+ MqttConnectOptions options = new MqttConnectOptions();
+ if (StrUtil.isNotEmpty(properties.getUsername())) {
+ options.setUserName(properties.getUsername());
+ }
+ if (StrUtil.isNotEmpty(properties.getPassword())) {
+ options.setPassword(properties.getPassword().toCharArray());
+ }
+ options.setCleanSession(properties.getCleanSession());
+ options.setKeepAliveInterval(properties.getKeepAlive());
+ options.setConnectionTimeout(properties.getTimeout());
+
+ if (!client.isConnected()) {
+ client.connect(options);
+ client.setCallback(new MyMqttCallback(this));
+ log.info("连接MQTT服务成功 success");
+ } else {
+ client.disconnect();
+ client.connect(options);
+ log.info("重新连接MQTT服务成功 success");
+ }
+ }
+
+ public void disconnect() throws MqttException {
+ client.disconnect();
+ }
+
+ public void publish(String pushMessage, String topic) {
+ publish(pushMessage, topic, properties.getQos(), false);
+ }
+
+ public void publish(String pushMessage, String topic, int qos, boolean retained) {
+ MqttMessage message = new MqttMessage();
+ message.setPayload(pushMessage.getBytes());
+ message.setQos(qos);
+ message.setRetained(retained);
+ MqttAsyncClient asyncClient = CustomerMqttClient.getClient();
+// MqttTopic mqttTopic = asyncClient..getTopic(topic);
+// if (null == mqttTopic) {
+// log.error("发布失败,主题不存在!!topic: {} , qos: {} ", topic, qos);
+// }
+
+ try {
+ IMqttDeliveryToken deliveryToken = asyncClient.publish(topic, message);
+ MqttMessage reMessage = deliveryToken.getMessage();
+ log.info("发布成功 topic: {} , qos: {} , message: {},返回消息:{}", topic, qos, pushMessage,reMessage.getPayload());
+ } catch (MqttException e) {
+ log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, pushMessage);
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+// MqttDeliveryToken token;//Delivery:配送
+// synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
+// try {
+// token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
+// token.waitForCompletion(1000L);
+// } catch (MqttPersistenceException e) {
+// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos);
+// e.printStackTrace();
+// } catch (MqttException e) {
+// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos);
+// e.printStackTrace();
+// }
+// }
+ }
+
+ public void subscribe(String topic, int qos) {
+ if (client != null && client.isConnected()) {
+ try {
+ client.subscribe(topic, qos);
+ log.info("订阅主题成功,topic: {} , qos: {} ", topic, qos);
+ } catch (MqttException e) {
+ log.error("订阅主题失败 topic:{}, qos: {} ", topic, qos);
+ e.printStackTrace();
+ }
+ } else {
+ log.error("客户端连接状态已断开,订阅主题失败 topic:{}, qos: {} ", topic, qos);
+ }
+ }
+
+ public void unsubscribe(String topic) {
+ if (client != null && client.isConnected()) {
+ try {
+ client.unsubscribe(topic);
+ log.info("取消订阅成功 topic success:{}", topic);
+ } catch (MqttException e) {
+ log.error("取消订阅失败 topic:{}", topic);
+ e.printStackTrace();
+ }
+ } else {
+ log.error("客户端连接状态已断开,取消订阅失败 topic:{}", topic);
+ }
+ }
+
+ public void initSubscribe() {
+ if (client != null && client.isConnected()) {
+ Map topics = properties.getTopics();
+ for (Map.Entry entry : topics.entrySet()) {
+ String topic = entry.getKey();
+ int qos = entry.getValue();
+ try {
+ client.subscribe(topic, qos);
+ log.info("初始化订阅主题成功,topic: {} , qos: {} ", topic, qos);
+ } catch (MqttException e) {
+ log.error("初始化订阅主题失败 topic:{}, qos: {} ", topic, qos);
+ e.printStackTrace();
+ }
+ }
+ } else {
+ log.error("client is not connected,fail to init subscribe topics!!!");
+ }
+ }
+
+ public void setMqttProperties(MqttProperties mqttProperties) {
+ this.properties = mqttProperties;
+ }
+}
diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/MqttTool.java b/nxhs-service/src/main/java/cc/yunxi/utils/MqttTool.java
new file mode 100644
index 0000000..3945684
--- /dev/null
+++ b/nxhs-service/src/main/java/cc/yunxi/utils/MqttTool.java
@@ -0,0 +1,57 @@
+package cc.yunxi.utils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+//@Component
+@Slf4j
+public class MqttTool {
+
+// @Resource
+ private MqttClient client;
+
+ public boolean publish(String topic, MqttMessage message) {
+ try {
+ client.publish(topic, message);
+ log.warn("发布成功,topic:{},message:{}", topic, message.getPayload());
+ return true;
+ } catch (MqttException e) {
+ log.error("发布失败,topic:{},message:{}", topic, message.getPayload(), e);
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public boolean subscribe(String topic, int qos) {
+ try {
+ if (client.isConnected() == false) {
+ log.warn("链接已断开,topic:{},qos:{}", topic, qos);
+ }
+ client.subscribe(topic, qos);
+ log.warn("订阅成功,topic:{},qos:{}", topic, qos);
+ return true;
+ } catch (MqttException e) {
+ log.warn("订阅主题失败,topic:{},qos:{}", topic, qos);
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public boolean isConnected() {
+ return client.isConnected();
+ }
+
+ public void setWill() throws MqttException {
+ // 设置遗嘱
+ String willTopic = "willTopic";
+ String willMessage = "willMessage";
+ int willQos = 2;
+ boolean willRetain = false;
+// (willTopic, willMessage.getBytes(), willQos, willRetain);
+ }
+}
diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java
new file mode 100644
index 0000000..da018d8
--- /dev/null
+++ b/nxhs-service/src/main/java/cc/yunxi/utils/MyMqttCallback.java
@@ -0,0 +1,73 @@
+package cc.yunxi.utils;
+
+import cc.yunxi.domain.po.Client;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+
+import java.time.LocalDateTime;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Slf4j
+public class MyMqttCallback implements MqttCallbackExtended {
+
+ private CustomerMqttClient mqttClient;
+
+ public MyMqttCallback(CustomerMqttClient mqttClient) {
+ this.mqttClient = mqttClient;
+ }
+
+ //链接断开
+ @Override
+ public void connectionLost(Throwable throwable) {
+ log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());
+ AtomicLong reconnectTimes = new AtomicLong(1);
+ while (true) {
+ try {
+ if (mqttClient.getClient().isConnected()) {
+ //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择
+ log.warn("mqtt 重新连接成功,重新开始初始化订阅主题...");
+ return;
+ }
+ reconnectTimes.incrementAndGet();
+ log.warn("mqtt 重连次数{}, 重新连接时间 {}", reconnectTimes, LocalDateTime.now());
+ mqttClient.getClient().reconnect();
+ } catch (MqttException e) {
+ log.error("mqtt断连异常", e.getMessage());
+ e.printStackTrace();
+ }
+ try {
+ this.wait(5000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ // 接受成功
+ @Override
+ public void messageArrived(String topic, MqttMessage mqttMessage) {
+ log.info("接收到MQTT消息,主题:{},消息体:{}", topic, mqttMessage.getPayload().toString());
+// topicHandler.handleTopic(topic, mqttMessage);
+ }
+
+ //发送完成
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
+ log.info("消息发送完成,消息ID:{}", iMqttDeliveryToken.getMessageId());
+ try {
+ log.info("消息发送完成,消息内容:{}", iMqttDeliveryToken.getMessage().getPayload().toString());
+ } catch (MqttException e) {
+ log.error("获取消息内容失败,原因:{}", e.getMessage());
+ e.printStackTrace();
+ }
+
+ }
+
+ //连接完成后
+ @Override
+ public void connectComplete(boolean reconnect, String serverURI) {
+ log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连");
+ mqttClient.initSubscribe();
+ }
+}
diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java b/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java
new file mode 100644
index 0000000..b0f5a53
--- /dev/null
+++ b/nxhs-service/src/main/java/cc/yunxi/utils/TopicHandler.java
@@ -0,0 +1,13 @@
+package cc.yunxi.utils;
+
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TopicHandler {
+
+ public void handleTopic(String topic, MqttMessage message) {
+ // 处理订阅的主题和消息
+ System.out.println("Received message on topic: " + topic + ", message: " + message.getPayload());
+ }
+}
diff --git a/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java b/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java
index 038c9ca..6d92efa 100644
--- a/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java
+++ b/nxhs-service/src/main/java/cc/yunxi/utils/WsSocketHandler.java
@@ -41,12 +41,12 @@ public class WsSocketHandler extends AbstractWebSocketHandler {
String appType = json.getStr("appType");
if (AppTypeEnum.client.name().equals(appType)) {
SESSION_MANAGER.addClientSession(userId, session);
- session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()));
+// session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()));
} else if (AppTypeEnum.recycler.name().equals(appType)) {
SESSION_MANAGER.addRecyclerSession(userId, session);
- session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()));
+// session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()));
} else {
- session.sendMessage(new TextMessage("server 发送给的消息 ws发送参数不正确,连接即将关闭!!! 发送时间:" + LocalDateTime.now().toString()));
+// session.sendMessage(new TextMessage("server 发送给的消息 ws发送参数不正确,连接即将关闭!!! 发送时间:" + LocalDateTime.now().toString()));
SESSION_MANAGER.removeAndClose(session.getId());
log.error("ws发送参数不正确,连接关闭!!!");
}