Ver Fonte

直播缓存用索引Set

xw há 2 dias atrás
pai
commit
7553a98586

+ 26 - 0
fs-common/src/main/java/com/fs/common/constant/LiveKeysConstant.java

@@ -16,6 +16,20 @@ public class LiveKeysConstant {
     public static final Integer LIVE_HOME_PAGE_LIST_EXPIRE = 300; //首页缓存过期时间
     public static final String LIVE_WATCH_USERS = "live:watch:users:%s"; //在线人数
     public static final String LIVE_COUPON_NUM = "live:coupon:num:%s"; //直播间优惠券数量
+    /** 优惠券余量缓存 key 索引(勿用 KEYS live:coupon:num:*) */
+    public static final String LIVE_COUPON_NUM_INDEX = "live:coupon:num:index";
+    public static final String LIVE_COUPON_NUM_PREFIX = "live:coupon:num:";
+
+    public static String liveCouponNumKey(Long couponIssueId) {
+        return String.format(LIVE_COUPON_NUM, couponIssueId);
+    }
+
+    public static Long parseCouponIssueIdFromKey(String key) {
+        if (key == null || !key.startsWith(LIVE_COUPON_NUM_PREFIX)) {
+            return null;
+        }
+        return Long.parseLong(key.substring(LIVE_COUPON_NUM_PREFIX.length()));
+    }
 
     public static final String LIVE_HOME_PAGE_DETAIL = "live:detail:%s"; //直播间详情
     public static final Integer LIVE_HOME_PAGE_DETAIL_EXPIRE = 300; //直播间详情过期时间
@@ -37,8 +51,20 @@ public class LiveKeysConstant {
     public static final Integer PRODUCT_DETAIL_CACHE_EXPIRE = 300; //商品详情缓存过期时间(秒)
 
     public static final String LIVE_TAG_MARK_CACHE = "live:tag:mark:%s"; //直播间打标签缓存,存储直播间ID、开始时间和视频时长
+    /** 打标签缓存 key 索引(勿用 KEYS live:tag:mark:*) */
+    public static final String LIVE_TAG_MARK_INDEX = "live:tag:mark:index";
+
+    public static String liveTagMarkKey(Long liveId) {
+        return String.format(LIVE_TAG_MARK_CACHE, liveId);
+    }
     //记录用户观看直播间信息 直播间id、用户id、外部联系人id、qwUserId
     public static final String LIVE_USER_WATCH_LOG_CACHE = "live:user:watch:log:%s:%s:%s:%s";
+    /** 用户观看日志活跃时间缓存索引(勿用 KEYS live:user:watch:log:*) */
+    public static final String LIVE_USER_WATCH_LOG_INDEX = "live:user:watch:log:index";
+
+    public static String liveUserWatchLogKey(Long liveId, Long userId, Long externalContactId, Long qwUserId) {
+        return String.format(LIVE_USER_WATCH_LOG_CACHE, liveId, userId, externalContactId, qwUserId);
+    }
 
     /** 直播评论飘屏/置顶全局配置缓存(单条) */
     public static final String LIVE_COMMENT_FEATURE_CONFIG_ROW = "live:comment:feature:config:row";

+ 49 - 5
fs-common/src/main/java/com/fs/common/utils/redis/LiveDelayedTaskRedisUtil.java

@@ -10,7 +10,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 /**
- * 直播延时任务 ZSet 索引:用 Set 维护活跃 ZSet key,避免生产环境 KEYS 全库扫描。
+ * 直播 Redis 活跃 key 索引:用 Set 维护待扫描的 ZSet / 缓存 key,避免生产环境 KEYS 全库扫描。
  */
 @Component
 public class LiveDelayedTaskRedisUtil {
@@ -67,15 +67,59 @@ public class LiveDelayedTaskRedisUtil {
     }
 
     public Set<String> listAutoTaskZSetKeys() {
-        return listZSetKeys(LiveKeysConstant.LIVE_AUTO_TASK_INDEX);
+        return listIndexedKeys(LiveKeysConstant.LIVE_AUTO_TASK_INDEX);
     }
 
     public Set<String> listLotteryTaskZSetKeys() {
-        return listZSetKeys(LiveKeysConstant.LIVE_LOTTERY_TASK_INDEX);
+        return listIndexedKeys(LiveKeysConstant.LIVE_LOTTERY_TASK_INDEX);
     }
 
     public Set<String> listRedTaskZSetKeys() {
-        return listZSetKeys(LiveKeysConstant.LIVE_RED_TASK_INDEX);
+        return listIndexedKeys(LiveKeysConstant.LIVE_RED_TASK_INDEX);
+    }
+
+    public String tagMarkKey(Long liveId) {
+        return LiveKeysConstant.liveTagMarkKey(liveId);
+    }
+
+    public void trackTagMark(Long liveId) {
+        track(LiveKeysConstant.LIVE_TAG_MARK_INDEX, tagMarkKey(liveId));
+    }
+
+    public void untrackTagMark(Long liveId) {
+        untrack(LiveKeysConstant.LIVE_TAG_MARK_INDEX, tagMarkKey(liveId));
+    }
+
+    public Set<String> listTagMarkKeys() {
+        return listIndexedKeys(LiveKeysConstant.LIVE_TAG_MARK_INDEX);
+    }
+
+    public String couponNumKey(Long couponIssueId) {
+        return LiveKeysConstant.liveCouponNumKey(couponIssueId);
+    }
+
+    public void trackCouponNum(Long couponIssueId) {
+        track(LiveKeysConstant.LIVE_COUPON_NUM_INDEX, couponNumKey(couponIssueId));
+    }
+
+    public void untrackCouponNum(Long couponIssueId) {
+        untrack(LiveKeysConstant.LIVE_COUPON_NUM_INDEX, couponNumKey(couponIssueId));
+    }
+
+    public Set<String> listCouponNumKeys() {
+        return listIndexedKeys(LiveKeysConstant.LIVE_COUPON_NUM_INDEX);
+    }
+
+    public void trackUserWatchLog(String cacheKey) {
+        track(LiveKeysConstant.LIVE_USER_WATCH_LOG_INDEX, cacheKey);
+    }
+
+    public void untrackUserWatchLog(String cacheKey) {
+        untrack(LiveKeysConstant.LIVE_USER_WATCH_LOG_INDEX, cacheKey);
+    }
+
+    public Set<String> listUserWatchLogKeys() {
+        return listIndexedKeys(LiveKeysConstant.LIVE_USER_WATCH_LOG_INDEX);
     }
 
     private void track(String indexKey, String zsetKey) {
@@ -100,7 +144,7 @@ public class LiveDelayedTaskRedisUtil {
         }
     }
 
-    private Set<String> listZSetKeys(String indexKey) {
+    private Set<String> listIndexedKeys(String indexKey) {
         Set<Object> members = redisCache.redisTemplate.opsForSet().members(indexKey);
         if (members == null || members.isEmpty()) {
             return Collections.emptySet();

+ 36 - 22
fs-live-app/src/main/java/com/fs/live/task/Task.java

@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.fs.common.constant.LiveKeysConstant.*;
-import static com.fs.common.constant.LiveKeysConstant.LIVE_COUPON_NUM;
 import static com.fs.live.websocket.service.WebSocketServer.USER_ENTRY_TIME_KEY;
 
 @Component
@@ -198,8 +197,9 @@ public class Task {
                         tagMarkInfo.put("startTime", live.getStartTime().atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli());
                         tagMarkInfo.put("videoDuration", videoDuration);
 
-                        String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, live.getLiveId());
+                        String tagMarkKey = liveDelayedTaskRedisUtil.tagMarkKey(live.getLiveId());
                         redisCache.setCacheObject(tagMarkKey, JSON.toJSONString(tagMarkInfo), 24, TimeUnit.HOURS);
+                        liveDelayedTaskRedisUtil.trackTagMark(live.getLiveId());
                         log.info("直播间开启,已加入打标签缓存: liveId={}, startTime={}, videoDuration={}",
                                 live.getLiveId(), live.getStartTime(), videoDuration);
                     }
@@ -228,8 +228,9 @@ public class Task {
 
                 // 删除打标签缓存
                 try {
-                    String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, live.getLiveId());
+                    String tagMarkKey = liveDelayedTaskRedisUtil.tagMarkKey(live.getLiveId());
                     redisCache.deleteObject(tagMarkKey);
+                    liveDelayedTaskRedisUtil.untrackTagMark(live.getLiveId());
                     log.info("直播间结束,已删除打标签缓存: liveId={}", live.getLiveId());
                 } catch (Exception e) {
                     log.error("删除直播间打标签缓存失败: liveId={}, error={}", live.getLiveId(), e.getMessage(), e);
@@ -608,17 +609,23 @@ public class Task {
             }
             /*// 更新数据库
             liveDataService.updateLiveData(liveData);*/
-        Set<String> keys = redisCache.redisTemplate.keys(String.format(LIVE_COUPON_NUM, "*"));
-        if (keys != null && !keys.isEmpty()) {
-            for (String key : keys) {
-                Object o = redisCache.redisTemplate.opsForValue().get(String.format(LIVE_COUPON_NUM, key));
-                if (o != null) {
-                    LiveCouponIssue updateEntity = new LiveCouponIssue();
-                    updateEntity.setId(Long.valueOf(key));
-                    updateEntity.setRemainCount(Long.parseLong(o.toString()));
-                    liveCouponIssueService.updateLiveCouponIssue(updateEntity);
+        for (String key : liveDelayedTaskRedisUtil.listCouponNumKeys()) {
+            Object o = redisCache.getCacheObject(key);
+            if (o == null) {
+                Long issueId = LiveKeysConstant.parseCouponIssueIdFromKey(key);
+                if (issueId != null) {
+                    liveDelayedTaskRedisUtil.untrackCouponNum(issueId);
                 }
+                continue;
+            }
+            Long issueId = LiveKeysConstant.parseCouponIssueIdFromKey(key);
+            if (issueId == null) {
+                continue;
             }
+            LiveCouponIssue updateEntity = new LiveCouponIssue();
+            updateEntity.setId(issueId);
+            updateEntity.setRemainCount(Long.parseLong(o.toString()));
+            liveCouponIssueService.updateLiveCouponIssue(updateEntity);
         }
     }
 
@@ -640,11 +647,8 @@ public class Task {
     public void scanLiveTagMark() {
         try {
 
-            // 获取所有打标签缓存的key
-            String pattern = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, "*");
-            Set<String> keys = redisCache.redisTemplate.keys(pattern);
-
-            if (keys == null || keys.isEmpty()) {
+            Set<String> keys = liveDelayedTaskRedisUtil.listTagMarkKeys();
+            if (keys.isEmpty()) {
                 return;
             }
 
@@ -657,6 +661,10 @@ public class Task {
                     // 从Redis获取直播间信息
                     Object cacheValue = redisCache.getCacheObject(key);
                     if (cacheValue == null) {
+                        Long staleLiveId = LiveKeysConstant.parseLiveIdFromTaskZSetKey(key);
+                        if (staleLiveId != null) {
+                            liveDelayedTaskRedisUtil.untrackTagMark(staleLiveId);
+                        }
                         continue;
                     }
 
@@ -792,8 +800,9 @@ public class Task {
             // 删除已处理的直播间缓存
             for (Long liveId : processedLiveIds) {
                 try {
-                    String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, liveId);
+                    String tagMarkKey = liveDelayedTaskRedisUtil.tagMarkKey(liveId);
                     redisCache.deleteObject(tagMarkKey);
+                    liveDelayedTaskRedisUtil.untrackTagMark(liveId);
                 } catch (Exception e) {
                     log.error("删除直播间打标签缓存失败: liveId={}, error={}", liveId, e.getMessage(), e);
                 }
@@ -883,9 +892,10 @@ public class Task {
                                 continue;
                             }
                             //更新最新用户活跃时间
-                            String liveUserWatchLogKey = String.format(LIVE_USER_WATCH_LOG_CACHE, liveId, userId,externalContactId,qwUserId);
+                            String liveUserWatchLogKey = LiveKeysConstant.liveUserWatchLogKey(liveId, userId, externalContactId, qwUserId);
                             LocalDateTime now = LocalDateTime.now();
-                            redisCache.setCacheObject(liveUserWatchLogKey,formatter.format(now),5,TimeUnit.MINUTES);
+                            redisCache.setCacheObject(liveUserWatchLogKey, formatter.format(now), 5, TimeUnit.MINUTES);
+                            liveDelayedTaskRedisUtil.trackUserWatchLog(liveUserWatchLogKey);
                             // 使用 updateLiveWatchLogTypeByDuration 的逻辑更新观看记录状态
                             updateLiveWatchLogTypeByDuration(liveId, userId, qwUserId, externalContactId,
                                     onlineSeconds, totalVideoDuration, updateLog);
@@ -986,14 +996,18 @@ public class Task {
     @DistributeLock(key = "updateLiveWatchUserStatus", scene = "task")
     public void updateLiveWatchUserStatus() {
         try {
-            Set<String> keys = redisCache.redisTemplate.keys("live:user:watch:log:*");
+            Set<String> keys = liveDelayedTaskRedisUtil.listUserWatchLogKeys();
             LocalDateTime now = LocalDateTime.now();
             DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
             List<LiveWatchLog> updateLog = new ArrayList<>();
-            if (keys != null && !keys.isEmpty()) {
+            if (!keys.isEmpty()) {
                 for (String key : keys) {
                     String[] split = key.split(":");
                     String cacheTime = redisCache.getCacheObject(key);
+                    if (StringUtils.isBlank(cacheTime)) {
+                        liveDelayedTaskRedisUtil.untrackUserWatchLog(key);
+                        continue;
+                    }
                     //判断缓存的值是否已经距离现在超过一分钟
                     if (StringUtils.isNotBlank(cacheTime)) {
                         try {

+ 12 - 6
fs-live-app/src/main/java/com/fs/live/websocket/service/WebSocketServer.java

@@ -735,16 +735,20 @@ public class WebSocketServer {
         msg.setStatus(status);
         Long couponIssueId = jsonObject.getLong("couponIssueId");
         // ①  检查  缓存是否存在  ② 如果是发布 放入缓存 ③ 删除缓存
+        String couponNumKey = LiveKeysConstant.liveCouponNumKey(couponIssueId);
+        LiveDelayedTaskRedisUtil delayedTaskRedisUtil = SpringUtils.getBean(LiveDelayedTaskRedisUtil.class);
         if (status == 1) {
-            Object cacheObject = redisCache.getCacheObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , couponIssueId));
+            Object cacheObject = redisCache.getCacheObject(couponNumKey);
             if (cacheObject == null) {
                 LiveCouponIssue liveCoupon = liveCouponIssueService.selectLiveCouponIssueById(couponIssueId);
                 if (liveCoupon != null) {
-                    redisCache.setCacheObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , couponIssueId), liveCoupon.getRemainCount().intValue(), 30, TimeUnit.MINUTES);
+                    redisCache.setCacheObject(couponNumKey, liveCoupon.getRemainCount().intValue(), 30, TimeUnit.MINUTES);
+                    delayedTaskRedisUtil.trackCouponNum(couponIssueId);
                 }
             }
         } else {
-            redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , couponIssueId));
+            redisCache.deleteObject(couponNumKey);
+            delayedTaskRedisUtil.untrackCouponNum(couponIssueId);
         }
         // 管理员消息插队
         enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
@@ -1282,7 +1286,8 @@ public class WebSocketServer {
                 LiveCouponIssue liveCouponIssue = liveCouponIssueService.selectLiveCouponIssueByCouponId(liveCoupon.getCouponId());
                 LiveCouponIssueRelation relation = liveCouponMapper.selectCouponRelation(task.getLiveId(), liveCouponIssue.getId());
                 if (liveCoupon != null) {
-                    redisCache.setCacheObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , liveCouponIssue.getId()), liveCouponIssue.getRemainCount().intValue(), 30, TimeUnit.MINUTES);
+                    redisCache.setCacheObject(LiveKeysConstant.liveCouponNumKey(liveCouponIssue.getId()), liveCouponIssue.getRemainCount().intValue(), 30, TimeUnit.MINUTES);
+                    SpringUtils.getBean(LiveDelayedTaskRedisUtil.class).trackCouponNum(liveCouponIssue.getId());
                 }
                 HashMap<String, Object> data = new HashMap<>();
                 data.put("liveId", task.getLiveId());
@@ -1422,9 +1427,10 @@ public class WebSocketServer {
                     if (log.getLogType() == null || log.getLogType() != 2) {
                         log.setLogType(1);
                         liveWatchLogService.updateLiveWatchLog(log);
-                        String liveUserWatchLogKey = String.format(LIVE_USER_WATCH_LOG_CACHE, liveId, userId,externalContactId,qwUserId);
+                        String liveUserWatchLogKey = LiveKeysConstant.liveUserWatchLogKey(liveId, userId, externalContactId, qwUserId);
                         LocalDateTime now = LocalDateTime.now();
-                        redisCache.setCacheObject(liveUserWatchLogKey,formatter.format(now),5,TimeUnit.MINUTES);
+                        redisCache.setCacheObject(liveUserWatchLogKey, formatter.format(now), 5, TimeUnit.MINUTES);
+                        SpringUtils.getBean(LiveDelayedTaskRedisUtil.class).trackUserWatchLog(liveUserWatchLogKey);
                     }
                 }
             }

+ 9 - 2
fs-service/src/main/java/com/fs/live/service/impl/LiveCouponServiceImpl.java

@@ -10,6 +10,7 @@ import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCache;
 import com.fs.common.utils.DateUtils;
 import com.fs.common.utils.StringUtils;
+import com.fs.common.utils.redis.LiveDelayedTaskRedisUtil;
 import com.fs.live.domain.*;
 import com.fs.live.mapper.LiveCouponIssueMapper;
 import com.fs.live.mapper.LiveCouponIssueUserMapper;
@@ -42,6 +43,8 @@ public class LiveCouponServiceImpl implements ILiveCouponService
     @Autowired
     private RedisCache redisCache;
     @Autowired
+    private LiveDelayedTaskRedisUtil liveDelayedTaskRedisUtil;
+    @Autowired
     private ILiveCouponIssueUserService liveCouponIssueUserService;
     @Autowired
     private ILiveCouponUserService liveCouponUserService;
@@ -316,12 +319,16 @@ public class LiveCouponServiceImpl implements ILiveCouponService
             }
         }
 
-        Long decrement = redisCache.decrement(String.format(LiveKeysConstant.LIVE_COUPON_NUM , coupon.getCouponIssueId()));
+        Long couponIssueId = coupon.getCouponIssueId();
+        String couponNumKey = LiveKeysConstant.liveCouponNumKey(couponIssueId);
+        Long decrement = redisCache.decrement(couponNumKey);
+        liveDelayedTaskRedisUtil.trackCouponNum(couponIssueId);
 
         if (decrement < 0L) {
             issue.setStatus(-1);
             issue.setRemainCount(0L);
-            redisCache.deleteObject(String.valueOf(issue.getId()));
+            redisCache.deleteObject(couponNumKey);
+            liveDelayedTaskRedisUtil.untrackCouponNum(couponIssueId);
             liveCouponIssueService.updateLiveCouponIssue(issue);
             return R.error("此优惠券已领完");
         }

+ 2 - 1
fs-service/src/main/java/com/fs/live/service/impl/LiveServiceImpl.java

@@ -1034,8 +1034,9 @@ public class LiveServiceImpl implements ILiveService
                 tagMarkInfo.put("startTime", exist.getStartTime().atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli());
                 tagMarkInfo.put("videoDuration", videoDuration);
 
-                String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, exist.getLiveId());
+                String tagMarkKey = LiveKeysConstant.liveTagMarkKey(exist.getLiveId());
                 redisCache.setCacheObject(tagMarkKey, JSON.toJSONString(tagMarkInfo), 24, TimeUnit.HOURS);
+                liveDelayedTaskRedisUtil.trackTagMark(exist.getLiveId());
                 log.info("手动开直播间开启,已加入打标签缓存: liveId={}, startTime={}, videoDuration={}",
                         exist.getLiveId(), exist.getStartTime(), videoDuration);
             }