mqtt 消息通信

master
guochaojie 4 months ago
parent aa6744d4d6
commit 226108637f

@ -8,7 +8,6 @@ import cc.yunxi.domain.po.*;
import cc.yunxi.domain.vo.device.*; import cc.yunxi.domain.vo.device.*;
import cc.yunxi.domain.vo.file.FileUploadRespVO; import cc.yunxi.domain.vo.file.FileUploadRespVO;
import cc.yunxi.service.*; import cc.yunxi.service.*;
import cc.yunxi.utils.CommandUtil;
import cc.yunxi.utils.CustomerMqttClient; import cc.yunxi.utils.CustomerMqttClient;
import cc.yunxi.utils.RedisTool; import cc.yunxi.utils.RedisTool;
import cc.yunxi.utils.UserContext; import cc.yunxi.utils.UserContext;
@ -69,10 +68,7 @@ public class DeviceController {
@Resource @Resource
private RedisTool redisTool; private RedisTool redisTool;
@Resource @Resource
private CommandUtil commandUtil;
@Resource
private IFileService fileService; private IFileService fileService;
@Resource @Resource
private CustomerMqttClient mqttClient; private CustomerMqttClient mqttClient;
@ -222,7 +218,6 @@ public class DeviceController {
} }
private void fillQrCode(String qrCode, DeviceRespVO respVO) { private void fillQrCode(String qrCode, DeviceRespVO respVO) {
ArrayList<DeviceRespVO.Poster> posters = new ArrayList<>();
List<FileUploadRespVO> files = JSONUtil.toList(qrCode, FileUploadRespVO.class); List<FileUploadRespVO> files = JSONUtil.toList(qrCode, FileUploadRespVO.class);
if (files.size() > 0) { if (files.size() > 0) {
FileUploadRespVO file = files.get(0); FileUploadRespVO file = files.get(0);
@ -504,65 +499,53 @@ public class DeviceController {
@ApiOperation("设备命令下发") @ApiOperation("设备命令下发")
@PostMapping("/command") @PostMapping("/command")
public CommonResult<String> command(@RequestBody CommandVO cmd) { public CommonResult<String> command(@RequestBody CommandVO cmd) {
UserDTO user = UserContext.getUser();
String deviceCode = cmd.getDeviceCode(); String deviceCode = cmd.getDeviceCode();
Object value = redisTool.getValue(deviceCode); Object value = redisTool.getValue(deviceCode);
if (null == value) { if (null == value) return CommonResult.error(400, "设备不在线");
return CommonResult.error(400, "设备不在线");
}
DeviceStatus status = JSONUtil.toBean(value.toString(), DeviceStatus.class); DeviceStatus status = JSONUtil.toBean(value.toString(), DeviceStatus.class);
// 判断设备是否在线 // 判断设备是否在线
Boolean online = status.getOnline(); boolean online = status.isOnline();
if (!online) return CommonResult.error(400, "设备不在线"); if (!online) return CommonResult.error(400, "设备不在线");
boolean connected = mqttClient.isConnected();
if (!connected) throw new BizIllegalException("与mqtt服务器连接中断请稍后重试");
// 下发指令 // 下发指令
String ip = status.getIp(); mqttClient.publish(deviceCode + "/command", JSONUtil.toJsonStr(cmd));
CommonResult _result = commandUtil.sendCommand(cmd, ip);
mqttClient.publish("/device/command/result", JSONUtil.toJsonStr(cmd), 1, false);
// 记录指令下发事件 // 记录指令下发事件
RecycleDeviceEvent event = new RecycleDeviceEvent(); RecycleDeviceEvent event = new RecycleDeviceEvent();
event.setDeviceCode(deviceCode); event.setDeviceCode(deviceCode);
event.setBucketCode("");// event.setBucketCode("");//桶编码
event.setDoorNum(0);// if (StrUtil.isNotEmpty(cmd.getData())) {
event.setBucketCode(cmd.getData());//桶编码
}
event.setEventType(deviceCode); event.setEventType(deviceCode);
event.setEventDesc("设备命令下发"); event.setEventDesc("设备命令下发");
event.setEventTime(new Date()); event.setEventTime(new Date());
event.setIsLocal("0");//远程调用 event.setIsLocal("0");//远程调用
if (_result.getCode() != 200) {
event.setEventResult("1");
} else {
event.setEventResult("0");
}
UserDTO user = UserContext.getUser();
event.setCreateUserId(user.getId()); event.setCreateUserId(user.getId());
event.setCreateTime(new Date()); event.setCreateTime(new Date());
event.setCompanyId(user.getCompanyId()); event.setCompanyId(user.getCompanyId());
event.setOrganizeJsonId("");
eventService.save(event); eventService.save(event);
return _result; return CommonResult.success("指令下发成功");
} }
@ApiOperation("获取设备状态") @ApiOperation("获取设备状态")
@PostMapping("/getStatus") @PostMapping("/getStatus")
public CommonResult<String> getStatus(@RequestBody DeviceVO deviceVO) { public CommonResult<String> getStatus(@RequestBody DeviceVO deviceVO) {
Object value = redisTool.getValue(deviceVO.getDeviceCode()); Object value = redisTool.getValue(deviceVO.getDeviceCode());
if (value == null) { if (value == null) return CommonResult.error(400, "获取失败,设备不在线!");
return CommonResult.error(400, "获取失败,设备不在线!");
}
DeviceStatus status = JSONUtil.toBean(value.toString(), DeviceStatus.class); DeviceStatus status = JSONUtil.toBean(value.toString(), DeviceStatus.class);
if (status.isOnline()) return CommonResult.error(400, "获取失败,设备不在线!");
CommandVO command = new CommandVO(); CommandVO command = new CommandVO();
command.setCmd(CMDEnum.status); command.setCmd(CMDEnum.status);
command.setDeviceCode(deviceVO.getDeviceCode()); command.setDeviceCode(deviceVO.getDeviceCode());
command.setOptTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); command.setOptTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
command.setData(""); command.setData("");
command.setRemark(""); command.setRemark("");
CommonResult commonResult = commandUtil.sendCommand(command, status.getIp()); if (!mqttClient.isConnected()) throw new BizIllegalException("与mqtt服务器连接中断请稍后重试");
Object data = commonResult.getData(); mqttClient.publish(deviceVO.getDeviceCode() + "/command", JSONUtil.toJsonStr(command));
DeviceStatus status1 = JSONUtil.toBean(data.toString(), DeviceStatus.class);
deviceService.updateDevice(status1);
status1.getBucketList().forEach(bucket -> {
bucketService.updateStatus(bucket);
});
return CommonResult.success("获取成功!"); return CommonResult.success("获取成功!");
} }
@ -613,7 +596,7 @@ public class DeviceController {
} }
} }
} }
redisTool.setValue(deviceCode, JSONUtil.toJsonStr(des), 1000 * 60 * 60l); redisTool.setValue(deviceCode, JSONUtil.toJsonStr(des), 1000 * 60 * 60L);
CommonResult<String> success = CommonResult.success(deviceCode); CommonResult<String> success = CommonResult.success(deviceCode);
success.setMsg("success"); success.setMsg("success");
return success; return success;
@ -636,7 +619,7 @@ public class DeviceController {
} }
} }
} }
redisTool.setValue(weight.getBucketCode(), JSONUtil.toJsonStr(weight), 1000 * 60l);//一分钟过期 redisTool.setValue(weight.getBucketCode(), JSONUtil.toJsonStr(weight), 1000 * 60L);//一分钟过期
return CommonResult.success("success"); return CommonResult.success("success");
} }

