吴树波 1 неделя назад
Родитель
Сommit
a699531750

+ 59 - 0
fs-live-app/src/main/java/com/fs/live/utils/WebSocketRateLimiter.java

@@ -0,0 +1,59 @@
+package com.fs.live.utils;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * WebSocket限流工具类
+ */
+public class WebSocketRateLimiter {
+
+    // 用户级别限流:每个用户每秒最多发送5条消息
+    private static final double USER_RATE_LIMIT = 5.0;
+    private static final ConcurrentHashMap<String, RateLimiter> userLimiters = new ConcurrentHashMap<>();
+
+    // 直播间级别限流:每个直播间每秒最多处理100条消息
+    private static final double ROOM_RATE_LIMIT = 100.0;
+    private static final ConcurrentHashMap<Long, RateLimiter> roomLimiters = new ConcurrentHashMap<>();
+
+    /**
+     * 检查用户是否被限流
+     * @param userId 用户ID
+     * @param liveId 直播间ID
+     * @return true-允许通过,false-被限流
+     */
+    public static boolean tryAcquire(Long userId, Long liveId) {
+        // 检查用户级别限流
+        String userKey = userId + "_" + liveId;
+        RateLimiter userLimiter = userLimiters.computeIfAbsent(
+                userKey,
+                k -> RateLimiter.create(USER_RATE_LIMIT)
+        );
+
+        if (!userLimiter.tryAcquire()) {
+            return false;
+        }
+
+        // 检查直播间级别限流
+        RateLimiter roomLimiter = roomLimiters.computeIfAbsent(
+                liveId,
+                k -> RateLimiter.create(ROOM_RATE_LIMIT)
+        );
+
+        return roomLimiter.tryAcquire();
+    }
+
+    /**
+     * 清理用户限流器
+     */
+    public static void removeUserLimiter(Long userId, Long liveId) {
+        userLimiters.remove(userId + "_" + liveId);
+    }
+
+    /**
+     * 清理直播间限流器
+     */
+    public static void removeRoomLimiter(Long liveId) {
+        roomLimiters.remove(liveId);
+    }
+}

+ 63 - 0
fs-live-app/src/main/java/com/fs/live/websocket/config/WebSocketSessionManager.java

@@ -0,0 +1,63 @@
+package com.fs.live.websocket.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.Session;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * 直播间WebSocket连接管理器
+ * 核心:按直播间ID分组存储Session,线程安全,低锁竞争
+ */
+@Slf4j
+@Component
+public class WebSocketSessionManager {
+
+    /**
+     * 存储结构:直播间ID -> 该直播间的所有连接Session
+     * CopyOnWriteArraySet:读多写少场景,线程安全且遍历无锁
+     */
+    private static final Map<String, Set<Session>> ROOM_SESSIONS = new ConcurrentHashMap<>();
+
+    /**
+     * 添加连接(绑定直播间)
+     */
+    public void addSession(String roomId, Session session) {
+        // 不存在则创建空集合(ConcurrentHashMap的原子操作)
+        ROOM_SESSIONS.computeIfAbsent(roomId, k -> new CopyOnWriteArraySet<>()).add(session);
+        log.info("直播间[{}]新增连接,当前连接数:{}", roomId, ROOM_SESSIONS.get(roomId).size());
+    }
+
+    /**
+     * 移除连接(清理无效连接)
+     */
+    public void removeSession(String roomId, Session session) {
+        if (ROOM_SESSIONS.containsKey(roomId)) {
+            Set<Session> sessions = ROOM_SESSIONS.get(roomId);
+            sessions.remove(session);
+            // 直播间无连接时清空,释放内存
+            if (sessions.isEmpty()) {
+                ROOM_SESSIONS.remove(roomId);
+            }
+            log.info("直播间[{}]移除连接,当前连接数:{}", roomId, sessions.size());
+        }
+    }
+
+    /**
+     * 获取直播间所有连接
+     */
+    public Set<Session> getRoomSessions(String roomId) {
+        return ROOM_SESSIONS.getOrDefault(roomId, new CopyOnWriteArraySet<>());
+    }
+
+    /**
+     * 获取所有直播间ID
+     */
+    public Set<String> getAllRoomIds() {
+        return ROOM_SESSIONS.keySet();
+    }
+}

+ 4 - 1
fs-service/src/main/java/com/fs/live/service/impl/LiveServiceImpl.java

@@ -11,6 +11,7 @@ import com.baomidou.mybatisplus.core.conditions.Wrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.fs.common.core.page.PageRequest;
+import com.fs.common.core.redis.service.StockDeductService;
 import com.fs.common.exception.base.BaseException;
 import com.fs.company.mapper.CompanyMapper;
 import com.fs.company.vo.CompanyVO;
@@ -109,6 +110,8 @@ public class LiveServiceImpl implements ILiveService
     @Autowired
     private LiveLotteryProductConfMapper liveLotteryProductConfMapper;
     @Autowired
+    private StockDeductService stockDeductService;
+    @Autowired
     private FsStoreProductScrmMapper fsStoreProductScrmMapper;
     @Autowired
     private RedisCache redisCache;
@@ -1292,7 +1295,7 @@ public class LiveServiceImpl implements ILiveService
                 newGoods.setStock(goodsMap.containsKey(liveGoods.getProductId())
                         ? goodsMap.get(liveGoods.getProductId()).getStock() : 0);
                 liveGoodsService.insertLiveGoods(newGoods);
-
+                stockDeductService.initStock(newGoods.getProductId(), newLiveId, newGoods.getStock().intValue());
                 goodsIdMapping.put(liveGoods.getGoodsId(), newGoods.getGoodsId());
 
                 // 复制商品推送任务(taskType=1)