Socket发送消息

master
guochaojie 5 months ago
parent e1d98ed98d
commit 58dc560a15

@ -25,18 +25,24 @@ import cc.yunxi.domain.vo.recycleorder.tmorder.TMRecycleOrderRespVO;
import cc.yunxi.domain.vo.recycleorderdetail.RecycleOrderDetailRespVO;
import cc.yunxi.domain.vo.recycler.RecyclerSimpleVO;
import cc.yunxi.domain.vo.recyclestation.RecycleStationSimpleVO;
import cc.yunxi.domain.vo.socket.AppTypeEnum;
import cc.yunxi.domain.vo.socket.MessageTypeEnum;
import cc.yunxi.domain.vo.socket.OrderNew;
import cc.yunxi.enums.OrderStatusEnum;
import cc.yunxi.enums.OrderTypeEnum;
import cc.yunxi.enums.UserTypeEnum;
import cc.yunxi.service.*;
import cc.yunxi.utils.UserContext;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.*;
@ -70,6 +76,8 @@ public class RecycleOrderController {
private final IHousingEstateService housingEstateService;
private final IWsService wsService;
@ApiOperation("分页查询回收订单")
@PostMapping("/page")
public CommonResult<PageDTO<? extends RecycleOrderRespVO>> queryOrderByPage(@RequestBody RecycleOrderQuery recycleOrderQuery) {
@ -102,6 +110,25 @@ public class RecycleOrderController {
UserDTO userDTO = UserContext.getUser();
orderCreateVO.setClientId(userDTO.getId());
String orderId = recycleOrderService.createOrder(orderCreateVO);
//todo 通知回收员
OrderNew message = new OrderNew();
message.setMessageType(MessageTypeEnum.BOOKING);//消息类型
message.setAppType(AppTypeEnum.recycler);//发送到哪里
message.setReceiveUserId(userDTO.getId());//接收人
message.setOrderNo("1223");//订单号
message.setOrderStatus(OrderStatusEnum.PENDING);
message.setReserveTime("");
message.setClientPhone("13183060802");
message.setGoodsType("1");
message.setClientAddress("这是地址");
message.setEstimatedWeight(10.2);
try {
wsService.sendMsgToUser(message);
} catch (IOException e) {
e.printStackTrace();
}
return CommonResult.success(orderId);
}
@ -173,6 +200,7 @@ public class RecycleOrderController {
UserDTO userDTO = UserContext.getUser();
orderCancelVO.setClientId(userDTO.getId());
recycleOrderService.cancelOrder(orderCancelVO);
//todo 通知回收员
return CommonResult.success(true);
}
@ -183,6 +211,7 @@ public class RecycleOrderController {
UserDTO userDTO = UserContext.getUser();
orderTakingVO.setStaffsId(userDTO.getId());
recycleOrderService.takingOrder(orderTakingVO);
//todo 通知用户
return CommonResult.success(true);
}
@ -194,6 +223,7 @@ public class RecycleOrderController {
UserDTO userDTO = UserContext.getUser();
orderReachVO.setStaffsId(userDTO.getId());
recycleOrderService.reachOrder(orderReachVO);
// todo 通知用户
return CommonResult.success(true);
}
@ -204,6 +234,7 @@ public class RecycleOrderController {
UserDTO userDTO = UserContext.getUser();
orderFinishVO.setStaffsId(userDTO.getId());
recycleOrderService.finishOrder(orderFinishVO);
//todo 通知用户
return CommonResult.success(true);
}
@ -227,7 +258,7 @@ public class RecycleOrderController {
* @return List<RecycleOrderRespVO>
*/
private List<? extends RecycleOrderRespVO> buildOrderAttributes(OrderTypeEnum orderType,
List<RecycleOrder> recycleOrderList, LocationDTO locationDTO) {
List<RecycleOrder> recycleOrderList, LocationDTO locationDTO) {
switch (orderType) {
case SH_ORDER:
List<SHRecycleOrderRespVO> shOrderRespVOList = BeanUtils.copyList(recycleOrderList, SHRecycleOrderRespVO.class, (s, t) -> {

@ -2,10 +2,12 @@ package cc.yunxi.controller;
import cc.yunxi.common.domain.CommonResult;
import cc.yunxi.domain.vo.socket.AppTypeEnum;
import cc.yunxi.domain.vo.socket.MessageTypeEnum;
import cc.yunxi.domain.vo.socket.SocketMessage;
import cc.yunxi.domain.vo.vxmessage.*;
import cc.yunxi.service.IWsService;
import cc.yunxi.utils.UserContext;
import cc.yunxi.utils.VerifyUtil;
import cc.yunxi.utils.WeChatMessageUtil;
import cc.yunxi.utils.WeChatUtil;
@ -125,21 +127,18 @@ public class WxMessageController {
@PostMapping("/getToken")
public CommonResult<AccessToken> getToken(@RequestBody String message) {
AccessToken client = weChatMessageUtil.getAccessToken("client");
AccessToken token = weChatMessageUtil.getAccessToken("client");
SocketMessage sms = new SocketMessage();
sms.setMessageType(MessageTypeEnum.BOOKING);
sms.setGoodsType("塑料");
sms.setOrderNo("123332");
sms.setOrderAddress("地址");
sms.setOrderPhone("电话");
sms.setOrderTime("2024-06-04 17:00~18:00");
sms.setAppType(AppTypeEnum.client);
sms.setReceiveUserId(UserContext.getUser().getId());
try {
wsService.sendMsgToUser("你好",sms);
wsService.sendMsgToUser(sms);
} catch (IOException e) {
log.error("发送消息失败");
e.printStackTrace();
}
return CommonResult.success(client);
return CommonResult.success(token);
}
@PostMapping("/send")

@ -0,0 +1,9 @@
package cc.yunxi.domain.vo.socket;
/**
*
*/
public enum AppTypeEnum {
client, //预约端
recycler //回收端
}

@ -0,0 +1,22 @@
package cc.yunxi.domain.vo.socket;
import cc.yunxi.enums.OrderStatusEnum;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@Data
@ApiModel("新订单")
public class OrderCancel extends SocketMessage{
@ApiModelProperty("订单号")
private String orderNo;
@ApiModelProperty("订单状态")
private OrderStatusEnum orderStatus;
@ApiModelProperty("取消时间")
private String cancelOrderTime;
@ApiModelProperty("预约人号码")
private String orderPhone;
@ApiModelProperty("预约地址")
private String orderAddress;
}

@ -0,0 +1,35 @@
package cc.yunxi.domain.vo.socket;
import cc.yunxi.enums.OrderStatusEnum;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@ApiModel("新订单")
@Data
public class OrderFinish extends SocketMessage{
@ApiModelProperty("订单号")
private String orderNo;
@ApiModelProperty("订单状态")
private OrderStatusEnum orderStatus;
@ApiModelProperty("商品类型")
private String goodsType;
@ApiModelProperty("预约时间")
private String orderTime;
@ApiModelProperty("预约人号码")
private String orderPhone;
@ApiModelProperty("预约地址")
private String orderAddress;
@ApiModelProperty("预计重量")
private String orderWeight;
@ApiModelProperty("接单人")
private String takeOrderUser;
@ApiModelProperty("接单时间")
private String takeOrderTime;
@ApiModelProperty("接单人电话")
private String takeUserPhone;
@ApiModelProperty("完成时间")
private String finishTime;
@ApiModelProperty("实际金额")
private String realMoney;
}

@ -0,0 +1,25 @@
package cc.yunxi.domain.vo.socket;
import cc.yunxi.enums.OrderStatusEnum;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@ApiModel("新订单")
@Data
public class OrderNew extends SocketMessage{
@ApiModelProperty("订单号")
private String orderNo;
@ApiModelProperty("订单状态")
private OrderStatusEnum orderStatus;
@ApiModelProperty("商品类型")
private String goodsType;
@ApiModelProperty("预约时间")
private String reserveTime;
@ApiModelProperty("预约人号码")
private String clientPhone;
@ApiModelProperty("预约地址")
private String clientAddress;
@ApiModelProperty("预计重量")
private double estimatedWeight;
}

@ -0,0 +1,21 @@
package cc.yunxi.domain.vo.socket;
import cc.yunxi.enums.OrderStatusEnum;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@ApiModel("新订单")
@Data
public class OrderTaken extends SocketMessage{
@ApiModelProperty("订单号")
private String orderNo;
@ApiModelProperty("订单状态")
private OrderStatusEnum orderStatus;
@ApiModelProperty("接单人")
private String takeOrderUser;
@ApiModelProperty("接单时间")
private String takeOrderTime;
@ApiModelProperty("接单人电话")
private String takeUserPhone;
}

@ -5,38 +5,26 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotBlank;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
@ApiModel("socket消息体")
public class SocketMessage {
@ApiModelProperty("小程序类型")
@NotBlank(message = "小程序类型不能为空")
private AppTypeEnum appType;
@ApiModelProperty("消息类型")
@NotBlank(message = "消息类型不能为空")
private MessageTypeEnum messageType;
@ApiModelProperty("订单号")
private String orderNo;
@ApiModelProperty("订单状态")
private String orderStatus;
@ApiModelProperty("商品类型")
private String goodsType;
@ApiModelProperty("预约时间")
private String orderTime;
@ApiModelProperty("预约人号码")
private String orderPhone;
@ApiModelProperty("预约地址")
private String orderAddress;
@ApiModelProperty("预计重量")
private String orderWeight;
@ApiModelProperty("接单人")
private String takeOrderUser;
@ApiModelProperty("接单时间")
private String takeOrderTime;
@ApiModelProperty("接单人电话")
private String takeUserPhone;
@ApiModelProperty("完成时间")
private String finishTime;
@ApiModelProperty("实际金额")
private String realMoney;
@ApiModelProperty
@NotBlank(message = "接收人不能为空")
private String receiveUserId;
@ApiModelProperty
@NotBlank(message = "发送时间不能为空")
private LocalDateTime sendTime = LocalDateTime.now();
@ApiModelProperty("发送失败缓存时间(分钟)")
private int cacheMinute = 30;
}

@ -13,12 +13,11 @@ public interface IWsService {
/**
*
*
* @param userinfo
* @param message
* @return
* @throws IOException
*/
public void sendMsgToUser(String userinfo, SocketMessage message) throws IOException;
public void sendMsgToUser(SocketMessage message) throws IOException;
/**

@ -1,5 +1,7 @@
package cc.yunxi.service.impl;
import cc.yunxi.common.exception.BusinessLogicException;
import cc.yunxi.domain.vo.socket.AppTypeEnum;
import cc.yunxi.domain.vo.socket.SocketMessage;
import cc.yunxi.service.IWsService;
import cc.yunxi.utils.WsSessionManager;
@ -21,11 +23,25 @@ public class WsServiceImpl implements IWsService {
private WsSessionManager sessionManager;
@Override
public void sendMsgToUser(String userinfo, SocketMessage message) throws IOException {
public void sendMsgToUser(SocketMessage message) throws IOException {
String text = JSONUtil.toJsonStr(message);
WebSocketSession userSession = sessionManager.getUserSession(userinfo);
AppTypeEnum appType = message.getAppType();
String userId = message.getReceiveUserId();
WebSocketSession userSession = null;
if (appType.equals(AppTypeEnum.client)) {
userSession = sessionManager.getClientSession(userId);
} else if (appType.equals(AppTypeEnum.recycler)) {
userSession = sessionManager.getClientSession(userId);
} else {
log.info("小程序未指定,不能发送!");
throw new BusinessLogicException("socket 消息发送失败appType is error");
}
if (userSession != null && userSession.isOpen()) {
userSession.sendMessage(new TextMessage(text));
} else {
// 用户不在线,添加到缓存队列
log.info("用户不在线,添加到缓存队列,上线后重发!");
sessionManager.addCacheMessage(message);
}
}

@ -1,11 +1,18 @@
package cc.yunxi.utils;
import cc.yunxi.domain.vo.socket.AppTypeEnum;
import cc.yunxi.domain.vo.socket.SocketMessage;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
@Component
@ -14,7 +21,12 @@ public class WsSessionManager {
* session
*/
private static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, WebSocketSession> USER_SESSION_POOL = new ConcurrentHashMap<>();
//散户端session
private static ConcurrentHashMap<String, WebSocketSession> CLIENT_SESSION = new ConcurrentHashMap<>();
//回收端session
private static ConcurrentHashMap<String, WebSocketSession> RECYCLER_SESSION = new ConcurrentHashMap<>();
//消息缓存
private static final ConcurrentLinkedQueue<SocketMessage> MESSAGE_CACHE = new ConcurrentLinkedQueue<>();
/**
* session
@ -26,16 +38,31 @@ public class WsSessionManager {
SESSION_POOL.put(key, session);
}
public void addUserSession(String key, WebSocketSession session) {
public void addClientSession(String userId, WebSocketSession session) {
//获取旧session
WebSocketSession oldSession = USER_SESSION_POOL.get(key);
WebSocketSession oldSession = CLIENT_SESSION.get(userId);
if (null == oldSession //没有
|| !oldSession.getId().equals(session.getId()) //id不同
|| !oldSession.isOpen()) { //旧session已关闭
USER_SESSION_POOL.put(key, session); //则将新的session添加到池中
CLIENT_SESSION.put(userId, session); //则将新的session添加到池中
//判断缓存中是否有未发送的消息
sendCacheMessage(userId);
}
}
public void addRecyclerSession(String userId, WebSocketSession session) {
//获取旧session
WebSocketSession oldSession = RECYCLER_SESSION.get(userId);
if (null == oldSession //没有
|| !oldSession.getId().equals(session.getId()) //id不同
|| !oldSession.isOpen()) { //旧session已关闭
RECYCLER_SESSION.put(userId, session); //则将新的session添加到池中
//判断缓存中是否有未发送的消息
sendCacheMessage(userId);
}
}
/**
* session, session
*
@ -44,10 +71,54 @@ public class WsSessionManager {
*/
public WebSocketSession remove(String key) {
// 删除 session
USER_SESSION_POOL.remove(key);
CLIENT_SESSION.remove(key);
return SESSION_POOL.remove(key);
}
//处理未发送的消息
private void sendCacheMessage(String userId) {
// 遍历缓存队列
Iterator<SocketMessage> iterator = MESSAGE_CACHE.iterator();
if (iterator.hasNext()) {
SocketMessage message = iterator.next();
LocalDateTime sendTime = message.getSendTime();
int minute = message.getCacheMinute();
AppTypeEnum appType = message.getAppType();
WebSocketSession session = null;
switch (appType) {
case client:
session = CLIENT_SESSION.get(userId);
break;
case recycler:
session = RECYCLER_SESSION.get(userId);
default:
iterator.remove();//移除
break;
}
if (userId.equals(message.getReceiveUserId())
&& session != null
&& session.isOpen()
&& sendTime.plusMinutes(minute).isAfter(LocalDateTime.now())) {
// 发送消息
try {
session.sendMessage(new TextMessage(JSONUtil.toJsonStr(message)));
} catch (IOException e) {
e.printStackTrace();
}
iterator.remove();//移除
}
//过期的消息直接移除
if (sendTime.plusMinutes(minute).isBefore(LocalDateTime.now())) {
iterator.remove();
}
}
}
//添加未发送的消息
public void addCacheMessage(SocketMessage message) {
MESSAGE_CACHE.add(message);
}
/**
*
@ -68,14 +139,25 @@ public class WsSessionManager {
}
/**
* session
* session
*
* @param key
* @return
*/
public WebSocketSession getClientSession(String key) {
// 获得 session
return CLIENT_SESSION.get(key);
}
/**
* session
*
* @param key
* @return
*/
public WebSocketSession getUserSession(String key) {
public WebSocketSession getRecyclerSession(String key) {
// 获得 session
return USER_SESSION_POOL.get(key);
return RECYCLER_SESSION.get(key);
}
/**
@ -86,7 +168,7 @@ public class WsSessionManager {
*/
public WebSocketSession getSession(String key) {
// 获得 session
return USER_SESSION_POOL.get(key);
return CLIENT_SESSION.get(key);
}
//获取所有链接

@ -1,5 +1,6 @@
package cc.yunxi.utils;
import cc.yunxi.domain.vo.socket.AppTypeEnum;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
@ -37,8 +38,18 @@ public class WsSocketHandler extends AbstractWebSocketHandler {
log.info("server 接收到消息 " + payload);
JSONObject json = JSONUtil.parseObj(payload);
String userId = json.getStr("userId");
SESSION_MANAGER.addUserSession(userId, session);
session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()));
String appType = json.getStr("appType");
if (AppTypeEnum.client.name().equals(appType)) {
SESSION_MANAGER.addClientSession(userId, session);
session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()));
} else if (AppTypeEnum.recycler.name().equals(appType)) {
SESSION_MANAGER.addRecyclerSession(userId, session);
session.sendMessage(new TextMessage("server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()));
} else {
session.sendMessage(new TextMessage("server 发送给的消息 ws发送参数不正确连接即将关闭 发送时间:" + LocalDateTime.now().toString()));
SESSION_MANAGER.removeAndClose(session.getId());
log.error("ws发送参数不正确连接关闭");
}
}
@Override

Loading…
Cancel
Save