Merge remote-tracking branch 'origin/master'

master
jiyufei 4 months ago
commit ef85e73a46

@ -63,25 +63,25 @@
<artifactId>spring-boot-starter-websocket</artifactId> <artifactId>spring-boot-starter-websocket</artifactId>
</dependency> </dependency>
<!-- mqtt --> <!-- mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- MQTT -->
<!-- <dependency>--> <!-- <dependency>-->
<!-- <groupId>org.eclipse.paho</groupId>--> <!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>--> <!-- <artifactId>spring-boot-starter-integration</artifactId>-->
<!-- <version>1.2.5</version>-->
<!-- </dependency>--> <!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.integration</groupId>-->
<!-- <artifactId>spring-integration-stream</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.integration</groupId>-->
<!-- <artifactId>spring-integration-mqtt</artifactId>-->
<!-- </dependency>-->
<!-- MQTT -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
<!-- 自签名证书 --> <!-- 自签名证书 -->
<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on --> <!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
<!-- <dependency>--> <!-- <dependency>-->

@ -3,17 +3,18 @@ package cc.yunxi.config;
import cc.yunxi.config.props.MqttProperties; import cc.yunxi.config.props.MqttProperties;
import cc.yunxi.utils.CustomerMqttClient; import cc.yunxi.utils.CustomerMqttClient;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.mqttv5.common.MqttException;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
@Slf4j @Slf4j
//@Configuration @Configuration
//@EnableConfigurationProperties(MqttProperties.class) @EnableConfigurationProperties(MqttProperties.class)
public class MqttConfig { public class MqttConfig {
@Resource @Resource

@ -50,6 +50,7 @@ public class WebMvcConfig implements WebMvcConfigurer {
"/upload/**", "/upload/**",
"/qrcode/**.txt", "/qrcode/**.txt",
"/wx/**",//微信消息推送验证 "/wx/**",//微信消息推送验证
"/wx-message/**",//微信消息推送验证
"/webSocketServer/**",//socket通信 "/webSocketServer/**",//socket通信
"/doc.html", "/doc.html",
"/api/delivery-order/**", "/api/delivery-order/**",

@ -6,12 +6,12 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.Map; import java.util.Map;
@Data @Data
//@ConfigurationProperties(prefix = "mqtt") @ConfigurationProperties(prefix = "mqtt")
public class MqttProperties { public class MqttProperties {
private String host; private String host;
private String clientId; private String clientId;
private String userName; private String username;
private String passWord; private String password;
private Map<String, Integer> topics; private Map<String, Integer> topics;
private Boolean cleanSession; private Boolean cleanSession;
private Integer timeout; private Integer timeout;

@ -8,7 +8,7 @@ import cc.yunxi.domain.po.*;
import cc.yunxi.domain.vo.device.*; import cc.yunxi.domain.vo.device.*;
import cc.yunxi.domain.vo.file.FileUploadRespVO; import cc.yunxi.domain.vo.file.FileUploadRespVO;
import cc.yunxi.service.*; import cc.yunxi.service.*;
import cc.yunxi.utils.CommandUtil; import cc.yunxi.utils.CustomerMqttClient;
import cc.yunxi.utils.RedisTool; import cc.yunxi.utils.RedisTool;
import cc.yunxi.utils.UserContext; import cc.yunxi.utils.UserContext;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
@ -67,11 +67,10 @@ public class DeviceController {
private ICommonService commonService; private ICommonService commonService;
@Resource @Resource
private RedisTool redisTool; private RedisTool redisTool;
@Resource
private CommandUtil commandUtil;
@Resource @Resource
private IFileService fileService; private IFileService fileService;
@Resource
private CustomerMqttClient mqttClient;
@ApiOperation("小程序获取设备配置") @ApiOperation("小程序获取设备配置")
@ -219,7 +218,6 @@ public class DeviceController {
} }
private void fillQrCode(String qrCode, DeviceRespVO respVO) { private void fillQrCode(String qrCode, DeviceRespVO respVO) {
ArrayList<DeviceRespVO.Poster> posters = new ArrayList<>();
List<FileUploadRespVO> files = JSONUtil.toList(qrCode, FileUploadRespVO.class); List<FileUploadRespVO> files = JSONUtil.toList(qrCode, FileUploadRespVO.class);
if (files.size() > 0) { if (files.size() > 0) {
FileUploadRespVO file = files.get(0); FileUploadRespVO file = files.get(0);
@ -501,64 +499,53 @@ public class DeviceController {
@ApiOperation("设备命令下发") @ApiOperation("设备命令下发")
@PostMapping("/command") @PostMapping("/command")
public CommonResult<String> command(@RequestBody CommandVO cmd) { public CommonResult<String> command(@RequestBody CommandVO cmd) {
UserDTO user = UserContext.getUser();
String deviceCode = cmd.getDeviceCode(); String deviceCode = cmd.getDeviceCode();
Object value = redisTool.getValue(deviceCode); Object value = redisTool.getValue(deviceCode);
if (null == value) { if (null == value) return CommonResult.error(400, "设备不在线");
return CommonResult.error(400, "设备不在线");
}
DeviceStatus status = JSONUtil.toBean(value.toString(), DeviceStatus.class); DeviceStatus status = JSONUtil.toBean(value.toString(), DeviceStatus.class);
// 判断设备是否在线 // 判断设备是否在线
Boolean online = status.getOnline(); boolean online = status.isOnline();
if (!online) return CommonResult.error(400, "设备不在线"); if (!online) return CommonResult.error(400, "设备不在线");
boolean connected = mqttClient.isConnected();
if (!connected) throw new BizIllegalException("与mqtt服务器连接中断请稍后重试");
// 下发指令 // 下发指令
String ip = status.getIp(); mqttClient.publish(deviceCode + "/command", JSONUtil.toJsonStr(cmd));
CommonResult _result = commandUtil.sendCommand(cmd, ip);
// 记录指令下发事件 // 记录指令下发事件
RecycleDeviceEvent event = new RecycleDeviceEvent(); RecycleDeviceEvent event = new RecycleDeviceEvent();
event.setDeviceCode(deviceCode); event.setDeviceCode(deviceCode);
event.setBucketCode("");// event.setBucketCode("");//桶编码
event.setDoorNum(0);// if (StrUtil.isNotEmpty(cmd.getData())) {
event.setBucketCode(cmd.getData());//桶编码
}
event.setEventType(deviceCode); event.setEventType(deviceCode);
event.setEventDesc("设备命令下发"); event.setEventDesc("设备命令下发");
event.setEventTime(new Date()); event.setEventTime(new Date());
event.setIsLocal("0");//远程调用 event.setIsLocal("0");//远程调用
if (_result.getCode() != 200) {
event.setEventResult("1");
} else {
event.setEventResult("0");
}
UserDTO user = UserContext.getUser();
event.setCreateUserId(user.getId()); event.setCreateUserId(user.getId());
event.setCreateTime(new Date()); event.setCreateTime(new Date());
event.setCompanyId(user.getCompanyId()); event.setCompanyId(user.getCompanyId());
event.setOrganizeJsonId("");
eventService.save(event); eventService.save(event);
return _result; return CommonResult.success("指令下发成功");
} }
@ApiOperation("获取设备状态") @ApiOperation("获取设备状态")
@PostMapping("/getStatus") @PostMapping("/getStatus")
public CommonResult<String> getStatus(@RequestBody DeviceVO deviceVO) { public CommonResult<String> getStatus(@RequestBody DeviceVO deviceVO) {
Object value = redisTool.getValue(deviceVO.getDeviceCode()); Object value = redisTool.getValue(deviceVO.getDeviceCode());
if (value == null) { if (value == null) return CommonResult.error(400, "获取失败,设备不在线!");
return CommonResult.error(400, "获取失败,设备不在线!");
}
DeviceStatus status = JSONUtil.toBean(value.toString(), DeviceStatus.class); DeviceStatus status = JSONUtil.toBean(value.toString(), DeviceStatus.class);
if (status.isOnline()) return CommonResult.error(400, "获取失败,设备不在线!");
CommandVO command = new CommandVO(); CommandVO command = new CommandVO();
command.setCmd(CMDEnum.status); command.setCmd(CMDEnum.status);
command.setDeviceCode(deviceVO.getDeviceCode()); command.setDeviceCode(deviceVO.getDeviceCode());
command.setOptTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); command.setOptTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
command.setData(""); command.setData("");
command.setRemark(""); command.setRemark("");
CommonResult commonResult = commandUtil.sendCommand(command, status.getIp()); if (!mqttClient.isConnected()) throw new BizIllegalException("与mqtt服务器连接中断请稍后重试");
Object data = commonResult.getData(); mqttClient.publish(deviceVO.getDeviceCode() + "/command", JSONUtil.toJsonStr(command));
DeviceStatus status1 = JSONUtil.toBean(data.toString(), DeviceStatus.class);
deviceService.updateDevice(status1);
status1.getBucketList().forEach(bucket -> {
bucketService.updateStatus(bucket);
});
return CommonResult.success("获取成功!"); return CommonResult.success("获取成功!");
} }
@ -609,7 +596,7 @@ public class DeviceController {
} }
} }
} }
redisTool.setValue(deviceCode, JSONUtil.toJsonStr(des), 1000 * 60 * 60l); redisTool.setValue(deviceCode, JSONUtil.toJsonStr(des), 1000 * 60 * 60L);
CommonResult<String> success = CommonResult.success(deviceCode); CommonResult<String> success = CommonResult.success(deviceCode);
success.setMsg("success"); success.setMsg("success");
return success; return success;
@ -632,7 +619,7 @@ public class DeviceController {
} }
} }
} }
redisTool.setValue(weight.getBucketCode(), JSONUtil.toJsonStr(weight), 1000 * 60l);//一分钟过期 redisTool.setValue(weight.getBucketCode(), JSONUtil.toJsonStr(weight), 1000 * 60L);//一分钟过期
return CommonResult.success("success"); return CommonResult.success("success");
} }

