mqtt客户端 init

master
guochaojie 4 months ago
parent 13306a5cd6
commit 23df1d78b5

@ -251,4 +251,14 @@ mqtt:
authentication-method: password_based:built_in_database #用户认证方式
get-user-api: /api/v5/authentication/{id}/users/{user_id} #添加用户api
add-user-api: /api/v1/mqtt/add-user #添加用户api
delete-user-api: /api/v1/mqtt/add-user #添加用户api
delete-user-api: /api/v1/mqtt/add-user #添加用户api
client:
host: tcp://222.71.165.188:1883
clientId: pc-service
username: pc-service
password: 123456
cleanSession: false
timeout: 30
qos: 1
keepAlive: 30
topics:

@ -33,5 +33,12 @@
<artifactId>jnpf-common-all</artifactId>
<version>${project.version}</version>
</dependency>
<!-- 引入MQTT依赖 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
</project>

@ -1,37 +1,27 @@
package jnpf.controller;
import cn.hutool.core.util.ObjectUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jnpf.base.ActionResult;
import jnpf.base.UserInfo;
import jnpf.exception.DataException;
import jnpf.permission.entity.UserEntity;
import jnpf.service.*;
import jnpf.entity.*;
import jnpf.util.*;
import jnpf.base.vo.PageListVO;
import jnpf.base.vo.PaginationVO;
import jnpf.entity.RecycleCleanOrderEntity;
import jnpf.model.recyclecleanorder.*;
import jnpf.service.RecycleCleanOrderService;
import jnpf.util.GeneraterSwapUtil;
import jnpf.util.JsonUtil;
import jnpf.util.StringUtil;
import jnpf.util.UserProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.*;
import jnpf.annotation.JnpfField;
import jnpf.base.vo.PageListVO;
import jnpf.base.vo.PaginationVO;
import jnpf.base.vo.DownloadVO;
import jnpf.config.ConfigValueUtil;
import jnpf.base.entity.ProvinceEntity;
import java.io.IOException;
import java.util.stream.Collectors;
import jnpf.engine.entity.FlowTaskEntity;
import jnpf.exception.WorkFlowException;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*

