diff --git a/jnpf-java-boot/jnpf-admin/src/main/resources/application-dev.yml b/jnpf-java-boot/jnpf-admin/src/main/resources/application-dev.yml index 5b2a8ff..a2cde50 100644 --- a/jnpf-java-boot/jnpf-admin/src/main/resources/application-dev.yml +++ b/jnpf-java-boot/jnpf-admin/src/main/resources/application-dev.yml @@ -251,4 +251,14 @@ mqtt: authentication-method: password_based:built_in_database #用户认证方式 get-user-api: /api/v5/authentication/{id}/users/{user_id} #添加用户api add-user-api: /api/v1/mqtt/add-user #添加用户api - delete-user-api: /api/v1/mqtt/add-user #添加用户api \ No newline at end of file + delete-user-api: /api/v1/mqtt/add-user #添加用户api + client: + host: tcp://222.71.165.188:1883 + clientId: pc-service + username: pc-service + password: 123456 + cleanSession: false + timeout: 30 + qos: 1 + keepAlive: 30 + topics: diff --git a/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/pom.xml b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/pom.xml index 92235bf..f46d2d4 100644 --- a/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/pom.xml +++ b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/pom.xml @@ -33,5 +33,12 @@ jnpf-common-all ${project.version} + + + + org.eclipse.paho + org.eclipse.paho.mqttv5.client + 1.2.5 + \ No newline at end of file diff --git a/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/controller/RecycleCleanOrderController.java b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/controller/RecycleCleanOrderController.java index f3c2c12..a7b52f9 100644 --- a/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/controller/RecycleCleanOrderController.java +++ b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/controller/RecycleCleanOrderController.java @@ -1,37 +1,27 @@ package jnpf.controller; -import cn.hutool.core.util.ObjectUtil; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import jnpf.base.ActionResult; -import jnpf.base.UserInfo; -import jnpf.exception.DataException; -import jnpf.permission.entity.UserEntity; -import jnpf.service.*; -import jnpf.entity.*; -import jnpf.util.*; +import jnpf.base.vo.PageListVO; +import jnpf.base.vo.PaginationVO; +import jnpf.entity.RecycleCleanOrderEntity; import jnpf.model.recyclecleanorder.*; +import jnpf.service.RecycleCleanOrderService; +import jnpf.util.GeneraterSwapUtil; +import jnpf.util.JsonUtil; +import jnpf.util.StringUtil; +import jnpf.util.UserProvider; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.*; import javax.validation.Valid; -import java.util.*; - -import jnpf.annotation.JnpfField; -import jnpf.base.vo.PageListVO; -import jnpf.base.vo.PaginationVO; -import jnpf.base.vo.DownloadVO; -import jnpf.config.ConfigValueUtil; -import jnpf.base.entity.ProvinceEntity; - import java.io.IOException; -import java.util.stream.Collectors; - -import jnpf.engine.entity.FlowTaskEntity; -import jnpf.exception.WorkFlowException; -import org.springframework.transaction.annotation.Transactional; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** * 清运清单 diff --git a/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/controller/RecycleDeviceController.java b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/controller/RecycleDeviceController.java index d0015da..493ba31 100644 --- a/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/controller/RecycleDeviceController.java +++ b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/controller/RecycleDeviceController.java @@ -1,5 +1,6 @@ package jnpf.controller; +import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -10,6 +11,7 @@ import jnpf.entity.RecycleBucketEntity; import jnpf.entity.RecycleCameraEntity; import jnpf.entity.RecycleDeviceEntity; import jnpf.entity.SimCardEntity; +import jnpf.model.recycledevice.Command; import jnpf.model.recycledevice.RecycleDeviceConstant; import jnpf.model.recycledevice.RecycleDeviceForm; import jnpf.model.recycledevice.RecycleDevicePagination; @@ -21,8 +23,10 @@ import jnpf.util.GeneraterSwapUtil; import jnpf.util.JsonUtil; import jnpf.util.StringUtil; import jnpf.util.UserProvider; +import jnpf.utils.CustomerMqttClient; import lombok.extern.slf4j.Slf4j; import oracle.jdbc.proxy.annotation.Post; +import org.eclipse.paho.mqttv5.client.MqttClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.*; @@ -62,6 +66,8 @@ public class RecycleDeviceController { private SimCardService simCardService; @Resource private RecycleCameraService cameraService; + @Resource + private CustomerMqttClient mqttClient; /** @@ -241,7 +247,7 @@ public class RecycleDeviceController { */ @Operation(summary = "回收单关联设备") @PostMapping("/queryRecycleDevie") - public ActionResult queryRecycleSaffDevie(@RequestBody RecycleDevicePagination recycleDevicePagination)throws IOException { + public ActionResult queryRecycleSaffDevie(@RequestBody RecycleDevicePagination recycleDevicePagination) throws IOException { //查询凭证列表弹窗 // RecycleDevicePagination.setMenuId("530034857632334469"); // List list= voucherService.queryInspectionVoucherPopupInfo(voucherPagination); @@ -258,4 +264,13 @@ public class RecycleDeviceController { // List recycleDeviceEntityList = recycleDeviceService.queryRecycleSaffDeviceList(stationId); // return ActionResult.success(recycleDeviceEntityList); // } + @Operation(summary = "远程发送命令") + @PostMapping("/command") + public ActionResult command(@RequestBody Command command) { + String deviceCode = command.getDeviceCode(); + mqttClient.publish(deviceCode + "/command", JSONUtil.toJsonStr(command)); + return ActionResult.success("命令已发送!"); + } + + } diff --git a/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/utils/CustomerMqttClient.java b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/utils/CustomerMqttClient.java new file mode 100644 index 0000000..ed5809a --- /dev/null +++ b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/utils/CustomerMqttClient.java @@ -0,0 +1,152 @@ +package jnpf.utils; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttTopic; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +@Slf4j +@Component +public class CustomerMqttClient { + @Resource + private MyMqttCallback callback; + private MqttClient client; + + @Value("${mqtt.client.host}") + private String host; + @Value("${mqtt.client.clientId}") + private String clientId; + @Value("${mqtt.client.username}") + private String userName; + @Value("${mqtt.client.password}") + private String password; + @Value("${mqtt.client.cleanSession}") + private boolean cleanSession; + @Value("${mqtt.client.timeout}") + private int timeout; + @Value("${mqtt.client.qos}") + private int qos; + @Value("${mqtt.client.keepAlive}") + private int keepAlive; +// @Value("${mqtt.client.topics}") + private Map topics; + + @PostConstruct + public void init() { + try { + connect(); + } catch (MqttException e) { + log.error("连接MQTT服务失败", e); + throw new RuntimeException("连接MQTT服务失败"); + } + } + + public void connect() throws MqttException { + client = new MqttClient(host, clientId, new MemoryPersistence()); + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setUserName(userName); + options.setPassword(password.getBytes(StandardCharsets.UTF_8)); + options.setCleanStart(cleanSession); + options.setKeepAliveInterval(keepAlive); + options.setConnectionTimeout(timeout); + options.setAutomaticReconnect(true); + options.setAutomaticReconnectDelay(1, 3); + options.setSessionExpiryInterval(0L);//会话过期时间 + client.setCallback(callback); + if (!client.isConnected()) { + client.connect(options); + log.info("连接MQTT服务成功 success"); + } else { + client.disconnect(); + client.connect(options); + log.info("重新连接MQTT服务成功 success"); + } + callback.registerClient(client); +// initSubscribe(); + } + + public void disconnect() throws MqttException { + client.disconnect(); + } + + public boolean isConnected() { + return client.isConnected(); + } + + public void publish(String topic, String msg) { + publish(topic, msg, qos, false); + } + + public void publish(String topic, String msg, int qos, boolean retained) { + MqttMessage message = new MqttMessage(); + message.setPayload(msg.getBytes()); + message.setQos(qos); + message.setRetained(retained); + MqttTopic mqttTopic = client.getTopic(topic); + if (null == mqttTopic) { + log.error("发布失败,主题不存在!!topic: {} , qos: {} ", topic, qos); + } + try { + client.publish(topic, message); + log.info("发布成功 topic: {} , qos: {} , message: {}", topic, qos, msg); + } catch (MqttException e) { + log.info("发布失败 topic: {} , qos: {} , message: {}", topic, qos, msg); + e.printStackTrace(); + throw new RuntimeException("mqtt 发布主题失败,发布主题:" + topic + ",发布消息:" + msg); + } + } + + public void subscribe(String topicFilter, int qos) { + if (client != null && client.isConnected()) { + try { + client.subscribe(topicFilter, qos); + log.info("订阅主题成功,topic: {} , qos: {} ", topicFilter, qos); + } catch (MqttException e) { + log.error("订阅主题失败 topic:{}, qos: {} ", topicFilter, qos); + e.printStackTrace(); + } + } else { + log.error("客户端连接状态已断开,订阅主题失败 topic:{}, qos: {} ", topicFilter, qos); + } + } + + public void unsubscribe(String topic) { + if (client != null && client.isConnected()) { + try { + client.unsubscribe(topic); + log.info("取消订阅成功 topic success:{}", topic); + } catch (MqttException e) { + log.error("取消订阅失败 topic:{}", topic); + e.printStackTrace(); + } + } else { + log.error("客户端连接状态已断开,取消订阅失败 topic:{}", topic); + } + } + + public void initSubscribe() { + if (client != null && client.isConnected()) { + if (topics == null || topics.size() == 0) { + log.info("未配置初始化订阅主题!!!"); + } + for (Map.Entry entry : topics.entrySet()) { + String topic = entry.getKey(); + int qos = entry.getValue(); + log.info("初始化订阅主题 topic: {} , qos: {} ", topic, qos); + subscribe(topic, qos); + } + } else { + log.error("客户端未连接,初始化订阅失败!!!"); + } + } +} diff --git a/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/utils/MyMqttCallback.java b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/utils/MyMqttCallback.java new file mode 100644 index 0000000..8b222ac --- /dev/null +++ b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/utils/MyMqttCallback.java @@ -0,0 +1,81 @@ +package jnpf.utils; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttCallback; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.nio.charset.StandardCharsets; + +@Slf4j +@Component +public class MyMqttCallback implements MqttCallback { + + private MqttClient mqttClient; + @Resource + private TopicHandler topicHandler; + + public void registerClient(MqttClient mqttClient) { + this.mqttClient = mqttClient; + } + + @Override + public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { + log.error("MQTT 断开连接,原因:{}", mqttDisconnectResponse.getReasonString()); + + try { + mqttClient.reconnect(); + } catch (MqttException e) { + log.error("MQTT 重新连接失败,错误信息:{}", e.getMessage()); + throw new RuntimeException(e); + } + mqttDisconnectResponse.getException().printStackTrace(); + } + + @Override + public void mqttErrorOccurred(MqttException e) { + log.error("MQTT 错误,错误信息:{}", e.getMessage()); + try { + mqttClient.reconnect(); + } catch (MqttException ex) { + log.error("MQTT 重新连接失败,错误信息:{}", ex.getMessage()); + throw new RuntimeException(ex); + } + e.printStackTrace(); + } + + //接收到消息 + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + byte[] payload = mqttMessage.getPayload(); + String message = new String(payload, StandardCharsets.UTF_8); + log.info("接收到MQTT消息,主题:{},消息体:{}", topic, message); + topicHandler.handleTopic(topic, mqttMessage); + } + + @Override + public void deliveryComplete(IMqttToken iMqttToken) { + int messageId = iMqttToken.getMessageId(); + log.info("MQTT 消息发送完成,消息ID:{}", messageId); + } + + //连接完成后 + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.info("MQTT 连接成功,连接方式:{}", reconnect ? "重连" : "直连"); + } + + @Override + public void authPacketArrived(int i, MqttProperties mqttProperties) { + log.info("MQTT 认证包到达,原因码:{},原因:{}", i, mqttProperties.getReasonString()); + } + + + +} diff --git a/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/utils/TopicHandler.java b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/utils/TopicHandler.java new file mode 100644 index 0000000..4aecbd1 --- /dev/null +++ b/jnpf-java-boot/jnpf-scm/jnpf-scm-controller/src/main/java/jnpf/utils/TopicHandler.java @@ -0,0 +1,29 @@ +package jnpf.utils; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +@Component +@Slf4j +public class TopicHandler { + + public void handleTopic(String topic, MqttMessage message) { + String[] split = topic.split("/"); + String deviceCode = split[0]; + String action = split[1]; + byte[] payload = message.getPayload(); + String msg = new String(payload, StandardCharsets.UTF_8); + switch (action) { + case "Dead": + + break; + default: + break; + } + log.info("消息已处理 :topic:{},msg:{}", topic, msg); + } + +} \ No newline at end of file diff --git a/jnpf-java-boot/jnpf-scm/jnpf-scm-entity/src/main/java/jnpf/model/recycledevice/CMDEnum.java b/jnpf-java-boot/jnpf-scm/jnpf-scm-entity/src/main/java/jnpf/model/recycledevice/CMDEnum.java new file mode 100644 index 0000000..f53af46 --- /dev/null +++ b/jnpf-java-boot/jnpf-scm/jnpf-scm-entity/src/main/java/jnpf/model/recycledevice/CMDEnum.java @@ -0,0 +1,29 @@ +package jnpf.model.recycledevice; + +import com.baomidou.mybatisplus.annotation.EnumValue; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum CMDEnum { + + login("login", "登录成功"), + change("change", "配置更新"), + reboot("reboot", "重启设备"), + restart("restart", "重启应用"), + update("update", "更新应用"), + deliveryOpen("deliveryOpen", "投递开门"), + deliveryClose("deliveryClose", "投递关门"), + delay("delay", "投递延时"), + cleanOpen("cleanOpen", "清运开门"), + snapshot("snapshot", "抓拍"), + adjust("adjust", "远程校准"), + clear("clear", "远程清零"), + status("status", "获取状态"); + + @EnumValue + private final String value; + + private final String desc; +} diff --git a/jnpf-java-boot/jnpf-scm/jnpf-scm-entity/src/main/java/jnpf/model/recycledevice/Command.java b/jnpf-java-boot/jnpf-scm/jnpf-scm-entity/src/main/java/jnpf/model/recycledevice/Command.java new file mode 100644 index 0000000..5d772e9 --- /dev/null +++ b/jnpf-java-boot/jnpf-scm/jnpf-scm-entity/src/main/java/jnpf/model/recycledevice/Command.java @@ -0,0 +1,14 @@ +package jnpf.model.recycledevice; + +import lombok.Data; + +@Data +public class Command { + private String orderNo; + private String deviceCode; + private CMDEnum cmd; + private Object data; + private String optTime; + private String remark; + +}