@ -18,13 +18,13 @@ public class BucketStatus {
@NotNull(message = "门号不能为空") @NotNull(message = "门号不能为空")
private Integer doorNum; private Integer doorNum;
@ApiModelProperty(value = "投递门状态",required = true) @ApiModelProperty(value = "投递门状态",required = true)
private Boolean deliveryDoor; private boolean deliveryDoor;
@ApiModelProperty(value = "清运门状态",required = true) @ApiModelProperty(value = "清运门状态",required = true)
private Boolean cleanDoor; private boolean cleanDoor;
@ApiModelProperty(value = "烟雾报警", required = true, example = "true") @ApiModelProperty(value = "烟雾报警", required = true, example = "true")
private Boolean smogAlarm; private boolean smogAlarm;
@ApiModelProperty(value = "是否满",required = true) @ApiModelProperty(value = "是否满",required = true)
private Boolean isFull; private boolean isFull;
@ApiModelProperty(value = "重量",required = true) @ApiModelProperty(value = "重量",required = true)
private BigDecimal weight; private BigDecimal weight;
} }

@ -5,7 +5,6 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.List; import java.util.List;
@Data @Data
@ -15,7 +14,7 @@ public class DeviceStatus {
@NotBlank(message = "设备编码不能为空") @NotBlank(message = "设备编码不能为空")
private String deviceCode; private String deviceCode;
@ApiModelProperty(value = "是否在线", hidden = true) @ApiModelProperty(value = "是否在线", hidden = true)
private Boolean online; private boolean online;
@ApiModelProperty(value = "设备ip地址", hidden = true) @ApiModelProperty(value = "设备ip地址", hidden = true)
private String ip; private String ip;
@ApiModelProperty(value = "设备端口", hidden = true) @ApiModelProperty(value = "设备端口", hidden = true)

@ -1,24 +1,18 @@
package cc.yunxi.service.impl; package cc.yunxi.service.impl;
import cc.yunxi.common.domain.CommonResult;
import cc.yunxi.common.exception.BizIllegalException; import cc.yunxi.common.exception.BizIllegalException;
import cc.yunxi.common.exception.BusinessLogicException; import cc.yunxi.common.exception.BusinessLogicException;
import cc.yunxi.common.exception.ForbiddenException;
import cc.yunxi.config.props.WxHsyProperties; import cc.yunxi.config.props.WxHsyProperties;
import cc.yunxi.config.props.WxShProperties; import cc.yunxi.config.props.WxShProperties;
import cc.yunxi.domain.dto.UserDTO; import cc.yunxi.domain.dto.UserDTO;
import cc.yunxi.domain.dto.WxLoginDTO; import cc.yunxi.domain.dto.WxLoginDTO;
import cc.yunxi.domain.dto.ZSGLoginDTO; import cc.yunxi.domain.dto.ZSGLoginDTO;
import cc.yunxi.domain.po.Client; import cc.yunxi.domain.po.Client;
import cc.yunxi.domain.po.Manager;
import cc.yunxi.domain.po.Recycler; import cc.yunxi.domain.po.Recycler;
import cc.yunxi.domain.query.RecyclerQuery;
import cc.yunxi.domain.vo.device.CMDEnum; import cc.yunxi.domain.vo.device.CMDEnum;
import cc.yunxi.domain.vo.device.CommandVO; import cc.yunxi.domain.vo.device.CommandVO;
import cc.yunxi.domain.vo.device.LoginRespVO; import cc.yunxi.domain.vo.device.LoginRespVO;
import cc.yunxi.enums.UserTypeEnum; import cc.yunxi.enums.UserTypeEnum;
import cc.yunxi.mapper.ManagerMapper;
import cc.yunxi.mapper.ProductMapper;
import cc.yunxi.service.IClientService; import cc.yunxi.service.IClientService;
import cc.yunxi.service.ICommonService; import cc.yunxi.service.ICommonService;
import cc.yunxi.service.IManagerService; import cc.yunxi.service.IManagerService;
@ -30,18 +24,13 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.swagger.annotations.ApiModelProperty;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date; import java.util.Date;
import java.util.List;
/** /**
* *
@ -71,7 +60,7 @@ public class CommonService implements ICommonService {
private WxShProperties wxShProperties; private WxShProperties wxShProperties;
@Resource @Resource
private CommandUtil commandUtil; private CustomerMqttClient mqttClient;
@Resource @Resource
private JwtTool jwtTool; private JwtTool jwtTool;
@ -179,8 +168,13 @@ public class CommonService implements ICommonService {
command.setData(data); command.setData(data);
command.setOptTime(DateUtil.now()); command.setOptTime(DateUtil.now());
command.setRemark("登录成功"); command.setRemark("登录成功");
CommonResult commonResult = commandUtil.sendCommand(command); boolean connected = mqttClient.isConnected();
log.info("设备登录结果 回收员 ===> code:{},msg:{}", commonResult.getCode(), commonResult.getMsg()); if (connected) {
mqttClient.publish(wxLoginDTO.getDevCode() + "command", JSONUtil.toJsonStr(command));
log.info("扫描登录 回收员 ===> 通知设备 code:{}", wxLoginDTO.getDevCode());
} else {
log.error("扫描登录失败 回收员 ===> mqtt未连接 code:{}", wxLoginDTO.getDevCode());
}
} }
return userDTO; return userDTO;
} }
@ -215,8 +209,13 @@ public class CommonService implements ICommonService {
command.setData(data); command.setData(data);
command.setOptTime(DateUtil.now()); command.setOptTime(DateUtil.now());
command.setRemark("登录成功"); command.setRemark("登录成功");
CommonResult commonResult = commandUtil.sendCommand(command); boolean connected = mqttClient.isConnected();
log.info("设备登录结果 散户 ===> code:{},msg:{}", commonResult.getCode(), commonResult.getMsg()); if (connected) {
mqttClient.publish(wxLoginDTO.getDevCode() + "command", JSONUtil.toJsonStr(command));
log.info("扫描登录 散户 ===> 通知设备 code:{}", wxLoginDTO.getDevCode());
} else {
log.error("扫描登录失败 散户 ===> mqtt未连接 code:{}", wxLoginDTO.getDevCode());
}
} }
return userDTO; return userDTO;

@ -3,7 +3,6 @@ package cc.yunxi.service.impl;
import cc.yunxi.domain.po.RecycleBucket; import cc.yunxi.domain.po.RecycleBucket;
import cc.yunxi.domain.vo.device.BucketStatus; import cc.yunxi.domain.vo.device.BucketStatus;
import cc.yunxi.mapper.RecycleBucketMapper; import cc.yunxi.mapper.RecycleBucketMapper;
import cc.yunxi.mapper.RecycleDeviceMapper;
import cc.yunxi.service.IRecycleBucketService; import cc.yunxi.service.IRecycleBucketService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
@ -38,10 +37,10 @@ public class RecycleBucketServiceImpl extends ServiceImpl<RecycleBucketMapper, R
UpdateWrapper<RecycleBucket> wrapper = new UpdateWrapper<>(); UpdateWrapper<RecycleBucket> wrapper = new UpdateWrapper<>();
wrapper.lambda() wrapper.lambda()
.set(RecycleBucket::getCurrentWeight, status.getWeight()) .set(RecycleBucket::getCurrentWeight, status.getWeight())
.set(RecycleBucket::getDeliveryDoorStatus, status.getDeliveryDoor() ? 1 : 0) .set(RecycleBucket::getDeliveryDoorStatus, status.isDeliveryDoor() ? 1 : 0)
.set(RecycleBucket::getCleanDoorStatus, status.getCleanDoor() ? 1 : 0) .set(RecycleBucket::getCleanDoorStatus, status.isCleanDoor() ? 1 : 0)
.set(RecycleBucket::getFullStatus, status.getIsFull() ? 1 : 0) .set(RecycleBucket::getFullStatus, status.isFull() ? 1 : 0)
.set(RecycleBucket::getSmogStatus, status.getSmogAlarm() ? 1 : 0) .set(RecycleBucket::getSmogStatus, status.isSmogAlarm() ? 1 : 0)
.eq(RecycleBucket::getBucketCode, status.getBucketCode()); .eq(RecycleBucket::getBucketCode, status.getBucketCode());
boolean update = this.update(wrapper); boolean update = this.update(wrapper);
return update ? 1 : 0; return update ? 1 : 0;
@ -52,19 +51,16 @@ public class RecycleBucketServiceImpl extends ServiceImpl<RecycleBucketMapper, R
if (old.getWeight().compareTo(now.getWeight()) != 0) { if (old.getWeight().compareTo(now.getWeight()) != 0) {
return true; return true;
} }
if (old.getIsFull() != now.getIsFull()) { if (old.isFull() != now.isFull()) {
return true; return true;
} }
if (old.getSmogAlarm() != now.getSmogAlarm()) { if (old.isSmogAlarm() != now.isSmogAlarm()) {
return true; return true;
} }
if (old.getCleanDoor() != now.getCleanDoor()) { if (old.isCleanDoor() != now.isCleanDoor()) {
return true; return true;
} }
if (old.getDeliveryDoor() != now.getDeliveryDoor()) { return old.isDeliveryDoor() != now.isDeliveryDoor();
return true;
}
return false;
} }
@Override @Override
public RecycleBucket getBagCode(String deviceCode, String bucketCode) { public RecycleBucket getBagCode(String deviceCode, String bucketCode) {

@ -37,7 +37,7 @@ public class RecycleDeviceServiceImpl extends ServiceImpl<RecycleDeviceMapper, R
UpdateWrapper<RecycleDevice> wrapper = new UpdateWrapper<>(); UpdateWrapper<RecycleDevice> wrapper = new UpdateWrapper<>();
wrapper.lambda() wrapper.lambda()
.set(RecycleDevice::getIpAddress, status.getIp()) .set(RecycleDevice::getIpAddress, status.getIp())
.set(RecycleDevice::getStatus, status.getOnline() ? 1 : 0) .set(RecycleDevice::getStatus, status.isOnline() ? 1 : 0)
.eq(RecycleDevice::getDeviceCode, status.getDeviceCode()); .eq(RecycleDevice::getDeviceCode, status.getDeviceCode());
return update(wrapper) ? 1 : 0; return update(wrapper) ? 1 : 0;
} }
@ -45,7 +45,6 @@ public class RecycleDeviceServiceImpl extends ServiceImpl<RecycleDeviceMapper, R
@Override @Override
public boolean isChanged(DeviceStatus oldStatus, DeviceStatus newStatus) { public boolean isChanged(DeviceStatus oldStatus, DeviceStatus newStatus) {
if (!oldStatus.getIp().equals(newStatus.getIp())) return true; if (!oldStatus.getIp().equals(newStatus.getIp())) return true;
if (oldStatus.getOnline() != newStatus.getOnline()) return true; return oldStatus.isOnline() != newStatus.isOnline();
return false;
} }
} }

@ -1,43 +0,0 @@
package cc.yunxi.utils;
import cc.yunxi.common.domain.CommonResult;
import cc.yunxi.domain.vo.device.CommandVO;
import cc.yunxi.domain.vo.device.DeviceStatus;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONUtil;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Component
public class CommandUtil {
private static final int PORT = 5000;
@Resource
private RedisTool redisTool;
public CommonResult sendCommand(CommandVO command) {
DeviceStatus status = getDeviceStatus(command.getDeviceCode());
if (null == status) return CommonResult.error(400, "设备不在线!");
String ip = status.getIp();
String res = HttpUtil.post("http://" + ip + ":" + PORT + "/command", JSONUtil.toJsonStr(command), 1000);
return JSONUtil.toBean(res, CommonResult.class);
}
public CommonResult sendCommand(CommandVO command, String ip) {
String res = HttpUtil.post("http://" + ip + ":" + PORT + "/command", JSONUtil.toJsonStr(command), 1000);
return JSONUtil.toBean(res, CommonResult.class);
}
private DeviceStatus getDeviceStatus(String deviceCode) {
boolean exist = redisTool.existsKey(deviceCode);
long expire = redisTool.getKeyExpire(deviceCode, TimeUnit.SECONDS);
if (!exist || expire <= 0) {
return null;
}
Object value = redisTool.getValue(deviceCode);
if (value == null) return null;
return JSONUtil.toBean(value.toString(), DeviceStatus.class);
}
}

@ -1,5 +1,6 @@
package cc.yunxi.utils; package cc.yunxi.utils;
import cc.yunxi.common.exception.BizIllegalException;
import cc.yunxi.config.props.MqttProperties; import cc.yunxi.config.props.MqttProperties;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -9,6 +10,7 @@ import org.eclipse.paho.mqttv5.client.MqttTopic;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttMessage;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
@ -38,7 +40,7 @@ public class CustomerMqttClient {
options.setKeepAliveInterval(properties.getKeepAlive()); options.setKeepAliveInterval(properties.getKeepAlive());
options.setConnectionTimeout(properties.getTimeout()); options.setConnectionTimeout(properties.getTimeout());
options.setAutomaticReconnect(true); options.setAutomaticReconnect(true);
options.setSessionExpiryInterval(0l);//会话过期时间 options.setSessionExpiryInterval(0L);//会话过期时间
client.setCallback(new MyMqttCallback(client)); client.setCallback(new MyMqttCallback(client));
if (!client.isConnected()) { if (!client.isConnected()) {
client.connect(options); client.connect(options);
@ -55,6 +57,10 @@ public class CustomerMqttClient {
client.disconnect(); client.disconnect();
} }
public boolean isConnected() {
return client.isConnected();
}
public void publish(String topic, String msg) { public void publish(String topic, String msg) {
publish(topic, msg, properties.getQos(), false); publish(topic, msg, properties.getQos(), false);
} }
@ -68,7 +74,6 @@ public class CustomerMqttClient {
if (null == mqttTopic) { if (null == mqttTopic) {
log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos); log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos);
} }
boolean connected = client.isConnected();
try { try {
client.publish(topic, message); client.publish(topic, message);
@ -76,7 +81,7 @@ public class CustomerMqttClient {
} catch (MqttException e) { } catch (MqttException e) {
log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, msg); log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, msg);
e.printStackTrace(); e.printStackTrace();
throw new RuntimeException(e); throw new BizIllegalException("mqtt 发布主题失败,发布主题:" + topic + ",发布消息:" + msg);
} }
// MqttDeliveryToken token;//Delivery:配送 // MqttDeliveryToken token;//Delivery:配送
// synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充 // synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充

Loading…
Cancel
Save