Parcourir la source

ws 并发 锁,直播订单多sku

yuhongqi il y a 5 jours
Parent
commit
83a75db3b8

+ 159 - 19
fs-live-app/src/main/java/com/fs/live/websocket/service/WebSocketServer.java

@@ -31,10 +31,9 @@ import javax.websocket.server.ServerEndpoint;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static com.fs.common.constant.LiveKeysConstant.*;
 
@@ -50,6 +49,16 @@ public class WebSocketServer {
     private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
     // 管理端连接
     private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
+    
+    // Session发送锁,避免同一会话并发发送消息
+    private final static ConcurrentHashMap<String, Lock> sessionLocks = new ConcurrentHashMap<>();
+    // 心跳超时缓存:key=sessionId,value=最后心跳时间戳
+    private final static ConcurrentHashMap<String, Long> heartbeatCache = new ConcurrentHashMap<>();
+    // 心跳超时时间(毫秒):3分钟无心跳则认为超时
+    private final static long HEARTBEAT_TIMEOUT = 3 * 60 * 1000;
+    // admin房间消息发送线程池(单线程,保证串行化)
+    private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
+    
     private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
     private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
     private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
@@ -178,7 +187,14 @@ public class WebSocketServer {
 
         } else {
             adminRoom.add(session);
+            // 为admin房间创建单线程执行器,保证串行化发送
+            adminExecutors.computeIfAbsent(liveId, k -> Executors.newSingleThreadExecutor());
         }
+        
+        // 初始化Session锁
+        sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
+        // 初始化心跳时间
+        heartbeatCache.put(session.getId(), System.currentTimeMillis());
 
         log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
     }
@@ -225,7 +241,19 @@ public class WebSocketServer {
             broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
         } else {
             adminRoom.remove(session);
+            // 如果admin房间为空,关闭并清理执行器
+            if (adminRoom.isEmpty()) {
+                ExecutorService executor = adminExecutors.remove(liveId);
+                if (executor != null) {
+                    executor.shutdown();
+                }
+                adminRooms.remove(liveId);
+            }
         }
+        
+        // 清理Session相关资源
+        heartbeatCache.remove(session.getId());
+        sessionLocks.remove(session.getId());
 
         log.debug("离开webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
     }
