mqtt 取消

master
guochaojie 4 months ago
parent 4fd3bb2e03
commit 344776a768

@ -13,7 +13,7 @@ import javax.annotation.Resource;
@Slf4j @Slf4j
//@Configuration //@Configuration
@EnableConfigurationProperties(MqttProperties.class) //@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfig { public class MqttConfig {
@Resource @Resource

@ -6,7 +6,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.Map; import java.util.Map;
@Data @Data
@ConfigurationProperties(prefix = "mqtt") //@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties { public class MqttProperties {
private String host; private String host;
private String clientId; private String clientId;

@ -23,8 +23,6 @@ import java.io.IOException;
@RequestMapping("/wx-message") @RequestMapping("/wx-message")
@Slf4j @Slf4j
public class WxMessageController { public class WxMessageController {
@Resource @Resource
private WeChatMessageUtil weChatMessageUtil; private WeChatMessageUtil weChatMessageUtil;
@Resource @Resource
@ -32,8 +30,8 @@ public class WxMessageController {
@Resource @Resource
private VerifyUtil verifyUtil; private VerifyUtil verifyUtil;
@Resource // @Resource
private CustomerMqttClient client; // private CustomerMqttClient client;
/** /**
* *
@ -164,31 +162,6 @@ public class WxMessageController {
@PostMapping("/test") @PostMapping("/test")
public CommonResult<String> test() { public CommonResult<String> test() {
client.publish("testtopic/1", "这是一个测试");
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
client.publish("/testtopic/2", "这是一个测试");
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
client.subscribe("/testtopic/#", 1);
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
client.unsubscribe("/testtopic/#");
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
return CommonResult.success("success"); return CommonResult.success("success");
} }

@ -34,9 +34,8 @@ public class CustomerMqttClient {
options.setKeepAliveInterval(properties.getKeepAlive()); options.setKeepAliveInterval(properties.getKeepAlive());
options.setConnectionTimeout(properties.getTimeout()); options.setConnectionTimeout(properties.getTimeout());
options.setAutomaticReconnect(true); options.setAutomaticReconnect(true);
if (!client.isConnected()) {
client.setCallback(new MyMqttCallback(client)); client.setCallback(new MyMqttCallback(client));
if (!client.isConnected()) {
client.connect(options); client.connect(options);
log.info("连接MQTT服务成功 success"); log.info("连接MQTT服务成功 success");
} else { } else {
@ -51,11 +50,11 @@ public class CustomerMqttClient {
client.disconnect(); client.disconnect();
} }
public void publish(String msg, String topic) { public void publish(String topic, String msg) {
publish(msg, topic, properties.getQos(), false); publish(topic, msg, properties.getQos(), false);
} }
public void publish(String msg, String topic, int qos, boolean retained) { public void publish(String topic, String msg, int qos, boolean retained) {
MqttMessage message = new MqttMessage(); MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes()); message.setPayload(msg.getBytes());
message.setQos(qos); message.setQos(qos);
@ -64,6 +63,7 @@ public class CustomerMqttClient {
if (null == mqttTopic) { if (null == mqttTopic) {
log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos); log.error("发布失败主题不存在topic: {} , qos: {} ", topic, qos);
} }
boolean connected = client.isConnected();
try { try {
client.publish(topic, message); client.publish(topic, message);

@ -1,11 +1,9 @@
package cc.yunxi.utils; package cc.yunxi.utils;
import cc.yunxi.domain.po.Client;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.*;
import java.time.LocalDateTime; import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j @Slf4j
public class MyMqttCallback implements MqttCallbackExtended { public class MyMqttCallback implements MqttCallbackExtended {
@ -20,9 +18,11 @@ public class MyMqttCallback implements MqttCallbackExtended {
@Override @Override
public void connectionLost(Throwable throwable) { public void connectionLost(Throwable throwable) {
log.error("mqtt connectionLost 连接断开5S之后尝试重连: {}", throwable.getMessage()); log.error("mqtt connectionLost 连接断开5S之后尝试重连: {}", throwable.getMessage());
throwable.printStackTrace();
try { try {
mqttClient.reconnect(); mqttClient.reconnect();
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace();
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// AtomicLong reconnectTimes = new AtomicLong(1); // AtomicLong reconnectTimes = new AtomicLong(1);
@ -51,7 +51,9 @@ public class MyMqttCallback implements MqttCallbackExtended {
// 接受成功 // 接受成功
@Override @Override
public void messageArrived(String topic, MqttMessage mqttMessage) { public void messageArrived(String topic, MqttMessage mqttMessage) {
log.info("接收到MQTT消息主题{},消息体:{}", topic, mqttMessage.getPayload().toString()); byte[] payload = mqttMessage.getPayload();
String message = new String(payload, StandardCharsets.UTF_8);
log.info("接收到MQTT消息主题{},消息体:{}", topic, message);
// topicHandler.handleTopic(topic, mqttMessage); // topicHandler.handleTopic(topic, mqttMessage);
} }
@ -61,7 +63,12 @@ public class MyMqttCallback implements MqttCallbackExtended {
log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete()); log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
log.info("消息发送完成消息ID{}", iMqttDeliveryToken.getMessageId()); log.info("消息发送完成消息ID{}", iMqttDeliveryToken.getMessageId());
try { try {
log.info("消息发送完成,消息内容:{}", iMqttDeliveryToken.getMessage().getPayload().toString()); MqttMessage message = iMqttDeliveryToken.getMessage();
if(null != message){
byte[] payload = message.getPayload();
String content = new String(payload, StandardCharsets.UTF_8);
log.info("消息发送完成,返回消息内容:{}", content);
}
} catch (MqttException e) { } catch (MqttException e) {
log.error("获取消息内容失败,原因:{}", e.getMessage()); log.error("获取消息内容失败,原因:{}", e.getMessage());
e.printStackTrace(); e.printStackTrace();

Loading…
Cancel
Save