Socket发送消息

mqtt初始化
master
guochaojie 4 months ago
parent dd64b29e8b
commit f6b5f12eef

@ -62,6 +62,35 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId> <artifactId>spring-boot-starter-websocket</artifactId>
</dependency> </dependency>
<!-- mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- MQTT -->
<!-- <dependency>-->
<!-- <groupId>org.eclipse.paho</groupId>-->
<!-- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>-->
<!-- <version>1.2.5</version>-->
<!-- </dependency>-->
<!-- 自签名证书 -->
<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
<!-- <dependency>-->
<!-- <groupId>org.bouncycastle</groupId>-->
<!-- <artifactId>bcpkix-jdk15on</artifactId>-->
<!-- <version>1.70</version>-->
<!-- </dependency>-->
</dependencies> </dependencies>
<build> <build>

@ -0,0 +1,31 @@
package cc.yunxi.config;
import cc.yunxi.config.props.MqttProperties;
import cc.yunxi.utils.CustomerMqttClient;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Slf4j
//@Configuration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfig {
@Resource
private MqttProperties properties;
@Bean
public CustomerMqttClient customerMqttClient() throws MqttException {
CustomerMqttClient client = new CustomerMqttClient();
client.setMqttProperties(properties);
client.connect();
log.info("===========> MQTT连接成功 <===========");
return client;
}
}

@ -0,0 +1,21 @@
package cc.yunxi.config.props;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.Map;
@Data
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
private String host;
private String clientId;
private String username;
private String password;
private Map<String, Integer> topics;
private Boolean cleanSession;
private Integer timeout;
private Integer qos;
private Integer keepAlive;
}