@@ -244,6 +272,8 @@ public class WebSocketServer {
         try {
             switch (msg.getCmd()) {
                 case "heartbeat":
+                    // 更新心跳时间
+                    heartbeatCache.put(session.getId(), System.currentTimeMillis());
                     sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
                     break;
                 case "sendMsg":
@@ -499,13 +529,29 @@ public class WebSocketServer {
         return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
     }
 
-    //发送消息
+    //发送消息(带锁机制,避免并发发送)
     public void sendMessage(Session session, String message) throws IOException {
         if (session == null || !session.isOpen()) {
             log.warn("WebSocket 会话已关闭,跳过发送");
             return;
         }
-        session.getAsyncRemote().sendText(message);
+        
+        // 获取Session锁
+        Lock lock = sessionLocks.get(session.getId());
+        if (lock == null) {
+            // 如果锁不存在,创建一个新锁
+            lock = sessionLocks.computeIfAbsent(session.getId(), k -> new ReentrantLock());
+        }
+        
+        // 使用锁保证同一Session的消息串行发送
+        lock.lock();
+        try {
+            if (session.isOpen()) {
+                session.getAsyncRemote().sendText(message);
+            }
+        } finally {
+            lock.unlock();
+        }
     }
 
     public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
@@ -557,16 +603,33 @@ public class WebSocketServer {
         ConcurrentHashMap<Long, Session> room = getRoom(liveId);
         List<Session> adminRoom = getAdminRoom(liveId);
 
+        // 普通用户房间:并行发送
         room.forEach((k, v) -> {
             if (v.isOpen()) {
                 sendWithRetry(v,message,1);
             }
         });
-        adminRoom.forEach(v -> {
-            if (v.isOpen()) {
-                sendWithRetry(v,message,1);
+        
+        // admin房间:串行发送,使用单线程执行器
+        if (!adminRoom.isEmpty()) {
+            ExecutorService executor = adminExecutors.get(liveId);
+            if (executor != null && !executor.isShutdown()) {
+                executor.submit(() -> {
+                    for (Session session : adminRoom) {
+                        if (session.isOpen()) {
+                            sendWithRetry(session, message, 1);
+                        }
+                    }
+                });
+            } else {
+                // 如果执行器不存在或已关闭,直接发送
+                adminRoom.forEach(v -> {
+                    if (v.isOpen()) {
+                        sendWithRetry(v, message, 1);
+                    }
+                });
             }
-        });
+        }
     }
 
     public void removeLikeCountCache(Long liveId) {
@@ -626,6 +689,77 @@ public class WebSocketServer {
         }
     }
 
+    /**
+     * 定时清理无效会话(每分钟执行一次)
+     * 检查心跳超时的会话并关闭
+     */
+    @Scheduled(fixedRate = 60000) // 每分钟执行一次
+    public void cleanInactiveSessions() {
+        long currentTime = System.currentTimeMillis();
+        int cleanedCount = 0;
+        
+        // 遍历所有直播间
+        for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> roomEntry : rooms.entrySet()) {
+            Long liveId = roomEntry.getKey();
+            ConcurrentHashMap<Long, Session> room = roomEntry.getValue();
+            
+            // 检查普通用户会话
+            List<Long> toRemove = new ArrayList<>();
+            room.forEach((userId, session) -> {
+                Long lastHeartbeat = heartbeatCache.get(session.getId());
+                if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
+                    log.warn("会话心跳超时,即将关闭: sessionId={}, liveId={}, userId={}, 超时时长={}ms", 
+                            session.getId(), liveId, userId, currentTime - lastHeartbeat);
+                    toRemove.add(userId);
+                    try {
+                        if (session.isOpen()) {
+                            session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
+                        }
+                    } catch (Exception e) {
+                        log.error("关闭超时会话失败: sessionId={}, liveId={}, userId={}", 
+                                session.getId(), liveId, userId, e);
+                    }
+                }
+            });
+            
+            // 移除超时的会话
+            toRemove.forEach(room::remove);
+            cleanedCount += toRemove.size();
+        }
+        
+        // 检查admin房间
+        for (Map.Entry<Long, CopyOnWriteArrayList<Session>> adminEntry : adminRooms.entrySet()) {
+            Long liveId = adminEntry.getKey();
+            CopyOnWriteArrayList<Session> adminRoom = adminEntry.getValue();
+            
+            List<Session> toRemoveAdmin = new ArrayList<>();
+            for (Session session : adminRoom) {
+                Long lastHeartbeat = heartbeatCache.get(session.getId());
+                if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
+                    log.warn("admin会话心跳超时,即将关闭: sessionId={}, liveId={}, 超时时长={}ms", 
+                            session.getId(), liveId, currentTime - lastHeartbeat);
+                    toRemoveAdmin.add(session);
+                    try {
+                        if (session.isOpen()) {
+                            session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
+                        }
+                    } catch (Exception e) {
+                        log.error("关闭admin超时会话失败: sessionId={}, liveId={}", 
+                                session.getId(), liveId, e);
+                    }
+                }
+            }
+            
+            // 移除超时的admin会话
+            toRemoveAdmin.forEach(adminRoom::remove);
+            cleanedCount += toRemoveAdmin.size();
+        }
+        
+        if (cleanedCount > 0) {
+            log.info("清理无效会话完成,共清理 {} 个超时会话", cleanedCount);
+        }
+    }
+
 
     /**
      * 广播点赞消息
@@ -642,20 +776,21 @@ public class WebSocketServer {
     }
 
     private void sendWithRetry(Session session, String message, int maxRetries) {
+        if (session == null || !session.isOpen()) {
+            log.warn("WebSocket 会话已关闭,跳过发送");
+            return;
+        }
+        
         int attempts = 0;
         while (attempts < maxRetries) {
             try {
-                if (session == null || !session.isOpen()) {
-                    log.warn("WebSocket 会话已关闭,跳过发送");
-                    continue;
-                }
-                if(session.isOpen()) {
-                    session.getAsyncRemote().sendText(message);
-                }
+                // 使用带锁的sendMessage方法,避免并发发送
+                sendMessage(session, message);
                 return;  // 发送成功,退出
             } catch (Exception e) {
                 if (e.getMessage() != null && e.getMessage().contains("TEXT_FULL_WRITING")) {
                     attempts++;
+                    log.warn("发送消息遇到TEXT_FULL_WRITING错误,第{}次重试, sessionId={}", attempts, session.getId());
                     try {
                         TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(5, 100));
                     } catch (InterruptedException ie) {
@@ -663,11 +798,15 @@ public class WebSocketServer {
                         break;
                     }
                 } else {
-                    throw e;
+                    log.error("发送消息失败: sessionId={}, error={}", session.getId(), e.getMessage(), e);
+                    break;
                 }
             }
         }
-        log.info("超过重试次数, 消息 {}",message);
+        
+        if (attempts >= maxRetries) {
+            log.warn("超过重试次数({}),放弃发送消息: sessionId={}", maxRetries, session.getId());
+        }
     }
 
 
@@ -775,3 +914,4 @@ public class WebSocketServer {
     }
 
 }
+

+ 2 - 0
fs-service/src/main/java/com/fs/live/domain/LiveOrder.java

@@ -349,5 +349,7 @@ public class LiveOrder extends BaseEntity {
     private Integer cost;
     @TableField(exist = false)
     private String bankTransactionId;
+    @TableField(exist = false)
+    private Long attrValueId;
 
 }

+ 1 - 0
fs-service/src/main/java/com/fs/live/param/LiveOrderComputedParam.java

@@ -17,6 +17,7 @@ public class LiveOrderComputedParam implements Serializable
     private Long productId;
     private String totalNum;
     private Long cityId;
+    private Long attrValueId;
 
 
     @ApiModelProperty(value = "使用积分 1使用")

+ 83 - 7
fs-service/src/main/java/com/fs/live/service/impl/LiveOrderServiceImpl.java

@@ -230,6 +230,9 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
 
     @Autowired
     private FsStoreProductScrmMapper fsStoreProductBaseMapper;
+
+    @Autowired
+    private FsStoreProductAttrValueScrmMapper attrValueScrmMapper;
     @Autowired
     private ISysConfigService configService;
 
@@ -1208,6 +1211,19 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
         for (LiveOrderItem vo : orderItemVOS) {
             if (vo.getIsAfterSales() == 1) {
                 fsStoreProductService.incProductStock(vo.getNum(), vo.getProductId(), vo.getProductAttrValueId());
+                LiveOrderItemDTO liveOrderItemDTO = JSONObject.parseObject(vo.getJsonInfo(), LiveOrderItemDTO.class);
+                // 获取商品信息
+                FsStoreProductScrm fsStoreProduct = fsStoreProductService.selectFsStoreProductById(vo.getProductId());
+                if (fsStoreProduct != null) {
+                    // 获取商品属性值中的条形码
+                    FsStoreProductAttrValueScrm fsStoreProductAttrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueByProductId(fsStoreProduct.getProductId()).stream().filter(attr -> StringUtils.isNotEmpty(attr.getBarCode()) && attr.getBarCode().equals(liveOrderItemDTO.getBarCode())).findFirst().orElse(null);
+                    if (fsStoreProductAttrValue != null) {
+                        fsStoreProductAttrValue.setStock((int) (fsStoreProductAttrValue.getStock() + vo.getNum()));
+                        fsStoreProductAttrValue.setSales(fsStoreProductAttrValue.getSales() - 1);
+                        fsStoreProductAttrValueMapper.updateFsStoreProductAttrValue(fsStoreProductAttrValue);
+                    }
+
+                }
             }
         }
         if ("3".equals(order.getPayType())) {
@@ -1392,6 +1408,18 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
         for (LiveOrderItem vo : orderItemVOS) {
             if (vo.getIsAfterSales() == 1) {
                 fsStoreProductService.incProductStock(vo.getNum(), vo.getProductId(), vo.getProductAttrValueId());
+                LiveOrderItemDTO liveOrderItemDTO = JSONObject.parseObject(vo.getJsonInfo(), LiveOrderItemDTO.class);
+                // 获取商品信息
+                FsStoreProductScrm fsStoreProduct = fsStoreProductService.selectFsStoreProductById(vo.getProductId());
+                if (fsStoreProduct != null) {
+                    // 获取商品属性值中的条形码
+                    FsStoreProductAttrValueScrm fsStoreProductAttrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueByProductId(fsStoreProduct.getProductId()).stream().filter(attr -> StringUtils.isNotEmpty(attr.getBarCode()) && attr.getBarCode().equals(liveOrderItemDTO.getBarCode())).findFirst().orElse(null);
+                    if (fsStoreProductAttrValue != null) {
+                        fsStoreProductAttrValue.setStock((int) (fsStoreProductAttrValue.getStock() + vo.getNum()));
+                        fsStoreProductAttrValue.setSales(fsStoreProductAttrValue.getSales() - 1);
+                        fsStoreProductAttrValueMapper.updateFsStoreProductAttrValue(fsStoreProductAttrValue);
+                    }
+                }
             }
         }
         if ("3".equals(order.getPayType())) {
@@ -1822,10 +1850,23 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
             return null;
         }
         FsStoreProductScrm fsStoreProduct = fsStoreProductService.selectFsStoreProductById(param.getProductId());
-        if(fsStoreProduct == null) return null;
+        if (fsStoreProduct == null) {
+            log.error("商品不存在");
+            return null;
+        }
+        FsStoreProductAttrValueScrm fsStoreProductAttrValue = null;
+        if (!Objects.isNull(param.getAttrValueId())) {
+            fsStoreProductAttrValue = attrValueScrmMapper.selectFsStoreProductAttrValueById(param.getAttrValueId());
+        }
+
+        BigDecimal totalPrice = BigDecimal.ZERO;
+        if (fsStoreProductAttrValue != null) {
+            totalPrice = fsStoreProductAttrValue.getPrice().multiply(new BigDecimal(param.getTotalNum()));
+        } else {
+            totalPrice = fsStoreProduct.getPrice().multiply(new BigDecimal(param.getTotalNum()));
+        }
         BigDecimal payPrice = BigDecimal.ZERO;
         BigDecimal payDelivery = BigDecimal.ZERO;
-        BigDecimal totalPrice = fsStoreProduct.getPrice().multiply(new BigDecimal(param.getTotalNum()));
         if (param.getCityId() != null) {
             payDelivery = handleDeliveryMoney(param.getCityId(), fsStoreProduct, param.getTotalNum());
             totalPrice = totalPrice.add(payDelivery);
@@ -1954,11 +1995,24 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
     public LiveOrderComputeDTO computedOrder(long userId, LiveOrderComputedParam param) {
         String orderKey= redisCache.getCacheObject("orderKey:"+param.getOrderKey());
         if (StringUtils.isEmpty(orderKey)) {
+            log.error("订单已失效");
             return null;
         }
         FsStoreProductScrm fsStoreProduct = fsStoreProductService.selectFsStoreProductById(param.getProductId());
+        if (fsStoreProduct == null) {
+            log.error("商品不存在");
+            return null;
+        }
+        FsStoreProductAttrValueScrm fsStoreProductAttrValue = null;
+        if (!Objects.isNull(param.getAttrValueId())) {
+            fsStoreProductAttrValue = attrValueScrmMapper.selectFsStoreProductAttrValueById(param.getAttrValueId());
+        }
+
         BigDecimal totalPrice = BigDecimal.ZERO;
         BigDecimal payPrice = fsStoreProduct.getPrice().multiply(new BigDecimal(param.getTotalNum()));
+        if (fsStoreProductAttrValue != null) {
+            payPrice = fsStoreProductAttrValue.getPrice().multiply(new BigDecimal(param.getTotalNum()));
+        }
         totalPrice = totalPrice.add(payPrice);
         BigDecimal payDelivery = BigDecimal.ZERO;
         BigDecimal deductionPrice = BigDecimal.ZERO;
@@ -3326,6 +3380,11 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
         baseMapper.batchUpdateTime(list);
     }
 
+    @Override
+    public void batchUpdateTimeIds(List<Long> ids) {
+        baseMapper.batchUpdateTimeIds(ids);
+    }
+
 
     @Override
     @Transactional(rollbackFor = Throwable.class,propagation = Propagation.REQUIRED)
@@ -3356,10 +3415,20 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
         if(goods.getStock() == null) return R.error("直播间商品库存不足");
         if(fsStoreProduct.getStock() < Integer.parseInt(liveOrder.getTotalNum()) || goods.getStock() < Integer.parseInt(liveOrder.getTotalNum())) return R.error("抱歉,这款商品已被抢光,暂时无库存~");
 
-        // 更改店铺库存
-        fsStoreProduct.setStock(fsStoreProduct.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
-        fsStoreProduct.setSales(fsStoreProduct.getSales()+Integer.parseInt(liveOrder.getTotalNum()));
-        fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
+        FsStoreProductAttrValueScrm attrValue = null;
+        if (!Objects.isNull(liveOrder.getAttrValueId())) {
+            attrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueById(liveOrder.getAttrValueId());
+        }
+        if (attrValue != null) {
+            attrValue.setStock(attrValue.getStock() - Integer.parseInt(liveOrder.getTotalNum()));
+            attrValue.setSales(attrValue.getSales() + Integer.parseInt(liveOrder.getTotalNum()));
+        } else {
+            // 更改店铺库存
+            fsStoreProduct.setStock(fsStoreProduct.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
+            fsStoreProduct.setSales(fsStoreProduct.getSales()+Integer.parseInt(liveOrder.getTotalNum()));
+            fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
+        }
+
         // 更新直播间库存
         goods.setStock(goods.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
         goods.setSales(goods.getSales()+Integer.parseInt(liveOrder.getTotalNum()));
@@ -3382,6 +3451,9 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
         log.info("订单生成:"+orderSn);
         liveOrder.setOrderCode(orderSn);
         BigDecimal payPrice = fsStoreProduct.getPrice().multiply(new BigDecimal(liveOrder.getTotalNum()));
+        if (attrValue != null) {
+            payPrice = attrValue.getPrice().multiply(new BigDecimal(liveOrder.getTotalNum()));
+        }
         // 直播不需要服务费 0915 1735 左
 //        String config=configService.selectConfigByKey("store.config");
 //        StoreConfig storeConfig= JSONUtil.toBean(config,StoreConfig.class);
@@ -3430,7 +3502,11 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
                 LiveOrderItemDTO dto=new LiveOrderItemDTO();
                 dto.setImage(fsStoreProduct.getImage());
                 dto.setSku(String.valueOf(fsStoreProduct.getStock()));
-                FsStoreProductAttrValueScrm fsStoreProductAttrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueByProductId(fsStoreProduct.getProductId()).stream().filter(attrValue -> StringUtils.isNotEmpty(attrValue.getBarCode())).findFirst().orElse(null);
+                FsStoreProductAttrValueScrm fsStoreProductAttrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueByProductId(fsStoreProduct.getProductId()).stream().filter(attr -> StringUtils.isNotEmpty(attr.getBarCode())).findFirst().orElse(null);
+                if (fsStoreProductAttrValue != null) {
+                    dto.setBarCode(fsStoreProductAttrValue.getBarCode());
+                    dto.setGroupBarCode(fsStoreProductAttrValue.getGroupBarCode());
+                }
                 if (fsStoreProductAttrValue != null) {
                     dto.setBarCode(fsStoreProductAttrValue.getBarCode());
                     dto.setGroupBarCode(fsStoreProductAttrValue.getGroupBarCode());

+ 1 - 0
fs-service/src/main/java/com/fs/live/vo/LiveDataDetailVo.java

@@ -97,3 +97,4 @@ public class LiveDataDetailVo {
 
 
 
+

+ 1 - 0
fs-service/src/main/java/com/fs/live/vo/LiveUserDetailVo.java

@@ -39,3 +39,4 @@ public class LiveUserDetailVo {
 
 
 
+

+ 1 - 0
fs-service/src/main/java/com/fs/live/vo/ProductSalesVo.java

@@ -27,3 +27,4 @@ public class ProductSalesVo {
 
 
 
+

+ 8 - 0
fs-user-app/src/main/java/com/fs/app/controller/live/LiveOrderController.java

@@ -790,6 +790,14 @@ public class LiveOrderController extends AppBaseController
         }
 
     }
+    @Login
+    @ApiOperation("修改支付类型")
+    @PostMapping("/clearPayType")
+    @Transactional
+    public R clearPayType(HttpServletRequest request, @Validated @RequestBody FsStoreOrderPayParam param) {
+        redisCache.deleteObject("isPaying:"+param.getOrderId());
+        return R.ok();
+    }
 
 
 //    @Login