mqtt 优化

master
guochaojie 4 months ago
parent 23ce976a38
commit 615a12af00

@ -7,23 +7,21 @@ import org.eclipse.paho.mqttv5.common.MqttException;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
@Slf4j @Slf4j
@Configuration //@Configuration
@EnableConfigurationProperties(MqttProperties.class) //@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfig { public class MqttConfig {
@Resource // @Resource
private MqttProperties properties; private MqttProperties properties;
@Bean // @Bean
public CustomerMqttClient customerMqttClient() throws MqttException { public CustomerMqttClient customerMqttClient() throws MqttException {
CustomerMqttClient client = new CustomerMqttClient(); CustomerMqttClient client = new CustomerMqttClient();
client.setMqttProperties(properties); // client.setMqttProperties(properties);
client.connect(); client.connect();
log.info("===========> MQTT连接成功 <==========="); log.info("===========> MQTT连接成功 <===========");
return client; return client;

@ -299,17 +299,12 @@ public class DeviceController {
@PostMapping("/login") @PostMapping("/login")
@ApiOperation("清运员手机号登录") @ApiOperation("清运员手机号登录")
public CommonResult<String> login(@RequestBody LoginReqVO loginReqVO) { public CommonResult<LoginRespVO> login(@RequestBody LoginReqVO loginReqVO) {
RecycleDevice device = deviceService.getByDeviceCode(loginReqVO.getDeviceCode()); RecycleDevice device = deviceService.getByDeviceCode(loginReqVO.getDeviceCode());
if (device == null) { if (device == null) {
throw new BizIllegalException("登录失败:未查询到对应设备信息!"); throw new BizIllegalException("登录失败:未查询到对应设备信息!");
} }
Recycler recycler = recyclerService.getRecyclerByPhoneNumber(loginReqVO.getPhone()); Recycler recycler = recyclerService.getRecyclerByPhoneNumber(loginReqVO.getPhone());
CommandVO command = new CommandVO();
command.setDeviceCode(loginReqVO.getDeviceCode());
command.setCmd(CMDEnum.login);
command.setOptTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
command.setRemark("登录成功");
if (null != recycler) { if (null != recycler) {
RecycleStaffDevice staffDevice = staffDeviceService.getByDeviceCode(loginReqVO.getDeviceCode(), loginReqVO.getPhone()); RecycleStaffDevice staffDevice = staffDeviceService.getByDeviceCode(loginReqVO.getDeviceCode(), loginReqVO.getPhone());
if (null == staffDevice) { if (null == staffDevice) {
@ -333,9 +328,8 @@ public class DeviceController {
respVO.setPhone(client.getMobilePhone()); respVO.setPhone(client.getMobilePhone());
respVO.setTimeExpire(userInfo.getTimeExpire()); respVO.setTimeExpire(userInfo.getTimeExpire());
respVO.setRole(1); respVO.setRole(1);
command.setData(respVO);
mqttClient.publish(loginReqVO.getDeviceCode() + "/command", JSONUtil.toJsonStr(command)); return CommonResult.success(respVO, "success");
return CommonResult.success("登录成功!", "success");
} }
//登录成功 //登录成功
UserDTO userDTO = commonService.loginDeviceByRecycler(recycler); UserDTO userDTO = commonService.loginDeviceByRecycler(recycler);
@ -349,8 +343,7 @@ public class DeviceController {
respVO.setBalance(BigDecimal.ZERO); respVO.setBalance(BigDecimal.ZERO);
respVO.setTimeExpire(userDTO.getTimeExpire()); respVO.setTimeExpire(userDTO.getTimeExpire());
respVO.setRole(2); respVO.setRole(2);
mqttClient.publish(loginReqVO.getDeviceCode() + "/command", JSONUtil.toJsonStr(respVO)); return CommonResult.success(respVO, "success");
return CommonResult.success("登录成功!", "success");
} }

@ -166,7 +166,7 @@ public class WxMessageController {
public CommonResult<String> test() { public CommonResult<String> test() {
client.subscribe("testtopc/1",1); client.subscribe("testtopc/1",1);
client.publish("testtopc/1","{\"msg\": \"hello\"}"); client.publish("testtopc/1","{\"msg\": \"hello\"}");
client.unsubscribe("testtopc/1"); // client.unsubscribe("testtopc/1");
return CommonResult.success("success"); return CommonResult.success("success");
} }

@ -15,5 +15,7 @@ public interface IRecycleDeviceService extends IService<RecycleDevice> {
int updateDevice(DeviceStatus status); int updateDevice(DeviceStatus status);
boolean offline(String deviceCode);
boolean isChanged(DeviceStatus old, DeviceStatus now); boolean isChanged(DeviceStatus old, DeviceStatus now);
} }

@ -42,6 +42,16 @@ public class RecycleDeviceServiceImpl extends ServiceImpl<RecycleDeviceMapper, R
return update(wrapper) ? 1 : 0; return update(wrapper) ? 1 : 0;
} }
@Override
public boolean offline(String deviceCode) {
UpdateWrapper<RecycleDevice> wrapper = new UpdateWrapper<>();
wrapper.lambda()
.set(RecycleDevice::getStatus, 0)
.eq(RecycleDevice::getDeviceCode, deviceCode);
return update(wrapper);
}
@Override @Override
public boolean isChanged(DeviceStatus oldStatus, DeviceStatus newStatus) { public boolean isChanged(DeviceStatus oldStatus, DeviceStatus newStatus) {
if (!oldStatus.getIp().equals(newStatus.getIp())) return true; if (!oldStatus.getIp().equals(newStatus.getIp())) return true;

@ -10,21 +10,31 @@ import org.eclipse.paho.mqttv5.client.MqttTopic;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
@Slf4j @Slf4j
@Component
@EnableConfigurationProperties(MqttProperties.class)
public class CustomerMqttClient { public class CustomerMqttClient {
private static MqttClient client; @Resource
private MyMqttCallback callback;
private MqttClient client;
@Resource
private MqttProperties properties; private MqttProperties properties;
public static MqttClient getClient() { @PostConstruct
return client; public void init() {
try {
connect();
} catch (MqttException e) {
log.error("连接MQTT服务失败", e);
throw new BizIllegalException("连接MQTT服务失败");
} }
public static void setClient(MqttClient client) {
CustomerMqttClient.client = client;
} }
public void connect() throws MqttException { public void connect() throws MqttException {
@ -40,8 +50,9 @@ public class CustomerMqttClient {
options.setKeepAliveInterval(properties.getKeepAlive()); options.setKeepAliveInterval(properties.getKeepAlive());
options.setConnectionTimeout(properties.getTimeout()); options.setConnectionTimeout(properties.getTimeout());
options.setAutomaticReconnect(true); options.setAutomaticReconnect(true);
options.setAutomaticReconnectDelay(1, 3);
options.setSessionExpiryInterval(0L);//会话过期时间 options.setSessionExpiryInterval(0L);//会话过期时间
client.setCallback(new MyMqttCallback(client)); client.setCallback(callback);
if (!client.isConnected()) { if (!client.isConnected()) {
client.connect(options); client.connect(options);
log.info("连接MQTT服务成功 success"); log.info("连接MQTT服务成功 success");
@ -50,6 +61,7 @@ public class CustomerMqttClient {
client.connect(options); client.connect(options);
log.info("重新连接MQTT服务成功 success"); log.info("重新连接MQTT服务成功 success");
} }
callback.registerClient(client);
initSubscribe(); initSubscribe();
} }
@ -83,19 +95,6 @@ public class CustomerMqttClient {
e.printStackTrace(); e.printStackTrace();
throw new BizIllegalException("mqtt 发布主题失败,发布主题:" + topic + ",发布消息:" + msg); throw new BizIllegalException("mqtt 发布主题失败,发布主题:" + topic + ",发布消息:" + msg);
} }
// MqttDeliveryToken token;//Delivery:配送
// synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充
// try {
// token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
// token.waitForCompletion(1000L);
// } catch (MqttPersistenceException e) {
// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos);
// e.printStackTrace();
// } catch (MqttException e) {
// log.error("发布主题失败 topic:{}, qos: {} ", topic, qos);
// e.printStackTrace();
// }
// }
} }
public void subscribe(String topicFilter, int qos) { public void subscribe(String topicFilter, int qos) {
@ -129,6 +128,9 @@ public class CustomerMqttClient {
public void initSubscribe() { public void initSubscribe() {
if (client != null && client.isConnected()) { if (client != null && client.isConnected()) {
Map<String, Integer> topics = properties.getTopics(); Map<String, Integer> topics = properties.getTopics();
if(topics == null || topics.size() == 0) {
log.info("未配置初始化订阅主题!!!");
}
for (Map.Entry<String, Integer> entry : topics.entrySet()) { for (Map.Entry<String, Integer> entry : topics.entrySet()) {
String topic = entry.getKey(); String topic = entry.getKey();
int qos = entry.getValue(); int qos = entry.getValue();
@ -139,8 +141,4 @@ public class CustomerMqttClient {
log.error("客户端未连接,初始化订阅失败!!!"); log.error("客户端未连接,初始化订阅失败!!!");
} }
} }
public void setMqttProperties(MqttProperties mqttProperties) {
this.properties = mqttProperties;
}
} }

@ -1,34 +1,52 @@
package cc.yunxi.utils; package cc.yunxi.utils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*; import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@Slf4j @Slf4j
@Component
public class MyMqttCallback implements MqttCallback { public class MyMqttCallback implements MqttCallback {
private MqttClient mqttClient; private MqttClient mqttClient;
@Resource
private TopicHandler topicHandler; private TopicHandler topicHandler;
public MyMqttCallback(MqttClient mqttClient) { public void registerClient(MqttClient mqttClient) {
this.mqttClient = mqttClient; this.mqttClient = mqttClient;
} }
@Override @Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
log.error("MQTT 断开连接,原因:{}", mqttDisconnectResponse.getReasonString()); log.error("MQTT 断开连接,原因:{}", mqttDisconnectResponse.getReasonString());
try {
mqttClient.reconnect();
} catch (MqttException e) {
log.error("MQTT 重新连接失败,错误信息:{}", e.getMessage());
throw new RuntimeException(e);
}
mqttDisconnectResponse.getException().printStackTrace(); mqttDisconnectResponse.getException().printStackTrace();
} }
@Override @Override
public void mqttErrorOccurred(MqttException e) { public void mqttErrorOccurred(MqttException e) {
log.error("MQTT 错误,错误信息:{}", e.getMessage()); log.error("MQTT 错误,错误信息:{}", e.getMessage());
try {
mqttClient.reconnect();
} catch (MqttException ex) {
log.error("MQTT 重新连接失败,错误信息:{}", ex.getMessage());
throw new RuntimeException(ex);
}
e.printStackTrace(); e.printStackTrace();
} }
@ -57,4 +75,7 @@ public class MyMqttCallback implements MqttCallback {
public void authPacketArrived(int i, MqttProperties mqttProperties) { public void authPacketArrived(int i, MqttProperties mqttProperties) {
log.info("MQTT 认证包到达,原因码:{},原因:{}", i, mqttProperties.getReasonString()); log.info("MQTT 认证包到达,原因码:{},原因:{}", i, mqttProperties.getReasonString());
} }
} }

@ -1,13 +1,45 @@
package cc.yunxi.utils; package cc.yunxi.utils;
import cc.yunxi.service.IRecycleDeviceService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@Component @Component
@Slf4j
public class TopicHandler { public class TopicHandler {
@Resource
private IRecycleDeviceService deviceService;
public void handleTopic(String topic, MqttMessage message) { public void handleTopic(String topic, MqttMessage message) {
// 处理订阅的主题和消息 String[] split = topic.split("/");
System.out.println("Received message on topic: " + topic + ", message: " + message.getPayload()); String deviceCode = split[0];
String action = split[1];
byte[] payload = message.getPayload();
String msg = new String(payload, StandardCharsets.UTF_8);
switch (action) {
case "Dead":
Dead(deviceCode, msg);
break;
default:
break;
}
log.info("消息已处理 topic:{},msg:{}", topic, msg);
}
/**
* 1 线 0 线
*
* @param deviceCode
* @param msg
*/
private void Dead(String deviceCode, String msg) {
if ("1".equals(msg)) {
deviceService.offline(deviceCode);
}
} }
} }

@ -38,4 +38,5 @@ mqtt:
timeout: 30 timeout: 30
qos: 1 qos: 1
keepAlive: 30 keepAlive: 30
topics: {} topics:
"[+/Dead]": 1
Loading…
Cancel
Save