@ -43,6 +43,7 @@ import org.springframework.web.bind.annotation.*;
import java.io.IOException; import java.io.IOException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -220,7 +221,7 @@ public class RecycleOrderController {
cancel.setOrderNo(order.getOrderNumber()); cancel.setOrderNo(order.getOrderNumber());
cancel.setOrderStatus(order.getOrderStatus()); cancel.setOrderStatus(order.getOrderStatus());
cancel.setCancelOrderTime(LocalDateTime.now()); cancel.setCancelOrderTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
cancel.setClientPhone(order.getClientMobile()); cancel.setClientPhone(order.getClientMobile());
cancel.setClientAddress(addressInfo.getReceiveStreet()); cancel.setClientAddress(addressInfo.getReceiveStreet());
@ -252,9 +253,14 @@ public class RecycleOrderController {
taken.setOrderNo(order.getOrderNumber()); taken.setOrderNo(order.getOrderNumber());
taken.setOrderStatus(order.getOrderStatus()); taken.setOrderStatus(order.getOrderStatus());
taken.setTakeOrderUser(order.getStaffsName()); taken.setTakeOrderUser(recycler.getStaffsName());
taken.setTakeUserPhone(recycler.getMobilePhone()); taken.setTakeUserPhone(recycler.getMobilePhone());
taken.setTakeOrderTime(LocalDateTime.now()); taken.setTakeOrderTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
try {
wsService.sendMsgToUser(taken);
} catch (IOException e) {
e.printStackTrace();
}
return CommonResult.success(true); return CommonResult.success(true);
} }
@ -289,10 +295,12 @@ public class RecycleOrderController {
finish.setOrderNo(order.getOrderNumber()); finish.setOrderNo(order.getOrderNumber());
finish.setOrderStatus(order.getOrderStatus()); finish.setOrderStatus(order.getOrderStatus());
// finish.setRealWeight(order.getPredictWeight().toString());
finish.setRealMoney(order.getOrderAmount()); finish.setRealMoney(order.getOrderAmount());
finish.setTakeOrderUser(recycler.getStaffsName()); finish.setTakeOrderUser(recycler.getStaffsName());
finish.setTakeUserPhone(recycler.getMobilePhone()); finish.setTakeUserPhone(recycler.getMobilePhone());
try { try {
wsService.sendMsgToUser(finish); wsService.sendMsgToUser(finish);
} catch (IOException e) { } catch (IOException e) {

@ -7,15 +7,12 @@ import cc.yunxi.domain.vo.socket.MessageTypeEnum;
import cc.yunxi.domain.vo.socket.SocketMessage; import cc.yunxi.domain.vo.socket.SocketMessage;
import cc.yunxi.domain.vo.vxmessage.*; import cc.yunxi.domain.vo.vxmessage.*;
import cc.yunxi.service.IWsService; import cc.yunxi.service.IWsService;
import cc.yunxi.utils.UserContext; import cc.yunxi.utils.*;
import cc.yunxi.utils.VerifyUtil;
import cc.yunxi.utils.WeChatMessageUtil;
import cc.yunxi.utils.WeChatUtil;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -35,6 +32,9 @@ public class WxMessageController {
@Resource @Resource
private VerifyUtil verifyUtil; private VerifyUtil verifyUtil;
// @Resource
private MqttTool mqttTool;
/** /**
* *
* *
@ -162,5 +162,16 @@ public class WxMessageController {
return CommonResult.success(result); return CommonResult.success(result);
} }
@PostMapping("/test")
public CommonResult<String> test() {
MqttMessage msg = new MqttMessage();
msg.setQos(1);
msg.setPayload("test".getBytes());
mqttTool.publish("/test", msg);
System.out.println(mqttTool.isConnected());
mqttTool.subscribe("/testtopic/#", 1);
return CommonResult.success("success");
}
} }

@ -8,14 +8,14 @@ import lombok.Data;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@Data @Data
@ApiModel("订单") @ApiModel("订单取消")
public class OrderCancel extends SocketMessage{ public class OrderCancel extends SocketMessage{
@ApiModelProperty("订单号") @ApiModelProperty("订单号")
private String orderNo; private String orderNo;
@ApiModelProperty("订单状态") @ApiModelProperty("订单状态")
private OrderStatusEnum orderStatus; private OrderStatusEnum orderStatus;
@ApiModelProperty("取消时间") @ApiModelProperty("取消时间")
private LocalDateTime cancelOrderTime; private String cancelOrderTime;
@ApiModelProperty("预约人号码") @ApiModelProperty("预约人号码")
private String clientPhone; private String clientPhone;
@ApiModelProperty("预约地址") @ApiModelProperty("预约地址")

@ -7,15 +7,13 @@ import lombok.Data;
import java.math.BigDecimal; import java.math.BigDecimal;
@ApiModel("订单") @ApiModel("订单完成")
@Data @Data
public class OrderFinish extends SocketMessage{ public class OrderFinish extends SocketMessage{
@ApiModelProperty("订单号") @ApiModelProperty("订单号")
private String orderNo; private String orderNo;
@ApiModelProperty("订单状态") @ApiModelProperty("订单状态")
private OrderStatusEnum orderStatus; private OrderStatusEnum orderStatus;
@ApiModelProperty("实际重量")
private String realWeight;
@ApiModelProperty("实际金额") @ApiModelProperty("实际金额")
private BigDecimal realMoney; private BigDecimal realMoney;
@ApiModelProperty("接单人") @ApiModelProperty("接单人")

@ -19,5 +19,5 @@ public class OrderTaken extends SocketMessage {
@ApiModelProperty("接单人电话") @ApiModelProperty("接单人电话")
private String takeUserPhone; private String takeUserPhone;
@ApiModelProperty("接单时间") @ApiModelProperty("接单时间")
private LocalDateTime takeOrderTime; private String takeOrderTime;
} }

@ -41,6 +41,7 @@ public class WsServiceImpl implements IWsService {
} else { } else {
// 用户不在线,添加到缓存队列 // 用户不在线,添加到缓存队列
log.info("用户不在线,添加到缓存队列,上线后重发!"); log.info("用户不在线,添加到缓存队列,上线后重发!");
log.info("接收人:{},消息:{}", userId, text);
sessionManager.addCacheMessage(message); sessionManager.addCacheMessage(message);
} }
} }

@ -0,0 +1,141 @@
package cc.yunxi.utils;
import cc.yunxi.config.props.MqttProperties;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Map;
@Slf4j
public class CustomerMqttClient {
private static MqttAsyncClient client;
private MqttProperties properties;
public static MqttAsyncClient getClient() {
return client;
}
public static void setClient(MqttAsyncClient client) {
CustomerMqttClient.client = client;
}
public void connect() throws MqttException {
client = new MqttAsyncClient(properties.getHost(), properties.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
if (StrUtil.isNotEmpty(properties.getUsername())) {
options.setUserName(properties.getUsername());
}
if (StrUtil.isNotEmpty(properties.getPassword())) {
options.setPassword(properties.getPassword().toCharArray());
}
options.setCleanSession(properties.getCleanSession());
options.setKeepAliveInterval(properties.getKeepAlive());
options.setConnectionTimeout(properties.getTimeout());
if (!client.isConnected()) {
client.connect(options);
client.setCallback(new MyMqttCallback(this));
log.info("连接MQTT服务成功 success");
} else {
client.disconnect();
client.connect(options);
log.info("重新连接MQTT服务成功 success");
}
}
public void disconnect() throws MqttException {
client.disconnect();
}
public void publish(String pushMessage, String topic) {
publish(pushMessage, topic, properties.getQos(), false);
}
public void publish(String pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttAsyncClient asyncClient = CustomerMqttClient.getClient();
// MqttTopic mqttTopic = asyncClient..getTopic(topic);
// if (null == mqttTopic) {
// log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos);
// }
try {
IMqttDeliveryToken deliveryToken = asyncClient.publish(topic, message);
MqttMessage reMessage = deliveryToken.getMessage();
log.info("发布成功 topic: {} , qos: {} , message: {},返回消息:{}", topic, qos, pushMessage,reMessage.getPayload());
} catch (MqttException e) {
log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, pushMessage);
e.printStackTrace();
throw new RuntimeException(e);
}
// MqttDeliveryToken token;//Delivery:配送
// synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充
// try {
// token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
// token.waitForCompletion(1000L);
// } catch (MqttPersistenceException e) {
// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos);
// e.printStackTrace();
// } catch (MqttException e) {
// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos);
// e.printStackTrace();
// }
// }
}
public void subscribe(String topic, int qos) {
if (client != null && client.isConnected()) {
try {
client.subscribe(topic, qos);
log.info("订阅主题成功,topic: {} , qos: {} ", topic, qos);
} catch (MqttException e) {
log.error("订阅主题失败 topic:{}, qos: {} ", topic, qos);
e.printStackTrace();
}
} else {
log.error("客户端连接状态已断开,订阅主题失败 topic:{}, qos: {} ", topic, qos);
}
}
public void unsubscribe(String topic) {
if (client != null && client.isConnected()) {
try {
client.unsubscribe(topic);
log.info("取消订阅成功 topic success:{}", topic);
} catch (MqttException e) {
log.error("取消订阅失败 topic:{}", topic);
e.printStackTrace();
}
} else {
log.error("客户端连接状态已断开,取消订阅失败 topic:{}", topic);
}
}
public void initSubscribe() {
if (client != null && client.isConnected()) {
Map<String, Integer> topics = properties.getTopics();
for (Map.Entry<String, Integer> entry : topics.entrySet()) {
String topic = entry.getKey();
int qos = entry.getValue();
try {
client.subscribe(topic, qos);
log.info("初始化订阅主题成功,topic: {} , qos: {} ", topic, qos);
} catch (MqttException e) {
log.error("初始化订阅主题失败 topic:{}, qos: {} ", topic, qos);
e.printStackTrace();
}
}
} else {
log.error("client is not connected,fail to init subscribe topics!!!");
}
}
public void setMqttProperties(MqttProperties mqttProperties) {
this.properties = mqttProperties;
}
}

@ -0,0 +1,57 @@
package cc.yunxi.utils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
//@Component
@Slf4j
public class MqttTool {
// @Resource
private MqttClient client;
public boolean publish(String topic, MqttMessage message) {
try {
client.publish(topic, message);
log.warn("发布成功topic:{}message:{}", topic, message.getPayload());
return true;
} catch (MqttException e) {
log.error("发布失败topic:{}message:{}", topic, message.getPayload(), e);
e.printStackTrace();
return false;
}
}
public boolean subscribe(String topic, int qos) {
try {
if (client.isConnected() == false) {
log.warn("链接已断开topic:{}qos:{}", topic, qos);
}
client.subscribe(topic, qos);
log.warn("订阅成功topic:{}qos:{}", topic, qos);
return true;
} catch (MqttException e) {
log.warn("订阅主题失败topic:{}qos:{}", topic, qos);
e.printStackTrace();
return false;
}
}
public boolean isConnected() {
return client.isConnected();
}
public void setWill() throws MqttException {
// 设置遗嘱
String willTopic = "willTopic";
String willMessage = "willMessage";
int willQos = 2;
boolean willRetain = false;
// (willTopic, willMessage.getBytes(), willQos, willRetain);
}
}

@ -0,0 +1,73 @@
package cc.yunxi.utils;
import cc.yunxi.domain.po.Client;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {
private CustomerMqttClient mqttClient;
public MyMqttCallback(CustomerMqttClient mqttClient) {
this.mqttClient = mqttClient;
}
//链接断开
@Override
public void connectionLost(Throwable throwable) {
log.error("mqtt connectionLost 连接断开5S之后尝试重连: {}", throwable.getMessage());
AtomicLong reconnectTimes = new AtomicLong(1);
while (true) {
try {
if (mqttClient.getClient().isConnected()) {
//判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete方法里面 看你们自己选择
log.warn("mqtt 重新连接成功,重新开始初始化订阅主题...");
return;
}
reconnectTimes.incrementAndGet();
log.warn("mqtt 重连次数{}, 重新连接时间 {}", reconnectTimes, LocalDateTime.now());
mqttClient.getClient().reconnect();
} catch (MqttException e) {
log.error("mqtt断连异常", e.getMessage());
e.printStackTrace();
}
try {
this.wait(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
// 接受成功
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
log.info("接收到MQTT消息主题{},消息体:{}", topic, mqttMessage.getPayload().toString());
// topicHandler.handleTopic(topic, mqttMessage);
}
//发送完成
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
log.info("消息发送完成消息ID{}", iMqttDeliveryToken.getMessageId());
try {
log.info("消息发送完成,消息内容:{}", iMqttDeliveryToken.getMessage().getPayload().toString());
} catch (MqttException e) {
log.error("获取消息内容失败,原因:{}", e.getMessage());
e.printStackTrace();
}
}
//连接完成后
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连");
mqttClient.initSubscribe();
}
}

@ -0,0 +1,13 @@
package cc.yunxi.utils;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
@Component
public class TopicHandler {
public void handleTopic(String topic, MqttMessage message) {
// 处理订阅的主题和消息
System.out.println("Received message on topic: " + topic + ", message: " + message.getPayload());
}
}

@ -41,12 +41,12 @@ public class WsSocketHandler extends AbstractWebSocketHandler {
String appType = json.getStr("appType"); String appType = json.getStr("appType");
if (AppTypeEnum.client.name().equals(appType)) { if (AppTypeEnum.client.name().equals(appType)) {
SESSION_MANAGER.addClientSession(userId, session); SESSION_MANAGER.addClientSession(userId, session);
session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString())); // session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()));
} else if (AppTypeEnum.recycler.name().equals(appType)) { } else if (AppTypeEnum.recycler.name().equals(appType)) {
SESSION_MANAGER.addRecyclerSession(userId, session); SESSION_MANAGER.addRecyclerSession(userId, session);
session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString())); // session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()));
} else { } else {
session.sendMessage(new TextMessage("server 发送给的消息 ws发送参数不正确连接即将关闭 发送时间:" + LocalDateTime.now().toString())); // session.sendMessage(new TextMessage("server 发送给的消息 ws发送参数不正确连接即将关闭 发送时间:" + LocalDateTime.now().toString()));
SESSION_MANAGER.removeAndClose(session.getId()); SESSION_MANAGER.removeAndClose(session.getId());
log.error("ws发送参数不正确连接关闭"); log.error("ws发送参数不正确连接关闭");
} }

Loading…
Cancel
Save