瀏覽代碼

看课reids优化

xw 2 天之前
父節點
當前提交
a39372bbd9

+ 25 - 8
fs-common/src/main/java/com/fs/common/utils/redis/RedisActiveKeyIndexRepairService.java

@@ -71,15 +71,19 @@ public class RedisActiveKeyIndexRepairService {
         }
 
         int removed = 0;
-        Set<Object> members = redisCache.redisTemplate.opsForSet().members(target.indexKey);
-        if (members != null) {
-            for (Object member : members) {
-                String cacheKey = member.toString();
-                if (!Boolean.TRUE.equals(redisCache.redisTemplate.hasKey(cacheKey))) {
-                    redisCache.redisTemplate.opsForSet().remove(target.indexKey, cacheKey);
-                    removed++;
+        if (target.isRemoveExpiredFromIndex()) {
+            Set<Object> members = redisCache.redisTemplate.opsForSet().members(target.indexKey);
+            if (members != null) {
+                for (Object member : members) {
+                    String cacheKey = member.toString();
+                    if (!Boolean.TRUE.equals(redisCache.redisTemplate.hasKey(cacheKey))) {
+                        redisCache.redisTemplate.opsForSet().remove(target.indexKey, cacheKey);
+                        removed++;
+                    }
                 }
             }
+        } else {
+            log.info("索引 {} 跳过过期成员清理,交由 checkFsUserWatchStatus 写库后 untrack", target.name);
         }
         return new RepairResult(target.name, target.scanPattern, target.indexKey, scannedKeys.size(), added, removed);
     }
@@ -104,9 +108,11 @@ public class RedisActiveKeyIndexRepairService {
         targets.add(new IndexRepairTarget("liveUserWatchLog",
                 "live:user:watch:log:*",
                 LiveKeysConstant.LIVE_USER_WATCH_LOG_INDEX));
+        // 心跳索引只补不删:过期 key 由 checkFsUserWatchStatus 写 log_type=4 后再 untrack
         targets.add(new IndexRepairTarget("h5wxHeartbeat",
                 CourseWatchKeysConstant.H5_WX_HEARTBEAT_PREFIX + "*",
-                CourseWatchKeysConstant.H5_WX_HEARTBEAT_INDEX));
+                CourseWatchKeysConstant.H5_WX_HEARTBEAT_INDEX,
+                false));
         targets.add(new IndexRepairTarget("h5wxDuration",
                 CourseWatchKeysConstant.H5_WX_DURATION_PREFIX + "*",
                 CourseWatchKeysConstant.H5_WX_DURATION_INDEX));
@@ -144,11 +150,22 @@ public class RedisActiveKeyIndexRepairService {
         private final String name;
         private final String scanPattern;
         private final String indexKey;
+        /** false 时仅 SCAN 补登记,不清理索引中已 TTL 过期的成员 */
+        private final boolean removeExpiredFromIndex;
 
         public IndexRepairTarget(String name, String scanPattern, String indexKey) {
+            this(name, scanPattern, indexKey, true);
+        }
+
+        public IndexRepairTarget(String name, String scanPattern, String indexKey, boolean removeExpiredFromIndex) {
             this.name = name;
             this.scanPattern = scanPattern;
             this.indexKey = indexKey;
+            this.removeExpiredFromIndex = removeExpiredFromIndex;
+        }
+
+        public boolean isRemoveExpiredFromIndex() {
+            return removeExpiredFromIndex;
         }
     }
 

+ 49 - 26
fs-service/src/main/java/com/fs/course/service/impl/FsCourseWatchLogServiceImpl.java

@@ -543,37 +543,60 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
     @Override
     public void checkFsUserWatchStatus() {
         log.info("WXH5-开始更新会员看课中断记录>>>>>");
-        Set<String> keys = h5WxUserWatchRedisUtil.listHeartbeatKeys();
+        // 必须遍历索引全集(含已 TTL 过期的 key),否则 key 过期后只会从索引移除、不会写 log_type=4
+        Set<String> keys = h5WxUserWatchRedisUtil.listAllHeartbeatIndexMembers();
         LocalDateTime now = LocalDateTime.now();
         List<FsCourseWatchLog> logs = new ArrayList<>();
         for (String key : keys) {
-            FsCourseWatchLog watchLog = new FsCourseWatchLog();
-            String[] parts = key.split(":");
-            Long userId = Long.parseLong(parts[3]);
-            Long videoId = Long.parseLong(parts[4]);
-            Long companyUserId = Long.parseLong(parts[5]);
-            // 获取最后心跳时间
-            String lastHeartbeatStr = redisCache.getCacheObject(key);
-            if (lastHeartbeatStr == null) {
-                h5WxUserWatchRedisUtil.untrackHeartbeat(key);
-                continue; // 如果 Redis 中没有记录,跳过
-            }
-            LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
-            Duration duration = Duration.between(lastHeartbeatTime, now);
+            try {
+                Long userId;
+                Long videoId;
+                Long companyUserId;
+                try {
+                    String[] parts = key.split(":");
+                    userId = Long.parseLong(parts[3]);
+                    videoId = Long.parseLong(parts[4]);
+                    companyUserId = Long.parseLong(parts[5]);
+                } catch (Exception e) {
+                    log.error("key中id为null:{}", key);
+                    h5WxUserWatchRedisUtil.untrackHeartbeat(key);
+                    continue;
+                }
 
-            watchLog.setVideoId(videoId);
-            watchLog.setUserId(userId);
-            watchLog.setCompanyUserId(companyUserId);
-            // 如果超过一分钟没有心跳,标记为“观看中断”
-            if (duration.getSeconds() >= 60) {
-                watchLog.setLogType(4);
-                // 从 Redis 中删除该记录
-                redisCache.deleteObject(key);
-                h5WxUserWatchRedisUtil.untrackHeartbeat(key);
-            } else {
-                watchLog.setLogType(1);
+                FsCourseWatchLog watchLog = new FsCourseWatchLog();
+                watchLog.setVideoId(videoId);
+                watchLog.setUserId(userId);
+                watchLog.setCompanyUserId(companyUserId);
+
+                if (!Boolean.TRUE.equals(redisCache.redisTemplate.hasKey(key))) {
+                    watchLog.setLogType(4);
+                    h5WxUserWatchRedisUtil.untrackHeartbeat(key);
+                    logs.add(watchLog);
+                    continue;
+                }
+
+                String lastHeartbeatStr = redisCache.getCacheObject(key);
+                if (com.fs.common.utils.StringUtils.isEmpty(lastHeartbeatStr)) {
+                    watchLog.setLogType(4);
+                    redisCache.deleteObject(key);
+                    h5WxUserWatchRedisUtil.untrackHeartbeat(key);
+                    logs.add(watchLog);
+                    continue;
+                }
+
+                LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
+                Duration duration = Duration.between(lastHeartbeatTime, now);
+                if (duration.getSeconds() >= 60) {
+                    watchLog.setLogType(4);
+                    redisCache.deleteObject(key);
+                    h5WxUserWatchRedisUtil.untrackHeartbeat(key);
+                } else {
+                    watchLog.setLogType(1);
+                }
+                logs.add(watchLog);
+            } catch (Exception e) {
+                log.error("处理heartbeat key {} 时发生异常: {}", key, e.getMessage());
             }
-            logs.add(watchLog);
         }
         batchUpdateFsUserCourseWatchLog(logs, 100);
     }

+ 19 - 0
fs-service/src/main/java/com/fs/course/utils/H5WxUserWatchRedisUtil.java

@@ -51,6 +51,13 @@ public class H5WxUserWatchRedisUtil {
         return listIndexedKeys(HEARTBEAT_INDEX);
     }
 
+    /**
+     * 返回心跳索引中的全部成员(含 Redis 中已过期的 key),供中断检测写库 log_type=4。
+     */
+    public Set<String> listAllHeartbeatIndexMembers() {
+        return listAllIndexMembers(HEARTBEAT_INDEX);
+    }
+
     public void trackDuration(String cacheKey) {
         track(DURATION_INDEX, cacheKey);
     }
@@ -79,6 +86,18 @@ public class H5WxUserWatchRedisUtil {
         redisCache.redisTemplate.opsForSet().remove(indexKey, cacheKey);
     }
 
+    private Set<String> listAllIndexMembers(String indexKey) {
+        Set<Object> members = redisCache.redisTemplate.opsForSet().members(indexKey);
+        if (members == null || members.isEmpty()) {
+            return Collections.emptySet();
+        }
+        Set<String> result = new HashSet<>(members.size());
+        for (Object member : members) {
+            result.add(member.toString());
+        }
+        return result;
+    }
+
     private Set<String> listIndexedKeys(String indexKey) {
         Set<Object> members = redisCache.redisTemplate.opsForSet().members(indexKey);
         if (members == null || members.isEmpty()) {