@ -7,12 +7,14 @@ import cc.yunxi.domain.vo.socket.MessageTypeEnum;
import cc.yunxi.domain.vo.socket.SocketMessage; import cc.yunxi.domain.vo.socket.SocketMessage;
import cc.yunxi.domain.vo.vxmessage.*; import cc.yunxi.domain.vo.vxmessage.*;
import cc.yunxi.service.IWsService; import cc.yunxi.service.IWsService;
import cc.yunxi.utils.*; import cc.yunxi.utils.CustomerMqttClient;
import cc.yunxi.utils.UserContext;
import cc.yunxi.utils.VerifyUtil;
import cc.yunxi.utils.WeChatMessageUtil;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -30,8 +32,8 @@ public class WxMessageController {
@Resource @Resource
private VerifyUtil verifyUtil; private VerifyUtil verifyUtil;
// @Resource @Resource
// private CustomerMqttClient client; private CustomerMqttClient client;
/** /**
* *
@ -162,6 +164,9 @@ public class WxMessageController {
@PostMapping("/test") @PostMapping("/test")
public CommonResult<String> test() { public CommonResult<String> test() {
client.subscribe("testtopc/1",1);
client.publish("testtopc/1","{\"msg\": \"hello\"}");
client.unsubscribe("testtopc/1");
return CommonResult.success("success"); return CommonResult.success("success");
} }

@ -18,13 +18,13 @@ public class BucketStatus {
@NotNull(message = "门号不能为空") @NotNull(message = "门号不能为空")
private Integer doorNum; private Integer doorNum;
@ApiModelProperty(value = "投递门状态",required = true) @ApiModelProperty(value = "投递门状态",required = true)
private Boolean deliveryDoor; private boolean deliveryDoor;
@ApiModelProperty(value = "清运门状态",required = true) @ApiModelProperty(value = "清运门状态",required = true)
private Boolean cleanDoor; private boolean cleanDoor;
@ApiModelProperty(value = "烟雾报警", required = true, example = "true") @ApiModelProperty(value = "烟雾报警", required = true, example = "true")
private Boolean smogAlarm; private boolean smogAlarm;
@ApiModelProperty(value = "是否满",required = true) @ApiModelProperty(value = "是否满",required = true)
private Boolean isFull; private boolean isFull;
@ApiModelProperty(value = "重量",required = true) @ApiModelProperty(value = "重量",required = true)
private BigDecimal weight; private BigDecimal weight;
} }

@ -5,7 +5,6 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.List; import java.util.List;
@Data @Data
@ -15,7 +14,7 @@ public class DeviceStatus {
@NotBlank(message = "设备编码不能为空") @NotBlank(message = "设备编码不能为空")
private String deviceCode; private String deviceCode;
@ApiModelProperty(value = "是否在线", hidden = true) @ApiModelProperty(value = "是否在线", hidden = true)
private Boolean online; private boolean online;
@ApiModelProperty(value = "设备ip地址", hidden = true) @ApiModelProperty(value = "设备ip地址", hidden = true)
private String ip; private String ip;
@ApiModelProperty(value = "设备端口", hidden = true) @ApiModelProperty(value = "设备端口", hidden = true)

@ -1,24 +1,18 @@
package cc.yunxi.service.impl; package cc.yunxi.service.impl;
import cc.yunxi.common.domain.CommonResult;
import cc.yunxi.common.exception.BizIllegalException; import cc.yunxi.common.exception.BizIllegalException;
import cc.yunxi.common.exception.BusinessLogicException; import cc.yunxi.common.exception.BusinessLogicException;
import cc.yunxi.common.exception.ForbiddenException;
import cc.yunxi.config.props.WxHsyProperties; import cc.yunxi.config.props.WxHsyProperties;
import cc.yunxi.config.props.WxShProperties; import cc.yunxi.config.props.WxShProperties;
import cc.yunxi.domain.dto.UserDTO; import cc.yunxi.domain.dto.UserDTO;
import cc.yunxi.domain.dto.WxLoginDTO; import cc.yunxi.domain.dto.WxLoginDTO;
import cc.yunxi.domain.dto.ZSGLoginDTO; import cc.yunxi.domain.dto.ZSGLoginDTO;
import cc.yunxi.domain.po.Client; import cc.yunxi.domain.po.Client;
import cc.yunxi.domain.po.Manager;
import cc.yunxi.domain.po.Recycler; import cc.yunxi.domain.po.Recycler;
import cc.yunxi.domain.query.RecyclerQuery;
import cc.yunxi.domain.vo.device.CMDEnum; import cc.yunxi.domain.vo.device.CMDEnum;
import cc.yunxi.domain.vo.device.CommandVO; import cc.yunxi.domain.vo.device.CommandVO;
import cc.yunxi.domain.vo.device.LoginRespVO; import cc.yunxi.domain.vo.device.LoginRespVO;
import cc.yunxi.enums.UserTypeEnum; import cc.yunxi.enums.UserTypeEnum;
import cc.yunxi.mapper.ManagerMapper;
import cc.yunxi.mapper.ProductMapper;
import cc.yunxi.service.IClientService; import cc.yunxi.service.IClientService;
import cc.yunxi.service.ICommonService; import cc.yunxi.service.ICommonService;
import cc.yunxi.service.IManagerService; import cc.yunxi.service.IManagerService;
@ -30,18 +24,13 @@ import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.swagger.annotations.ApiModelProperty;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date; import java.util.Date;
import java.util.List;
/** /**
* *
@ -71,7 +60,7 @@ public class CommonService implements ICommonService {
private WxShProperties wxShProperties; private WxShProperties wxShProperties;
@Resource @Resource
private CommandUtil commandUtil; private CustomerMqttClient mqttClient;
@Resource @Resource
private JwtTool jwtTool; private JwtTool jwtTool;
@ -179,8 +168,13 @@ public class CommonService implements ICommonService {
command.setData(data); command.setData(data);
command.setOptTime(DateUtil.now()); command.setOptTime(DateUtil.now());
command.setRemark("登录成功"); command.setRemark("登录成功");
CommonResult commonResult = commandUtil.sendCommand(command); boolean connected = mqttClient.isConnected();
log.info("设备登录结果 回收员 ===> code:{},msg:{}", commonResult.getCode(), commonResult.getMsg()); if (connected) {
mqttClient.publish(wxLoginDTO.getDevCode() + "command", JSONUtil.toJsonStr(command));
log.info("扫描登录 回收员 ===> 通知设备 code:{}", wxLoginDTO.getDevCode());
} else {
log.error("扫描登录失败 回收员 ===> mqtt未连接 code:{}", wxLoginDTO.getDevCode());
}
} }
return userDTO; return userDTO;
} }
@ -215,8 +209,13 @@ public class CommonService implements ICommonService {
command.setData(data); command.setData(data);
command.setOptTime(DateUtil.now()); command.setOptTime(DateUtil.now());
command.setRemark("登录成功"); command.setRemark("登录成功");
CommonResult commonResult = commandUtil.sendCommand(command); boolean connected = mqttClient.isConnected();
log.info("设备登录结果 散户 ===> code:{},msg:{}", commonResult.getCode(), commonResult.getMsg()); if (connected) {
mqttClient.publish(wxLoginDTO.getDevCode() + "command", JSONUtil.toJsonStr(command));
log.info("扫描登录 散户 ===> 通知设备 code:{}", wxLoginDTO.getDevCode());
} else {
log.error("扫描登录失败 散户 ===> mqtt未连接 code:{}", wxLoginDTO.getDevCode());
}
} }
return userDTO; return userDTO;

@ -3,7 +3,6 @@ package cc.yunxi.service.impl;
import cc.yunxi.domain.po.RecycleBucket; import cc.yunxi.domain.po.RecycleBucket;
import cc.yunxi.domain.vo.device.BucketStatus; import cc.yunxi.domain.vo.device.BucketStatus;
import cc.yunxi.mapper.RecycleBucketMapper; import cc.yunxi.mapper.RecycleBucketMapper;
import cc.yunxi.mapper.RecycleDeviceMapper;
import cc.yunxi.service.IRecycleBucketService; import cc.yunxi.service.IRecycleBucketService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
@ -38,10 +37,10 @@ public class RecycleBucketServiceImpl extends ServiceImpl<RecycleBucketMapper, R
UpdateWrapper<RecycleBucket> wrapper = new UpdateWrapper<>(); UpdateWrapper<RecycleBucket> wrapper = new UpdateWrapper<>();
wrapper.lambda() wrapper.lambda()
.set(RecycleBucket::getCurrentWeight, status.getWeight()) .set(RecycleBucket::getCurrentWeight, status.getWeight())
.set(RecycleBucket::getDeliveryDoorStatus, status.getDeliveryDoor() ? 1 : 0) .set(RecycleBucket::getDeliveryDoorStatus, status.isDeliveryDoor() ? 1 : 0)
.set(RecycleBucket::getCleanDoorStatus, status.getCleanDoor() ? 1 : 0) .set(RecycleBucket::getCleanDoorStatus, status.isCleanDoor() ? 1 : 0)
.set(RecycleBucket::getFullStatus, status.getIsFull() ? 1 : 0) .set(RecycleBucket::getFullStatus, status.isFull() ? 1 : 0)
.set(RecycleBucket::getSmogStatus, status.getSmogAlarm() ? 1 : 0) .set(RecycleBucket::getSmogStatus, status.isSmogAlarm() ? 1 : 0)
.eq(RecycleBucket::getBucketCode, status.getBucketCode()); .eq(RecycleBucket::getBucketCode, status.getBucketCode());
boolean update = this.update(wrapper); boolean update = this.update(wrapper);
return update ? 1 : 0; return update ? 1 : 0;
@ -52,19 +51,16 @@ public class RecycleBucketServiceImpl extends ServiceImpl<RecycleBucketMapper, R
if (old.getWeight().compareTo(now.getWeight()) != 0) { if (old.getWeight().compareTo(now.getWeight()) != 0) {
return true; return true;
} }
if (old.getIsFull() != now.getIsFull()) { if (old.isFull() != now.isFull()) {
return true; return true;
} }
if (old.getSmogAlarm() != now.getSmogAlarm()) { if (old.isSmogAlarm() != now.isSmogAlarm()) {
return true; return true;
} }
if (old.getCleanDoor() != now.getCleanDoor()) { if (old.isCleanDoor() != now.isCleanDoor()) {
return true; return true;
} }
if (old.getDeliveryDoor() != now.getDeliveryDoor()) { return old.isDeliveryDoor() != now.isDeliveryDoor();
return true;
}
return false;
} }
@Override @Override
public RecycleBucket getBagCode(String deviceCode, String bucketCode) { public RecycleBucket getBagCode(String deviceCode, String bucketCode) {

@ -37,7 +37,7 @@ public class RecycleDeviceServiceImpl extends ServiceImpl<RecycleDeviceMapper, R
UpdateWrapper<RecycleDevice> wrapper = new UpdateWrapper<>(); UpdateWrapper<RecycleDevice> wrapper = new UpdateWrapper<>();
wrapper.lambda() wrapper.lambda()
.set(RecycleDevice::getIpAddress, status.getIp()) .set(RecycleDevice::getIpAddress, status.getIp())
.set(RecycleDevice::getStatus, status.getOnline() ? 1 : 0) .set(RecycleDevice::getStatus, status.isOnline() ? 1 : 0)
.eq(RecycleDevice::getDeviceCode, status.getDeviceCode()); .eq(RecycleDevice::getDeviceCode, status.getDeviceCode());
return update(wrapper) ? 1 : 0; return update(wrapper) ? 1 : 0;
} }
@ -45,7 +45,6 @@ public class RecycleDeviceServiceImpl extends ServiceImpl<RecycleDeviceMapper, R
@Override @Override
public boolean isChanged(DeviceStatus oldStatus, DeviceStatus newStatus) { public boolean isChanged(DeviceStatus oldStatus, DeviceStatus newStatus) {
if (!oldStatus.getIp().equals(newStatus.getIp())) return true; if (!oldStatus.getIp().equals(newStatus.getIp())) return true;
if (oldStatus.getOnline() != newStatus.getOnline()) return true; return oldStatus.isOnline() != newStatus.isOnline();
return false;
} }
} }

@ -1,43 +0,0 @@
package cc.yunxi.utils;
import cc.yunxi.common.domain.CommonResult;
import cc.yunxi.domain.vo.device.CommandVO;
import cc.yunxi.domain.vo.device.DeviceStatus;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONUtil;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Component
public class CommandUtil {
private static final int PORT = 5000;
@Resource
private RedisTool redisTool;
public CommonResult sendCommand(CommandVO command) {
DeviceStatus status = getDeviceStatus(command.getDeviceCode());
if (null == status) return CommonResult.error(400, "设备不在线!");
String ip = status.getIp();
String res = HttpUtil.post("http://" + ip + ":" + PORT + "/command", JSONUtil.toJsonStr(command), 1000);
return JSONUtil.toBean(res, CommonResult.class);
}
public CommonResult sendCommand(CommandVO command, String ip) {
String res = HttpUtil.post("http://" + ip + ":" + PORT + "/command", JSONUtil.toJsonStr(command), 1000);
return JSONUtil.toBean(res, CommonResult.class);
}
private DeviceStatus getDeviceStatus(String deviceCode) {
boolean exist = redisTool.existsKey(deviceCode);
long expire = redisTool.getKeyExpire(deviceCode, TimeUnit.SECONDS);
if (!exist || expire <= 0) {
return null;
}
Object value = redisTool.getValue(deviceCode);
if (value == null) return null;
return JSONUtil.toBean(value.toString(), DeviceStatus.class);
}
}

@ -1,11 +1,17 @@
package cc.yunxi.utils; package cc.yunxi.utils;
import cc.yunxi.common.exception.BizIllegalException;
import cc.yunxi.config.props.MqttProperties; import cc.yunxi.config.props.MqttProperties;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttTopic;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
@Slf4j @Slf4j
@ -23,17 +29,18 @@ public class CustomerMqttClient {
public void connect() throws MqttException { public void connect() throws MqttException {
client = new MqttClient(properties.getHost(), properties.getClientId(), new MemoryPersistence()); client = new MqttClient(properties.getHost(), properties.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectionOptions options = new MqttConnectionOptions();
if (StrUtil.isNotEmpty(properties.getUserName())) { if (StrUtil.isNotEmpty(properties.getUsername())) {
options.setUserName(properties.getUserName()); options.setUserName(properties.getUsername());
} }
if (StrUtil.isNotEmpty(properties.getPassWord())) { if (StrUtil.isNotEmpty(properties.getPassword())) {
options.setPassword(properties.getPassWord().toCharArray()); options.setPassword(properties.getPassword().getBytes(StandardCharsets.UTF_8));
} }
options.setCleanSession(properties.getCleanSession()); options.setCleanStart(properties.getCleanSession());
options.setKeepAliveInterval(properties.getKeepAlive()); options.setKeepAliveInterval(properties.getKeepAlive());
options.setConnectionTimeout(properties.getTimeout()); options.setConnectionTimeout(properties.getTimeout());
options.setAutomaticReconnect(true); options.setAutomaticReconnect(true);
options.setSessionExpiryInterval(0L);//会话过期时间
client.setCallback(new MyMqttCallback(client)); client.setCallback(new MyMqttCallback(client));
if (!client.isConnected()) { if (!client.isConnected()) {
client.connect(options); client.connect(options);
@ -50,6 +57,10 @@ public class CustomerMqttClient {
client.disconnect(); client.disconnect();
} }
public boolean isConnected() {
return client.isConnected();
}
public void publish(String topic, String msg) { public void publish(String topic, String msg) {
publish(topic, msg, properties.getQos(), false); publish(topic, msg, properties.getQos(), false);
} }
@ -63,7 +74,6 @@ public class CustomerMqttClient {
if (null == mqttTopic) { if (null == mqttTopic) {
log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos); log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos);
} }
boolean connected = client.isConnected();
try { try {
client.publish(topic, message); client.publish(topic, message);
@ -71,7 +81,7 @@ public class CustomerMqttClient {
} catch (MqttException e) { } catch (MqttException e) {
log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, msg); log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, msg);
e.printStackTrace(); e.printStackTrace();
throw new RuntimeException(e); throw new BizIllegalException("mqtt 发布主题失败,发布主题:" + topic + ",发布消息:" + msg);
} }
// MqttDeliveryToken token;//Delivery:配送 // MqttDeliveryToken token;//Delivery:配送
// synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充 // synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充

@ -1,12 +0,0 @@
package cc.yunxi.utils;
import org.eclipse.paho.client.mqttv3.MqttClient;
public class HeartbeatTask implements Runnable {
private MqttClient mqttClient;
@Override
public void run() {
// mqttClient.publish("heartbeat", "alive".getBytes());
}
}

@ -1,18 +1,15 @@
package cc.yunxi.utils; package cc.yunxi.utils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
//@Component //@Component
@Slf4j @Slf4j
public class MqttTool { public class MqttTool {
// @Resource // @Resource
private MqttClient client; private MqttClient client;
public boolean publish(String topic, MqttMessage message) { public boolean publish(String topic, MqttMessage message) {

@ -1,79 +1,50 @@
package cc.yunxi.utils; package cc.yunxi.utils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@Slf4j @Slf4j
public class MyMqttCallback implements MqttCallbackExtended { public class MyMqttCallback implements MqttCallback {
private MqttClient mqttClient; private MqttClient mqttClient;
private TopicHandler topicHandler;
public MyMqttCallback(MqttClient mqttClient) { public MyMqttCallback(MqttClient mqttClient) {
this.mqttClient = mqttClient; this.mqttClient = mqttClient;
} }
//链接断开
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
log.error("MQTT 断开连接,原因:{}", mqttDisconnectResponse.getReasonString());
mqttDisconnectResponse.getException().printStackTrace();
}
@Override @Override
public void connectionLost(Throwable throwable) { public void mqttErrorOccurred(MqttException e) {
log.error("mqtt connectionLost 连接断开5S之后尝试重连: {}", throwable.getMessage()); log.error("MQTT 错误,错误信息:{}", e.getMessage());
throwable.printStackTrace(); e.printStackTrace();
try {
mqttClient.reconnect();
} catch (MqttException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
// AtomicLong reconnectTimes = new AtomicLong(1);
// while (true) {
// try {
// if (mqttClient.getClient().isConnected()) {
// //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete方法里面 看你们自己选择
// log.warn("mqtt 重新连接成功,重新开始初始化订阅主题...");
// return;
// }
// reconnectTimes.incrementAndGet();
// log.warn("mqtt 重连次数{}, 重新连接时间 {}", reconnectTimes, LocalDateTime.now());
// mqttClient.connect();
// } catch (MqttException e) {
// log.error("mqtt断连异常", e.getMessage());
// e.printStackTrace();
// }
// try {
// this.wait(5000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
} }
// 接受成功 //接收到消息
@Override @Override
public void messageArrived(String topic, MqttMessage mqttMessage) { public void messageArrived(String topic, MqttMessage mqttMessage) {
byte[] payload = mqttMessage.getPayload(); byte[] payload = mqttMessage.getPayload();
String message = new String(payload, StandardCharsets.UTF_8); String message = new String(payload, StandardCharsets.UTF_8);
log.info("接收到MQTT消息主题{},消息体:{}", topic, message); log.info("接收到MQTT消息主题{},消息体:{}", topic, message);
// topicHandler.handleTopic(topic, mqttMessage); topicHandler.handleTopic(topic, mqttMessage);
} }
//发送完成
@Override @Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { public void deliveryComplete(IMqttToken iMqttToken) {
log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete()); int messageId = iMqttToken.getMessageId();
log.info("消息发送完成消息ID{}", iMqttDeliveryToken.getMessageId()); log.info("MQTT 消息发送完成消息ID{}", messageId);
try {
MqttMessage message = iMqttDeliveryToken.getMessage();
if(null != message){
byte[] payload = message.getPayload();
String content = new String(payload, StandardCharsets.UTF_8);
log.info("消息发送完成,返回消息内容:{}", content);
}
} catch (MqttException e) {
log.error("获取消息内容失败,原因:{}", e.getMessage());
e.printStackTrace();
}
} }
//连接完成后 //连接完成后
@ -81,4 +52,9 @@ public class MyMqttCallback implements MqttCallbackExtended {
public void connectComplete(boolean reconnect, String serverURI) { public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连"); log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连");
} }
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
log.info("MQTT 认证包到达,原因码:{},原因:{}", i, mqttProperties.getReasonString());
}
} }

@ -1,6 +1,6 @@
package cc.yunxi.utils; package cc.yunxi.utils;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component

@ -28,4 +28,14 @@ nxhs:
#测试环境为 develop 本地调试为 trial #测试环境为 develop 本地调试为 trial
env_version: "trial" env_version: "trial"
recycler: recycler:
env_version: "trial" env_version: "trial"
mqtt:
host: tcp://222.71.165.188:1883
clientId: nxhs-service
username: nxhs-service
password: 123456
cleanSession: false
timeout: 30
qos: 1
keepAlive: 30
topics: {}
Loading…
Cancel
Save