微信消息 fix

master
guochaojie 4 months ago
parent ddedffa021
commit c0d1e9ac7d

@ -10,12 +10,11 @@ import java.util.Map;
public class MqttProperties { public class MqttProperties {
private String host; private String host;
private String clientId; private String clientId;
private String username; private String userName;
private String password; private String passWord;
private Map<String, Integer> topics; private Map<String, Integer> topics;
private Boolean cleanSession; private Boolean cleanSession;
private Integer timeout; private Integer timeout;
private Integer qos; private Integer qos;
private Integer keepAlive; private Integer keepAlive;
} }

@ -117,7 +117,7 @@ public class RecycleOrderController {
message.setMessageType(MessageTypeEnum.BOOKING);//消息类型 message.setMessageType(MessageTypeEnum.BOOKING);//消息类型
message.setAppType(AppTypeEnum.recycler);//发送到哪里 message.setAppType(AppTypeEnum.recycler);//发送到哪里
message.setReceiveUserId("");//接收人 // message.setReceiveUserId("");//接收人
message.setOrderNo(orderId);//订单号 message.setOrderNo(orderId);//订单号
message.setOrderStatus(OrderStatusEnum.PENDING); message.setOrderStatus(OrderStatusEnum.PENDING);

@ -32,8 +32,8 @@ public class WxMessageController {
@Resource @Resource
private VerifyUtil verifyUtil; private VerifyUtil verifyUtil;
// @Resource @Resource
private MqttTool mqttTool; private CustomerMqttClient client;
/** /**
* *
@ -164,12 +164,31 @@ public class WxMessageController {
@PostMapping("/test") @PostMapping("/test")
public CommonResult<String> test() { public CommonResult<String> test() {
MqttMessage msg = new MqttMessage();
msg.setQos(1); client.publish("testtopic/1", "这是一个测试");
msg.setPayload("test".getBytes()); try {
mqttTool.publish("/test", msg); Thread.sleep(1000);
System.out.println(mqttTool.isConnected()); }catch (Exception e){
mqttTool.subscribe("/testtopic/#", 1); e.printStackTrace();
}
client.publish("/testtopic/2", "这是一个测试");
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
client.subscribe("/testtopic/#", 1);
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
client.unsubscribe("/testtopic/#");
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
return CommonResult.success("success"); return CommonResult.success("success");
} }

@ -38,10 +38,10 @@ public class WsServiceImpl implements IWsService {
} }
if (userSession != null && userSession.isOpen()) { if (userSession != null && userSession.isOpen()) {
userSession.sendMessage(new TextMessage(text)); userSession.sendMessage(new TextMessage(text));
log.info("发送成功,接收人:{},消息:{}", userId, text);
} else { } else {
// 用户不在线,添加到缓存队列 // 用户不在线,添加到缓存队列
log.info("用户不在线,添加到缓存队列,上线后重发!"); log.info("用户不在线,添加到缓存队列,上线后重发 接收人:{},消息:{}", userId, text);
log.info("接收人:{},消息:{}", userId, text);
sessionManager.addCacheMessage(message); sessionManager.addCacheMessage(message);
} }
} }

@ -10,66 +10,66 @@ import java.util.Map;
@Slf4j @Slf4j
public class CustomerMqttClient { public class CustomerMqttClient {
private static MqttAsyncClient client; private static MqttClient client;
private MqttProperties properties; private MqttProperties properties;
public static MqttAsyncClient getClient() { public static MqttClient getClient() {
return client; return client;
} }
public static void setClient(MqttAsyncClient client) { public static void setClient(MqttClient client) {
CustomerMqttClient.client = client; CustomerMqttClient.client = client;
} }
public void connect() throws MqttException { public void connect() throws MqttException {
client = new MqttAsyncClient(properties.getHost(), properties.getClientId(), new MemoryPersistence()); client = new MqttClient(properties.getHost(), properties.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
if (StrUtil.isNotEmpty(properties.getUsername())) { if (StrUtil.isNotEmpty(properties.getUserName())) {
options.setUserName(properties.getUsername()); options.setUserName(properties.getUserName());
} }
if (StrUtil.isNotEmpty(properties.getPassword())) { if (StrUtil.isNotEmpty(properties.getPassWord())) {
options.setPassword(properties.getPassword().toCharArray()); options.setPassword(properties.getPassWord().toCharArray());
} }
options.setCleanSession(properties.getCleanSession()); options.setCleanSession(properties.getCleanSession());
options.setKeepAliveInterval(properties.getKeepAlive()); options.setKeepAliveInterval(properties.getKeepAlive());
options.setConnectionTimeout(properties.getTimeout()); options.setConnectionTimeout(properties.getTimeout());
options.setAutomaticReconnect(true);
if (!client.isConnected()) { if (!client.isConnected()) {
client.setCallback(new MyMqttCallback(client));
client.connect(options); client.connect(options);
client.setCallback(new MyMqttCallback(this));
log.info("连接MQTT服务成功 success"); log.info("连接MQTT服务成功 success");
} else { } else {
client.disconnect(); client.disconnect();
client.connect(options); client.connect(options);
log.info("重新连接MQTT服务成功 success"); log.info("重新连接MQTT服务成功 success");
} }
initSubscribe();
} }
public void disconnect() throws MqttException { public void disconnect() throws MqttException {
client.disconnect(); client.disconnect();
} }
public void publish(String pushMessage, String topic) { public void publish(String msg, String topic) {
publish(pushMessage, topic, properties.getQos(), false); publish(msg, topic, properties.getQos(), false);
} }
public void publish(String pushMessage, String topic, int qos, boolean retained) { public void publish(String msg, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage(); MqttMessage message = new MqttMessage();
message.setPayload(pushMessage.getBytes()); message.setPayload(msg.getBytes());
message.setQos(qos); message.setQos(qos);
message.setRetained(retained); message.setRetained(retained);
MqttAsyncClient asyncClient = CustomerMqttClient.getClient(); MqttTopic mqttTopic = client.getTopic(topic);
// MqttTopic mqttTopic = asyncClient..getTopic(topic); if (null == mqttTopic) {
// if (null == mqttTopic) { log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos);
// log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos); }
// }
try { try {
IMqttDeliveryToken deliveryToken = asyncClient.publish(topic, message); client.publish(topic, message);
MqttMessage reMessage = deliveryToken.getMessage(); log.info("发布成功 topic: {} , qos: {} , message: {}", topic, qos, msg);
log.info("发布成功 topic: {} , qos: {} , message: {},返回消息:{}", topic, qos, pushMessage,reMessage.getPayload());
} catch (MqttException e) { } catch (MqttException e) {
log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, pushMessage); log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, msg);
e.printStackTrace(); e.printStackTrace();
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -88,17 +88,17 @@ public class CustomerMqttClient {
// } // }
} }
public void subscribe(String topic, int qos) { public void subscribe(String topicFilter, int qos) {
if (client != null && client.isConnected()) { if (client != null && client.isConnected()) {
try { try {
client.subscribe(topic, qos); client.subscribe(topicFilter, qos);
log.info("订阅主题成功,topic: {} , qos: {} ", topic, qos); log.info("订阅主题成功,topic: {} , qos: {} ", topicFilter, qos);
} catch (MqttException e) { } catch (MqttException e) {
log.error("订阅主题失败 topic:{}, qos: {} ", topic, qos); log.error("订阅主题失败 topic:{}, qos: {} ", topicFilter, qos);
e.printStackTrace(); e.printStackTrace();
} }
} else { } else {
log.error("客户端连接状态已断开,订阅主题失败 topic:{}, qos: {} ", topic, qos); log.error("客户端连接状态已断开,订阅主题失败 topic:{}, qos: {} ", topicFilter, qos);
} }
} }
@ -122,16 +122,11 @@ public class CustomerMqttClient {
for (Map.Entry<String, Integer> entry : topics.entrySet()) { for (Map.Entry<String, Integer> entry : topics.entrySet()) {
String topic = entry.getKey(); String topic = entry.getKey();
int qos = entry.getValue(); int qos = entry.getValue();
try { log.info("初始化订阅主题 topic: {} , qos: {} ", topic, qos);
client.subscribe(topic, qos); subscribe(topic, qos);
log.info("初始化订阅主题成功,topic: {} , qos: {} ", topic, qos);
} catch (MqttException e) {
log.error("初始化订阅主题失败 topic:{}, qos: {} ", topic, qos);
e.printStackTrace();
}
} }
} else { } else {
log.error("client is not connected,fail to init subscribe topics!!!"); log.error("客户端未连接,初始化订阅失败!!!");
} }
} }

@ -0,0 +1,12 @@
package cc.yunxi.utils;
import org.eclipse.paho.client.mqttv3.MqttClient;
public class HeartbeatTask implements Runnable {
private MqttClient mqttClient;
@Override
public void run() {
// mqttClient.publish("heartbeat", "alive".getBytes());
}
}

@ -10,9 +10,9 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j @Slf4j
public class MyMqttCallback implements MqttCallbackExtended { public class MyMqttCallback implements MqttCallbackExtended {
private CustomerMqttClient mqttClient; private MqttClient mqttClient;
public MyMqttCallback(CustomerMqttClient mqttClient) { public MyMqttCallback(MqttClient mqttClient) {
this.mqttClient = mqttClient; this.mqttClient = mqttClient;
} }
@ -20,27 +20,32 @@ public class MyMqttCallback implements MqttCallbackExtended {
@Override @Override
public void connectionLost(Throwable throwable) { public void connectionLost(Throwable throwable) {
log.error("mqtt connectionLost 连接断开5S之后尝试重连: {}", throwable.getMessage()); log.error("mqtt connectionLost 连接断开5S之后尝试重连: {}", throwable.getMessage());
AtomicLong reconnectTimes = new AtomicLong(1);
while (true) {
try { try {
if (mqttClient.getClient().isConnected()) { mqttClient.reconnect();
//判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete方法里面 看你们自己选择
log.warn("mqtt 重新连接成功,重新开始初始化订阅主题...");
return;
}
reconnectTimes.incrementAndGet();
log.warn("mqtt 重连次数{}, 重新连接时间 {}", reconnectTimes, LocalDateTime.now());
mqttClient.getClient().reconnect();
} catch (MqttException e) { } catch (MqttException e) {
log.error("mqtt断连异常", e.getMessage());
e.printStackTrace();
}
try {
this.wait(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} // AtomicLong reconnectTimes = new AtomicLong(1);
// while (true) {
// try {
// if (mqttClient.getClient().isConnected()) {
// //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete方法里面 看你们自己选择
// log.warn("mqtt 重新连接成功,重新开始初始化订阅主题...");
// return;
// }
// reconnectTimes.incrementAndGet();
// log.warn("mqtt 重连次数{}, 重新连接时间 {}", reconnectTimes, LocalDateTime.now());
// mqttClient.connect();
// } catch (MqttException e) {
// log.error("mqtt断连异常", e.getMessage());
// e.printStackTrace();
// }
// try {
// this.wait(5000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
} }
// 接受成功 // 接受成功
@ -68,6 +73,5 @@ public class MyMqttCallback implements MqttCallbackExtended {
@Override @Override
public void connectComplete(boolean reconnect, String serverURI) { public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连"); log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连");
mqttClient.initSubscribe();
} }
} }

@ -39,28 +39,46 @@ public class WsSessionManager {
} }
public void addClientSession(String userId, WebSocketSession session) { public void addClientSession(String userId, WebSocketSession session) {
//获取旧session
WebSocketSession oldSession = CLIENT_SESSION.get(userId); WebSocketSession old = CLIENT_SESSION.remove(userId);
if (null == oldSession //没有 if (null != old && old.isOpen()) {
|| !oldSession.getId().equals(session.getId()) //id不同 try {
|| !oldSession.isOpen()) { //旧session已关闭 old.close();
log.info("关闭散户端旧连接:{}{}", userId, session.isOpen());
} catch (IOException e) {
e.printStackTrace();
}
}
if (session.isOpen()) {
CLIENT_SESSION.put(userId, session); //则将新的session添加到池中 CLIENT_SESSION.put(userId, session); //则将新的session添加到池中
log.info("添加散户端连接:{}{}", userId, session.isOpen()); log.info("添加散户端连接:{}{}", userId, session.isOpen());
//判断缓存中是否有未发送的消息 //判断缓存中是否有未发送的消息
sendCacheMessage(userId); sendCacheMessage(userId, AppTypeEnum.client);
} else {
log.error("散户端新连接添加失败,连接已关闭:{}{}", userId, session.isOpen());
} }
} }
public void addRecyclerSession(String userId, WebSocketSession session) { public void addRecyclerSession(String userId, WebSocketSession session) {
//获取旧session //获取旧session
WebSocketSession oldSession = RECYCLER_SESSION.get(userId); WebSocketSession old = RECYCLER_SESSION.remove(userId);
if (null == oldSession //没有 if (null != old && old.isOpen()) {
|| !oldSession.getId().equals(session.getId()) //id不同 try {
|| !oldSession.isOpen()) { //旧session已关闭 old.close();
log.info("关闭回收员端旧连接:{}{}", userId, session.isOpen());
} catch (IOException e) {
e.printStackTrace();
}
}
if (session.isOpen()) {
RECYCLER_SESSION.put(userId, session); //则将新的session添加到池中 RECYCLER_SESSION.put(userId, session); //则将新的session添加到池中
log.info("添加回收员端连接:{}{}", userId, session.isOpen()); log.info("添加回收员端连接:{}{}", userId, session.isOpen());
//判断缓存中是否有未发送的消息 //判断缓存中是否有未发送的消息
sendCacheMessage(userId); sendCacheMessage(userId, AppTypeEnum.recycler);
} else {
log.error("回收员端新连接添加失败,连接已关闭:{}{}", userId, session.isOpen());
} }
} }
@ -78,14 +96,14 @@ public class WsSessionManager {
} }
//处理未发送的消息 //处理未发送的消息
private void sendCacheMessage(String userId) { private void sendCacheMessage(String userId, AppTypeEnum appType) {
// 遍历缓存队列 // 遍历缓存队列
Iterator<SocketMessage> iterator = MESSAGE_CACHE.iterator(); Iterator<SocketMessage> iterator = MESSAGE_CACHE.iterator();
if (iterator.hasNext()) { while (iterator.hasNext()) {
SocketMessage message = iterator.next(); SocketMessage message = iterator.next();
LocalDateTime sendTime = message.getSendTime(); LocalDateTime sendTime = message.getSendTime();
int minute = message.getCacheMinute(); int minute = message.getCacheMinute();
AppTypeEnum appType = message.getAppType(); String receiveUserId = message.getReceiveUserId();
WebSocketSession session = null; WebSocketSession session = null;
switch (appType) { switch (appType) {
case client: case client:
@ -93,24 +111,30 @@ public class WsSessionManager {
break; break;
case recycler: case recycler:
session = RECYCLER_SESSION.get(userId); session = RECYCLER_SESSION.get(userId);
break;
default: default:
log.error("appType错误消息被移除{},{},{}", userId, appType, JSONUtil.toJsonStr(message));
iterator.remove();//移除 iterator.remove();//移除
break; break;
} }
if (userId.equals(message.getReceiveUserId())
if (userId.equals(receiveUserId)
&& appType.equals(message.getAppType())
&& session != null && session != null
&& session.isOpen() && session.isOpen()
&& sendTime.plusMinutes(minute).isAfter(LocalDateTime.now())) { && sendTime.plusMinutes(minute).isAfter(LocalDateTime.now())) {
// 发送消息 // 发送消息
try { try {
session.sendMessage(new TextMessage(JSONUtil.toJsonStr(message))); session.sendMessage(new TextMessage(JSONUtil.toJsonStr(message)));
log.error("消息已发送,消息被移除:{},{},{}", userId, appType, JSONUtil.toJsonStr(message));
iterator.remove();//移除
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
iterator.remove();//移除
} }
//过期的消息直接移除 //过期的消息直接移除
if (sendTime.plusMinutes(minute).isBefore(LocalDateTime.now())) { if (sendTime.plusMinutes(minute).isBefore(LocalDateTime.now())) {
log.error("消息已过期,消息被移除:{},{},{}", userId, appType, JSONUtil.toJsonStr(message));
iterator.remove(); iterator.remove();
} }
} }

@ -32,10 +32,10 @@ public class WsSocketHandler extends AbstractWebSocketHandler {
@Override @Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
log.info("发送文本消息"); // log.info("发送文本消息");
// 获得客户端传来的消息 // 获得客户端传来的消息
String payload = message.getPayload(); String payload = message.getPayload();
log.info("server 接收到消息 " + payload); log.info("server 接收到注册消息 " + payload);
JSONObject json = JSONUtil.parseObj(payload); JSONObject json = JSONUtil.parseObj(payload);
String userId = json.getStr("userId"); String userId = json.getStr("userId");
String appType = json.getStr("appType"); String appType = json.getStr("appType");

Loading…
Cancel
Save