@ -1,5 +1,6 @@
package jnpf.controller;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
@ -10,6 +11,7 @@ import jnpf.entity.RecycleBucketEntity;
import jnpf.entity.RecycleCameraEntity;
import jnpf.entity.RecycleDeviceEntity;
import jnpf.entity.SimCardEntity;
import jnpf.model.recycledevice.Command;
import jnpf.model.recycledevice.RecycleDeviceConstant;
import jnpf.model.recycledevice.RecycleDeviceForm;
import jnpf.model.recycledevice.RecycleDevicePagination;
@ -21,8 +23,10 @@ import jnpf.util.GeneraterSwapUtil;
import jnpf.util.JsonUtil;
import jnpf.util.StringUtil;
import jnpf.util.UserProvider;
import jnpf.utils.CustomerMqttClient;
import lombok.extern.slf4j.Slf4j;
import oracle.jdbc.proxy.annotation.Post;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
@ -62,6 +66,8 @@ public class RecycleDeviceController {
private SimCardService simCardService;
@Resource
private RecycleCameraService cameraService;
@Resource
private CustomerMqttClient mqttClient;
/**
@ -241,7 +247,7 @@ public class RecycleDeviceController {
*/
@Operation(summary = "回收单关联设备")
@PostMapping("/queryRecycleDevie")
public ActionResult queryRecycleSaffDevie(@RequestBody RecycleDevicePagination recycleDevicePagination)throws IOException {
public ActionResult queryRecycleSaffDevie(@RequestBody RecycleDevicePagination recycleDevicePagination) throws IOException {
//查询凭证列表弹窗
// RecycleDevicePagination.setMenuId("530034857632334469");
// List<VoucherEntity> list= voucherService.queryInspectionVoucherPopupInfo(voucherPagination);
@ -258,4 +264,13 @@ public class RecycleDeviceController {
// List<RecycleDeviceEntity> recycleDeviceEntityList = recycleDeviceService.queryRecycleSaffDeviceList(stationId);
// return ActionResult.success(recycleDeviceEntityList);
// }
@Operation(summary = "远程发送命令")
@PostMapping("/command")
public ActionResult command(@RequestBody Command command) {
String deviceCode = command.getDeviceCode();
mqttClient.publish(deviceCode + "/command", JSONUtil.toJsonStr(command));
return ActionResult.success("命令已发送!");
}
}

@ -0,0 +1,152 @@
package jnpf.utils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttTopic;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@Slf4j
@Component
public class CustomerMqttClient {
@Resource
private MyMqttCallback callback;
private MqttClient client;
@Value("${mqtt.client.host}")
private String host;
@Value("${mqtt.client.clientId}")
private String clientId;
@Value("${mqtt.client.username}")
private String userName;
@Value("${mqtt.client.password}")
private String password;
@Value("${mqtt.client.cleanSession}")
private boolean cleanSession;
@Value("${mqtt.client.timeout}")
private int timeout;
@Value("${mqtt.client.qos}")
private int qos;
@Value("${mqtt.client.keepAlive}")
private int keepAlive;
// @Value("${mqtt.client.topics}")
private Map<String, Integer> topics;
@PostConstruct
public void init() {
try {
connect();
} catch (MqttException e) {
log.error("连接MQTT服务失败", e);
throw new RuntimeException("连接MQTT服务失败");
}
}
public void connect() throws MqttException {
client = new MqttClient(host, clientId, new MemoryPersistence());
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(userName);
options.setPassword(password.getBytes(StandardCharsets.UTF_8));
options.setCleanStart(cleanSession);
options.setKeepAliveInterval(keepAlive);
options.setConnectionTimeout(timeout);
options.setAutomaticReconnect(true);
options.setAutomaticReconnectDelay(1, 3);
options.setSessionExpiryInterval(0L);//会话过期时间
client.setCallback(callback);
if (!client.isConnected()) {
client.connect(options);
log.info("连接MQTT服务成功 success");
} else {
client.disconnect();
client.connect(options);
log.info("重新连接MQTT服务成功 success");
}
callback.registerClient(client);
// initSubscribe();
}
public void disconnect() throws MqttException {
client.disconnect();
}
public boolean isConnected() {
return client.isConnected();
}
public void publish(String topic, String msg) {
publish(topic, msg, qos, false);
}
public void publish(String topic, String msg, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = client.getTopic(topic);
if (null == mqttTopic) {
log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos);
}
try {
client.publish(topic, message);
log.info("发布成功 topic: {} , qos: {} , message: {}", topic, qos, msg);
} catch (MqttException e) {
log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, msg);
e.printStackTrace();
throw new RuntimeException("mqtt 发布主题失败,发布主题:" + topic + ",发布消息:" + msg);
}
}
public void subscribe(String topicFilter, int qos) {
if (client != null && client.isConnected()) {
try {
client.subscribe(topicFilter, qos);
log.info("订阅主题成功,topic: {} , qos: {} ", topicFilter, qos);
} catch (MqttException e) {
log.error("订阅主题失败 topic:{}, qos: {} ", topicFilter, qos);
e.printStackTrace();
}
} else {
log.error("客户端连接状态已断开,订阅主题失败 topic:{}, qos: {} ", topicFilter, qos);
}
}
public void unsubscribe(String topic) {
if (client != null && client.isConnected()) {
try {
client.unsubscribe(topic);
log.info("取消订阅成功 topic success:{}", topic);
} catch (MqttException e) {
log.error("取消订阅失败 topic:{}", topic);
e.printStackTrace();
}
} else {
log.error("客户端连接状态已断开,取消订阅失败 topic:{}", topic);
}
}
public void initSubscribe() {
if (client != null && client.isConnected()) {
if (topics == null || topics.size() == 0) {
log.info("未配置初始化订阅主题!!!");
}
for (Map.Entry<String, Integer> entry : topics.entrySet()) {
String topic = entry.getKey();
int qos = entry.getValue();
log.info("初始化订阅主题 topic: {} , qos: {} ", topic, qos);
subscribe(topic, qos);
}
} else {
log.error("客户端未连接,初始化订阅失败!!!");
}
}
}

@ -0,0 +1,81 @@
package jnpf.utils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class MyMqttCallback implements MqttCallback {
private MqttClient mqttClient;
@Resource
private TopicHandler topicHandler;
public void registerClient(MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
log.error("MQTT 断开连接,原因:{}", mqttDisconnectResponse.getReasonString());
try {
mqttClient.reconnect();
} catch (MqttException e) {
log.error("MQTT 重新连接失败,错误信息:{}", e.getMessage());
throw new RuntimeException(e);
}
mqttDisconnectResponse.getException().printStackTrace();
}
@Override
public void mqttErrorOccurred(MqttException e) {
log.error("MQTT 错误,错误信息:{}", e.getMessage());
try {
mqttClient.reconnect();
} catch (MqttException ex) {
log.error("MQTT 重新连接失败,错误信息:{}", ex.getMessage());
throw new RuntimeException(ex);
}
e.printStackTrace();
}
//接收到消息
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
byte[] payload = mqttMessage.getPayload();
String message = new String(payload, StandardCharsets.UTF_8);
log.info("接收到MQTT消息主题{},消息体:{}", topic, message);
topicHandler.handleTopic(topic, mqttMessage);
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
int messageId = iMqttToken.getMessageId();
log.info("MQTT 消息发送完成消息ID{}", messageId);
}
//连接完成后
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连");
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
log.info("MQTT 认证包到达,原因码:{},原因:{}", i, mqttProperties.getReasonString());
}
}

@ -0,0 +1,29 @@
package jnpf.utils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
@Slf4j
public class TopicHandler {
public void handleTopic(String topic, MqttMessage message) {
String[] split = topic.split("/");
String deviceCode = split[0];
String action = split[1];
byte[] payload = message.getPayload();
String msg = new String(payload, StandardCharsets.UTF_8);
switch (action) {
case "Dead":
break;
default:
break;
}
log.info("消息已处理 topic:{},msg:{}", topic, msg);
}
}

@ -0,0 +1,29 @@
package jnpf.model.recycledevice;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum CMDEnum {
login("login", "登录成功"),
change("change", "配置更新"),
reboot("reboot", "重启设备"),
restart("restart", "重启应用"),
update("update", "更新应用"),
deliveryOpen("deliveryOpen", "投递开门"),
deliveryClose("deliveryClose", "投递关门"),
delay("delay", "投递延时"),
cleanOpen("cleanOpen", "清运开门"),
snapshot("snapshot", "抓拍"),
adjust("adjust", "远程校准"),
clear("clear", "远程清零"),
status("status", "获取状态");
@EnumValue
private final String value;
private final String desc;
}

@ -0,0 +1,14 @@
package jnpf.model.recycledevice;
import lombok.Data;
@Data
public class Command {
private String orderNo;
private String deviceCode;
private CMDEnum cmd;
private Object data;
private String optTime;
private String remark;
}
Loading…
Cancel
Save