Преглед изворни кода

update qw_task定时任务切换租户对应redis

ct пре 1 недеља
родитељ
комит
c45db70c55

+ 0 - 10
fs-qw-task/src/main/java/com/fs/app/task/CourseWatchLogScheduler.java

@@ -1,7 +1,6 @@
 package com.fs.app.task;
 
 import com.fs.app.taskService.SopLogsTaskService;
-import com.fs.common.core.redis.RedisCache;
 import com.fs.course.mapper.FsCourseWatchLogMapper;
 import com.fs.course.mapper.FsUserCourseVideoMapper;
 import com.fs.course.service.IFsCourseLinkService;
@@ -41,15 +40,6 @@ public class CourseWatchLogScheduler {
     @Autowired
     private QwSopLogsMapper qwSopLogsMapper;
 
-    @Autowired
-    RedisCache redisCache;
-
-    @Autowired
-    private FsUserCourseVideoMapper courseVideoMapper;
-
-    @Autowired
-    private ISysConfigService configService;
-
     @Autowired
     private IFsCourseWatchLogService courseWatchLogService;
 

+ 11 - 2
fs-qw-task/src/main/java/com/fs/app/task/TenantTaskRunner.java

@@ -10,10 +10,13 @@ import com.fs.framework.datasource.DynamicDataSourceContextHolder;
 import com.fs.framework.datasource.TenantDataSourceManager;
 import com.fs.system.domain.SysConfig;
 import com.fs.system.mapper.SysConfigMapper;
+import com.fs.framework.config.TenantPrincipal;
 import com.fs.tenant.domain.TenantInfo;
 import com.fs.tenant.service.TenantInfoService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
+import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
@@ -173,8 +176,13 @@ public class TenantTaskRunner {
         try {
             // 切换到租户数据源
             tenantDataSourceManager.switchTenant(tenant);
-            // 切换Redis租户上下文
+            // 切换 Redis 租户上下文(与 user-app 写入 Redis 时 TenantKeyRedisSerializer 行为一致)
             RedisTenantContext.setTenantId(tenant.getId());
+            SecurityContextHolder.getContext().setAuthentication(
+                    new UsernamePasswordAuthenticationToken(
+                            new TenantPrincipal(tenant.getId()),
+                            null,
+                            java.util.Collections.emptyList()));
             log.info("[SaaS Task] 定时任务切换数据源和Redis dataSource={}, tenantId={}, tenantCode={}, task={}",
                     dataSourceKey, tenant.getId(), tenant.getTenantCode(), taskName != null ? taskName : "");
 
@@ -196,8 +204,9 @@ public class TenantTaskRunner {
         } finally {
             ProjectConfig.clearTenantConfigs();
             TenantConfigContext.clear();
-            // 清理Redis租户上下文
+            // 清理 Redis / Security 租户上下文
             RedisTenantContext.clear();
+            SecurityContextHolder.clearContext();
             DynamicDataSourceContextHolder.clearDataSourceType();
         }
     }

+ 40 - 0
fs-qw-task/src/main/java/com/fs/framework/datasource/TenantDataSourceManager.java

@@ -1,7 +1,11 @@
 package com.fs.framework.datasource;
 
 import com.alibaba.druid.pool.DruidDataSource;
+import com.fs.common.enums.DataSourceType;
 import com.fs.tenant.domain.TenantInfo;
+import com.fs.tenant.service.TenantInfoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -16,9 +20,14 @@ import java.util.concurrent.ConcurrentHashMap;
 @Component
 public class TenantDataSourceManager {
 
+    private static final Logger log = LoggerFactory.getLogger(TenantDataSourceManager.class);
+
     @Resource
     private DynamicDataSource dynamicDataSource;
 
+    @Resource
+    private TenantInfoService tenantInfoService;
+
     private static final Map<String, DataSource> TENANT_DS_CACHE = new ConcurrentHashMap<>();
 
     public void switchTenant(TenantInfo tenantInfo) {
@@ -44,6 +53,37 @@ public class TenantDataSourceManager {
         DynamicDataSourceContextHolder.clearDataSourceType();
     }
 
+    /**
+     * 根据租户 ID 确保数据源已注册并切换(线程池子线程等 ThreadLocal 未传递场景)。
+     */
+    public void ensureSwitchByTenantId(Long tenantId) {
+        if (tenantId == null) {
+            return;
+        }
+        String tenantKey = buildTenantKey(tenantId);
+        if (TENANT_DS_CACHE.containsKey(tenantKey)) {
+            DynamicDataSourceContextHolder.setDataSourceType(tenantKey);
+            return;
+        }
+        DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
+        try {
+            TenantInfo tenantInfo = tenantInfoService.getById(tenantId);
+            if (tenantInfo == null) {
+                log.warn("[TenantDS] 租户ID={} 不存在,回退主库", tenantId);
+                return;
+            }
+            if (!Integer.valueOf(1).equals(tenantInfo.getStatus())) {
+                log.warn("[TenantDS] 租户ID={} 已禁用,回退主库", tenantId);
+                return;
+            }
+            switchTenant(tenantInfo);
+            log.info("[TenantDS] 动态注册并切换数据源: key={}", tenantKey);
+        } catch (Exception e) {
+            log.error("[TenantDS] 切换租户数据源失败 tenantId={}", tenantId, e);
+            DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
+        }
+    }
+
     private DataSource createTenantDataSource(TenantInfo tenant) {
         DruidDataSource ds = new DruidDataSource();
         ds.setUrl(tenant.getDbUrl());

+ 87 - 74
fs-service/src/main/java/com/fs/course/config/RedisKeyScanner.java

@@ -1,10 +1,15 @@
 package com.fs.course.config;
 
-import org.springframework.data.redis.connection.RedisConnection;
+import cn.hutool.core.util.ObjectUtil;
+import com.fs.common.config.RedisTenantContext;
+import com.fs.common.config.TenantKeyRedisSerializer;
+import com.fs.common.core.redis.RedisCache;
+import com.fs.common.utils.SecurityUtils;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.core.Cursor;
 import org.springframework.data.redis.core.RedisCallback;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.core.ScanOptions;
-import org.springframework.data.redis.core.StringRedisTemplate;
 import org.springframework.data.redis.serializer.RedisSerializer;
 import org.springframework.stereotype.Component;
 
@@ -14,107 +19,115 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+/**
+ * Redis Key 扫描工具,与 {@link RedisCache#keys(String)} 使用同一套 Key 序列化({@link TenantKeyRedisSerializer})。
+ * <p>
+ * 业务传入无前缀 pattern,如 {@code h5user:watch:duration:*}。
+ * SaaS 定时任务内需已设置 {@link RedisTenantContext} 或 SecurityContext 租户身份。
+ */
+@Slf4j
 @Component
 public class RedisKeyScanner {
 
-    private final StringRedisTemplate redisTemplate;
+    private static final int SCAN_COUNT = 1000;
 
-    public RedisKeyScanner(StringRedisTemplate redisTemplate) {
+    private final RedisTemplate<Object, Object> redisTemplate;
+    private final RedisCache redisCache;
+
+    public RedisKeyScanner(RedisTemplate<Object, Object> redisTemplate, RedisCache redisCache) {
         this.redisTemplate = redisTemplate;
+        this.redisCache = redisCache;
     }
 
-    public Set<String> scan(String pattern) {
-        Set<String> keys = new HashSet<>();
-
-        // 这里指定 match + count
-        ScanOptions options = ScanOptions.scanOptions()
-                .match(pattern)
-                .count(1000)
-                .build();
-
-        // 使用 RedisConnection 提供的 scan
-        redisTemplate.execute((RedisConnection connection) -> {
-            try (Cursor<byte[]> cursor = connection.scan(options)) {
-                RedisSerializer<String> keySerializer = redisTemplate.getStringSerializer();
-                cursor.forEachRemaining(item -> keys.add(keySerializer.deserialize(item)));
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            return null;
-        });
-
-        return keys;
+    @SuppressWarnings("unchecked")
+    private RedisSerializer<String> keySerializer() {
+        return (RedisSerializer<String>) redisTemplate.getKeySerializer();
     }
 
-    public Set<String> scanKeys(String pattern) {
-        Set<String> keys = new HashSet<>();
-        ScanOptions options = ScanOptions.scanOptions().match(pattern).count(1000).build();
-
-        redisTemplate.execute((RedisConnection connection) -> {
-            try (Cursor<byte[]> cursor = connection.scan(options)) {
-                while (cursor.hasNext()) {
-                    byte[] rawKey = cursor.next();
-                    String key = (String) redisTemplate.getKeySerializer().deserialize(rawKey);
-                    keys.add(key);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("Error during scan", e);
+    private Long resolveTenantId() {
+        Long tenantId = RedisTenantContext.getTenantId();
+        if (ObjectUtil.isEmpty(tenantId)) {
+            try {
+                tenantId = SecurityUtils.getTenantId();
+            } catch (Exception ignored) {
+                tenantId = null;
             }
-            return null;
-        });
+        }
+        return tenantId;
+    }
 
-        return keys;
+    private String toRedisMatchPattern(String businessPattern) {
+        byte[] matchBytes = keySerializer().serialize(businessPattern);
+        if (matchBytes == null) {
+            return businessPattern;
+        }
+        return new String(matchBytes, StandardCharsets.UTF_8);
     }
 
     /**
-     * 使用 SCAN 替代 KEYS,避免阻塞 Redis
-     *
-     * @param pattern key 匹配模式,例如 "h5user:watch:duration:*"
-     * @return 匹配到的 key 集合
+     * 按业务 pattern 扫描,返回剥离租户前缀后的 key。
+     * 优先 KEYS(与 RedisCache 完全一致);若需非阻塞可走 SCAN。
      */
-    public Collection<String> scanPatternKeys(final String pattern) {
-        Set<String> keys = new HashSet<>();
+    private Set<String> scanInternal(String businessPattern) {
+        Long tenantId = resolveTenantId();
+        String redisMatchPattern = toRedisMatchPattern(businessPattern);
+
+        // 与同类任务 redisCache.keys("h5wxuser:watch:duration:*") 保持同一路径,确保租户前缀正确
+        Collection<String> keysFromKeys = redisCache.keys(businessPattern);
+        if (keysFromKeys != null && !keysFromKeys.isEmpty()) {
+            log.debug("Redis keys 命中 {} 条, tenantId={}, pattern={}, redisPattern={}",
+                    keysFromKeys.size(), tenantId, businessPattern, redisMatchPattern);
+            return new HashSet<>(keysFromKeys);
+        }
+
+        Set<String> keys = scanByScanCommand(businessPattern, redisMatchPattern);
+        if (keys.isEmpty()) {
+            log.warn("Redis 未扫描到 key, tenantId={}, pattern={}, redisPattern={}。"
+                            + "请确认:1) SaaS 任务已切换对应租户;2) Redis 中存在该租户前缀的 key",
+                    tenantId, businessPattern, redisMatchPattern);
+        } else {
+            log.debug("Redis SCAN 命中 {} 条, tenantId={}, pattern={}", keys.size(), tenantId, businessPattern);
+        }
+        return keys;
+    }
 
-        // 每次扫描的数量,可根据实际情况调整(越大速度越快,但对 CPU 影响越大)
+    private Set<String> scanByScanCommand(String businessPattern, String redisMatchPattern) {
+        RedisSerializer<String> keySerializer = keySerializer();
         ScanOptions options = ScanOptions.scanOptions()
-                .match(pattern)
-                .count(1000)
+                .match(redisMatchPattern)
+                .count(SCAN_COUNT)
                 .build();
 
-        // 使用 redisTemplate 的 execute 保证连接获取 & 释放正确
-        redisTemplate.execute((RedisConnection connection) -> {
+        Set<String> keys = new HashSet<>();
+        redisTemplate.execute((RedisCallback<Void>) connection -> {
             try (Cursor<byte[]> cursor = connection.scan(options)) {
-                RedisSerializer<String> keySerializer = redisTemplate.getStringSerializer();
                 while (cursor.hasNext()) {
-                    keys.add(keySerializer.deserialize(cursor.next()));
+                    String businessKey = keySerializer.deserialize(cursor.next());
+                    if (businessKey != null) {
+                        keys.add(businessKey);
+                    }
                 }
             } catch (IOException e) {
-                throw new RuntimeException("Error during redis SCAN", e);
+                throw new RuntimeException("Redis SCAN failed, pattern=" + businessPattern, e);
             }
             return null;
         });
-
         return keys;
     }
 
-    public Set<String> scanMatchKey(String pattern) {
-        return (Set<String>) redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
-            Set<String> keys = new HashSet<>();
-            // 使用 try-with-resources 确保 Cursor 无论是否异常都会被关闭
-            try (Cursor<byte[]> cursor = connection.scan(ScanOptions.scanOptions()
-                    .match(pattern)
-                    .count(1000) // 每次扫描的批量大小,可根据实际情况调整
-                    .build())) {
-                while (cursor.hasNext()) {
-                    keys.add(new String(cursor.next(), StandardCharsets.UTF_8)); // 显式指定字符集,避免依赖默认值
-                }
-            } catch (IOException e) {
-                // 在实际项目中,应该使用更合适的日志记录和异常处理方式
-                // 例如抛出自定义异常或记录错误日志
-                throw new RuntimeException("Error during SCAN operation", e);
-            }
-            return keys;
-        });
+    public Set<String> scan(String pattern) {
+        return scanInternal(pattern);
     }
 
+    public Set<String> scanKeys(String pattern) {
+        return scanInternal(pattern);
+    }
+
+    public Collection<String> scanPatternKeys(String pattern) {
+        return scanInternal(pattern);
+    }
+
+    public Set<String> scanMatchKey(String pattern) {
+        return scanInternal(pattern);
+    }
 }

+ 250 - 344
fs-service/src/main/java/com/fs/course/service/impl/FsCourseWatchLogServiceImpl.java

@@ -9,8 +9,11 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fs.common.config.RedisTenantContext;
 import com.fs.common.core.redis.RedisCache;
 import com.fs.common.utils.DateUtils;
+import com.fs.common.utils.SecurityUtils;
+import com.fs.common.utils.spring.SpringUtils;
 import com.fs.common.utils.DictUtils;
 import com.fs.common.utils.date.DateUtil;
 import com.fs.company.cache.ICompanyCacheService;
@@ -72,6 +75,7 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
+import java.lang.reflect.Method;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -88,6 +92,29 @@ import java.util.stream.Collectors;
 @Service
 public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMapper, FsCourseWatchLog> implements IFsCourseWatchLogService {
     private static final Logger log = LoggerFactory.getLogger(FsCourseWatchLogServiceImpl.class);
+    /** 与 {@link com.fs.common.config.TenantKeyRedisSerializer} 一致 */
+    private static final String TENANT_KEY_PREFIX = "tenantid:";
+    private static final String TENANT_DS_PREFIX = "tenant:";
+
+    /** 看课 Redis key 族:h5user=企微短链,h5wxuser=会员 WXH5 */
+    private enum WatchKeyFamily {
+        H5_USER("h5user"),
+        H5_WX_USER("h5wxuser");
+
+        private final String keyPrefix;
+
+        WatchKeyFamily(String keyPrefix) {
+            this.keyPrefix = keyPrefix;
+        }
+
+        boolean isWxUser() {
+            return this == H5_WX_USER;
+        }
+
+        String heartbeatKey(long id1, long id2, long id3) {
+            return keyPrefix + ":watch:heartbeat:" + id1 + ":" + id2 + ":" + id3;
+        }
+    }
     @Autowired
     private FsCourseWatchLogMapper fsCourseWatchLogMapper;
     @Autowired
@@ -376,117 +403,42 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
 
     @Override
     public void scheduleUpdateDurationToDatabase() {
-        log.info("WXH5-开始更新会员看课时长,检查完课>>>>>>");
-        //读取所有的key
-        Collection<String> keys = redisCache.keys("h5wxuser:watch:duration:*");
-
-        //读取看课配置
-        String json = configService.selectConfigByKey("course.config");
-        CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
-
-        List<FsCourseWatchLog> logs = new ArrayList<>();
-        for (String key : keys) {
-            //取key中数据
-            String[] parts = key.split(":");
-            Long userId = Long.parseLong(parts[3]);
-            Long videoId = Long.parseLong(parts[4]);
-            Long companyUserId = Long.parseLong(parts[5]);
-            String durationStr = redisCache.getCacheObject(key);
-            if (durationStr == null) {
-                log.error("key中数据为null:{}", key);
-                continue;  // 如果 Redis 中没有记录,跳过
-            }
-            Long duration = Long.valueOf(durationStr);
-
-            FsCourseWatchLog watchLog = new FsCourseWatchLog();
-            watchLog.setVideoId(videoId);
-            watchLog.setUserId(userId);
-            watchLog.setCompanyUserId(companyUserId);
-            watchLog.setDuration(duration);
-
-            //取对应视频的时长
-            Long videoDuration = 0L;
-            try {
-                videoDuration = getFsUserVideoDuration(videoId);
-            } catch (Exception e) {
-                log.error("视频时长识别错误:{}", key);
-                continue;
-            }
-            if (videoDuration != null && videoDuration != 0) {
-                //判断是否完课
-                long percentage = (duration * 100 / videoDuration);
-                if (percentage >= config.getAnswerRate()) {
-                    watchLog.setLogType(2); // 设置状态为“已完成”checkFsUserWatchStatus
-                    watchLog.setFinishTime(new Date());
-                    String heartbeatKey = "h5wxuser:watch:heartbeat:" + userId + ":" + videoId + ":" + companyUserId;
-                    // 完课删除心跳记录
-                    redisCache.deleteObject(heartbeatKey);
-                    // 完课删除看课时长记录
-                    redisCache.deleteObject(key);
-                }
-            }
-            //集合中增加
-            logs.add(watchLog);
-        }
-        batchUpdateFsUserCourseWatchLog(logs, 100);
+        executeWatchLogRedisTask("WXH5-开始更新会员看课时长,检查完课",
+                "h5wxuser:watch:duration:*", 1, WatchKeyFamily.H5_WX_USER);
     }
 
     public Long getFsUserVideoDuration(Long videoId) {
-        //将视频时长也存到redis
+        return getFsUserVideoDuration(captureRedisTenantId(), videoId);
+    }
+
+    public Long getFsUserVideoDuration(Long tenantId, Long videoId) {
         String videoRedisKey = "h5wxuser:video:duration:" + videoId;
-        Long videoDuration = 0L;
+        Long videoDuration = null;
         try {
-            videoDuration = redisCache.getCacheObject(videoRedisKey);
+            Object cached = getCacheObjectWithTenant(tenantId, videoRedisKey);
+            if (cached != null) {
+                videoDuration = Long.valueOf(String.valueOf(cached));
+            }
         } catch (Exception e) {
-            String string = redisCache.getCacheObject(videoRedisKey);
-            videoDuration = Long.parseLong(string);
-            log.error("key中id为S:{}", videoDuration);
+            log.warn("读取视频时长缓存异常 videoId={}, tenantId={}", videoId, tenantId, e);
         }
 
-
         if (videoDuration == null) {
             FsUserCourseVideo video = courseVideoMapper.selectFsUserCourseVideoByVideoId(videoId);
+            if (video == null) {
+                log.warn("租户库未找到视频 videoId={}, tenantId={}", videoId, tenantId);
+                return null;
+            }
             videoDuration = video.getDuration();
-            redisCache.setCacheObject(videoRedisKey, video.getDuration());
+            setCacheObjectWithTenant(tenantId, videoRedisKey, video.getDuration());
         }
         return videoDuration;
     }
 
     @Override
     public void checkFsUserWatchStatus() {
-        log.info("WXH5-开始更新会员看课中断记录>>>>>");
-        // 从 Redis 中获取所有正在看课的用户记录
-        Collection<String> keys = redisCache.keys("h5wxuser:watch:heartbeat:*");
-        LocalDateTime now = LocalDateTime.now();
-        List<FsCourseWatchLog> logs = new ArrayList<>();
-        for (String key : keys) {
-            FsCourseWatchLog watchLog = new FsCourseWatchLog();
-            String[] parts = key.split(":");
-            Long userId = Long.parseLong(parts[3]);
-            Long videoId = Long.parseLong(parts[4]);
-            Long companyUserId = Long.parseLong(parts[5]);
-            // 获取最后心跳时间
-            String lastHeartbeatStr = redisCache.getCacheObject(key);
-            if (lastHeartbeatStr == null) {
-                continue; // 如果 Redis 中没有记录,跳过
-            }
-            LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
-            Duration duration = Duration.between(lastHeartbeatTime, now);
-
-            watchLog.setVideoId(videoId);
-            watchLog.setUserId(userId);
-            watchLog.setCompanyUserId(companyUserId);
-            // 如果超过一分钟没有心跳,标记为“观看中断”
-            if (duration.getSeconds() >= 60) {
-                watchLog.setLogType(4);
-                // 从 Redis 中删除该记录
-                redisCache.deleteObject(key);
-            } else {
-                watchLog.setLogType(1);
-            }
-            logs.add(watchLog);
-        }
-        batchUpdateFsUserCourseWatchLog(logs, 100);
+        executeWatchLogRedisTask("WXH5-开始更新会员看课中断记录",
+                "h5wxuser:watch:heartbeat:*", 2, WatchKeyFamily.H5_WX_USER);
     }
 
     @Override
@@ -818,323 +770,261 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
 
     @Override
     public void scheduleBatchUpdateToDatabase() {
-        try {
-            log.info("开始更新看课时长,检查完课>>>>>>");
+        executeWatchLogRedisTask("开始更新看课时长,检查完课",
+                "h5user:watch:duration:*", 1, WatchKeyFamily.H5_USER);
+    }
 
-            // 读取所有的key
-            Set<String> keys = redisKeyScanner.scanMatchKey("h5user:watch:duration:*");
-            log.info("共扫描到 {} 个待处理键", keys.size());
+    @Override
+    public void checkWatchStatus() {
+        executeWatchLogRedisTask("开始更新看课中断记录",
+                "h5user:watch:heartbeat:*", 2, WatchKeyFamily.H5_USER);
+    }
 
-            // 如果keys为空,直接返回
-            if (CollectionUtils.isEmpty(keys)) {
-                return;
+    /**
+     * 捕获当前线程 Redis 租户 ID(定时任务 / 请求线程在入池前调用)。
+     */
+    private Long captureRedisTenantId() {
+        Long tenantId = RedisTenantContext.getTenantId();
+        if (ObjectUtil.isEmpty(tenantId)) {
+            try {
+                tenantId = SecurityUtils.getTenantId();
+            } catch (Exception ignored) {
+                tenantId = null;
             }
+        }
+        return tenantId;
+    }
 
-            // 读取看课配置
-            String json = configService.selectConfigByKey("course.config");
-            CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
-
-            // 创建线程安全的集合
-            List<FsCourseWatchLog> logs = Collections.synchronizedList(new ArrayList<>());
-
-            // 创建线程池(根据服务器配置调整线程数)
-            int threadCount = Math.min(Runtime.getRuntime().availableProcessors() * 2, 8);
-            ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
-
-            // 将keys分成多个批次,每个线程处理一批
-            List<List<String>> keyBatches = partitionKeys(new ArrayList<>(keys), threadCount * 10);
-
-            log.info("开始多线程处理,共 {} 个批次", keyBatches.size());
-
-            int type = 1; //检查完课任务
-
-            // 创建所有任务
-            List<CompletableFuture<Void>> futures = new ArrayList<>();
-            for (List<String> batchKeys : keyBatches) {
-                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
-                    processKeyBatch(batchKeys, config, logs,type);
-                }, executorService);
-                futures.add(future);
-            }
+    /**
+     * 拼接带租户前缀的 Redis 完整 key(与写入时 TenantKeyRedisSerializer 结果一致)。
+     * 业务 key 如 {@code h5user:watch:duration:190:13059225:359} →
+     * {@code tenantid:35:h5user:watch:duration:190:13059225:359}。
+     */
+    private String buildTenantRedisKey(Long tenantId, String businessKey) {
+        if (businessKey == null) {
+            return null;
+        }
+        if (businessKey.startsWith(TENANT_KEY_PREFIX)) {
+            return businessKey;
+        }
+        if (tenantId == null) {
+            return businessKey;
+        }
+        return TENANT_KEY_PREFIX + tenantId + ":" + businessKey;
+    }
 
-            // 等待所有任务完成
-            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+    @SuppressWarnings("unchecked")
+    private <T> T getCacheObjectWithTenant(Long tenantId, String businessKey) {
+        return redisCache.getCacheObject(buildTenantRedisKey(tenantId, businessKey));
+    }
 
-            // 关闭线程池
-            executorService.shutdown();
+    private void deleteObjectWithTenant(Long tenantId, String businessKey) {
+        redisCache.deleteObject(buildTenantRedisKey(tenantId, businessKey));
+    }
 
-            // 批量更新到数据库
-            batchUpdateFsCourseWatchLog(logs, 100);
+    private void setCacheObjectWithTenant(Long tenantId, String businessKey, Object value) {
+        redisCache.setCacheObject(buildTenantRedisKey(tenantId, businessKey), value);
+    }
 
-        } catch (Exception e) {
-            log.error("定时任务执行失败 scheduleBatchUpdateToDatabase", e);
-        }
-//        log.info("开始更新看课时长,检查完课>>>>>>");
-//        //读取所有的key
-//        Collection<String> keys = redisCache.keys("h5user:watch:duration:*");
-//        //读取看课配置
-//        String json = configService.selectConfigByKey("course.config");
-//        CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
-//
-//        List<FsCourseWatchLog> logs = new ArrayList<>();
-//        List<FsCourseWatchLog> finishedLogs = new ArrayList<>();
-//        for (String key : keys) {
-//            //取key中数据
-//            Long qwUserId = null;
-//            Long videoId = null;
-//            Long externalId = null;
-//            try {
-//                String[] parts = key.split(":");
-//                qwUserId = Long.parseLong(parts[3]);
-//                externalId = Long.parseLong(parts[4]);
-//                videoId = Long.parseLong(parts[5]);
-//            } catch (Exception e) {
-//                log.error("key中id为null:{}", key);
-//                continue;
-//            }
-//            String durationStr = redisCache.getCacheObject(key);
-//            if (com.fs.common.utils.StringUtils.isEmpty(durationStr)) {
-//                redisCache.deleteObject(key);
-//                log.error("key中数据为null:{}", key);
-//                continue;  // 如果 Redis 中没有记录,跳过
-//            }
-//            Long duration = Long.valueOf(durationStr);
-//
-//            FsCourseWatchLog watchLog = new FsCourseWatchLog();
-//            watchLog.setVideoId(videoId);
-//            watchLog.setQwUserId(qwUserId);
-//            watchLog.setQwExternalContactId(externalId);
-//            watchLog.setDuration(duration);
-//
-//            //取对应视频的时长
-//            Long videoDuration = 0L;
-//            try {
-//                videoDuration = getVideoDuration(videoId);
-//            } catch (Exception e) {
-//                log.error("视频时长识别错误:{}", key);
-//                continue;
-//            }
-//
-//            if (videoDuration != null && videoDuration != 0) {
-//                boolean complete = false;
-//                // 判断百分比
-//                if (config.getCompletionMode() == 1 && config.getAnswerRate() != null) {
-//                    long percentage = (duration * 100 / videoDuration);
-//                    complete = percentage >= config.getAnswerRate();
-//                }
-//                // 判断分钟数
-//                if (config.getCompletionMode() == 2 && config.getMinutesNum() != null) {
-//                    int i = config.getMinutesNum() * 60;
-//                    complete = videoDuration > i;
-//                }
-//                //判断是否完课
-//                if (complete) {
-//                    watchLog.setLogType(2); // 设置状态为“已完成”
-//                    watchLog.setFinishTime(new Date());
-//                    String heartbeatKey = "h5user:watch:heartbeat:" + qwUserId + ":" + externalId + ":" + videoId;
-//                    // 完课删除心跳记录
-//                    redisCache.deleteObject(heartbeatKey);
-//                    // 完课删除看课时长记录
-//                    redisCache.deleteObject(key);
-//
-//                    finishedLogs.add(watchLog);
-//                }
-//            }
-//            //集合中增加
-//            logs.add(watchLog);
-//        }
-//
-//        batchUpdateFsCourseWatchLog(logs, 100);
-//
-//        // 完课打标签
-//        if (CollectionUtils.isNotEmpty(finishedLogs)) {
-//            fsTagUpdateService.onCourseWatchFinishedBatch(finishedLogs);
-//        }
+    private String getCacheObjectAsString(Long tenantId, String businessKey) {
+        Object val = getCacheObjectWithTenant(tenantId, businessKey);
+        return val == null ? null : String.valueOf(val);
     }
 
-    @Override
-    public void checkWatchStatus() {
+    /**
+     * CourseWatchLogScheduler 下所有看课 Redis 定时任务的统一入口(租户扫描 + 异步批处理 + 落库)。
+     *
+     * @param taskLogPrefix 日志前缀
+     * @param keyPattern    业务 key 模式,如 h5user:watch:duration:*
+     * @param processType   1=检查完课,2=检查看课中断
+     * @param family        h5user / h5wxuser
+     */
+    private void executeWatchLogRedisTask(String taskLogPrefix, String keyPattern, int processType,
+                                          WatchKeyFamily family) {
         try {
-            log.info("开始更新看课中断记录>>>>>");
-
-            // 读取所有的key
-            Set<String> keys = redisKeyScanner.scanMatchKey("h5user:watch:heartbeat:*");
-            log.info("共扫描到 {} 个待处理键", keys.size());
-
-            // 如果keys为空,直接返回
+            log.info("{}>>>>>>", taskLogPrefix);
+            Set<String> keys = redisKeyScanner.scanMatchKey(keyPattern);
+            log.info("共扫描到 {} 个待处理键, pattern={}", keys.size(), keyPattern);
             if (CollectionUtils.isEmpty(keys)) {
                 return;
             }
 
-            // 读取看课配置
             String json = configService.selectConfigByKey("course.config");
+            if (com.fs.common.utils.StringUtils.isEmpty(json)) {
+                log.error("{} course.config 未配置或为空", taskLogPrefix);
+                return;
+            }
             CourseConfig config = JSONUtil.toBean(json, CourseConfig.class);
+            if (config == null) {
+                log.error("{} course.config 解析失败", taskLogPrefix);
+                return;
+            }
 
-            // 创建线程安全的集合
             List<FsCourseWatchLog> logs = Collections.synchronizedList(new ArrayList<>());
-
-            // 创建线程池(根据服务器配置调整线程数)
             int threadCount = Math.min(Runtime.getRuntime().availableProcessors() * 2, 8);
             ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
-
-            // 将keys分成多个批次,每个线程处理一批
             List<List<String>> keyBatches = partitionKeys(new ArrayList<>(keys), threadCount * 10);
-
             log.info("开始多线程处理,共 {} 个批次", keyBatches.size());
 
-            int type = 2; //检查看课中断
-
-            // 创建所有任务
+            final Long capturedTenantId = captureRedisTenantId();
             List<CompletableFuture<Void>> futures = new ArrayList<>();
             for (List<String> batchKeys : keyBatches) {
-                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
-                    processKeyBatch(batchKeys, config, logs, type);
-                }, executorService);
-                futures.add(future);
+                futures.add(CompletableFuture.runAsync(() ->
+                        processKeyBatch(capturedTenantId, batchKeys, config, logs, processType, family),
+                        executorService));
             }
-
-            // 等待所有任务完成
             CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
-
-            // 关闭线程池
             executorService.shutdown();
 
-            // 批量更新到数据库
-            batchUpdateFsCourseWatchLog(logs, 100);
+            if (family.isWxUser()) {
+                batchUpdateFsUserCourseWatchLog(logs, 100);
+            } else {
+                batchUpdateFsCourseWatchLog(logs, 100);
+            }
+        } catch (Exception e) {
+            log.error("{} 执行失败", taskLogPrefix, e);
+        }
+    }
 
+    /**
+     * 子线程切换租户库(ThreadLocal 不会随 CompletableFuture 传递)。
+     */
+    private boolean ensureTenantDataSource(Long tenantId) {
+        if (tenantId == null) {
+            return false;
+        }
+        try {
+            Object manager = SpringUtils.getBean("tenantDataSourceManager");
+            Method method = manager.getClass().getMethod("ensureSwitchByTenantId", Long.class);
+            method.invoke(manager, tenantId);
+            return true;
+        } catch (NoSuchMethodException e) {
+            try {
+                Class<?> holderClass = Class.forName("com.fs.framework.datasource.DynamicDataSourceContextHolder");
+                holderClass.getMethod("setDataSourceType", String.class)
+                        .invoke(null, TENANT_DS_PREFIX + tenantId);
+                return true;
+            } catch (Exception ex) {
+                log.warn("切换租户数据源失败 tenantId={}", tenantId, ex);
+                return false;
+            }
         } catch (Exception e) {
-            log.error("定时任务执行失败checkWatchStatus", e);
+            log.warn("切换租户数据源失败 tenantId={}", tenantId, e);
+            return false;
+        }
+    }
+
+    private void clearTenantDataSource() {
+        try {
+            Object manager = SpringUtils.getBean("tenantDataSourceManager");
+            manager.getClass().getMethod("clear").invoke(manager);
+        } catch (Exception e) {
+            try {
+                Class<?> holderClass = Class.forName("com.fs.framework.datasource.DynamicDataSourceContextHolder");
+                holderClass.getMethod("clearDataSourceType").invoke(null);
+            } catch (Exception ignored) {
+            }
         }
-//        log.info("开始更新看课中断记录>>>>>");
-//        // 从 Redis 中获取所有正在看课的用户记录
-//        Collection<String> keys = redisCache.keys("h5user:watch:heartbeat:*");
-//        LocalDateTime now = LocalDateTime.now();
-//        List<FsCourseWatchLog> logs = new ArrayList<>();
-//
-//        List<FsCourseWatchLog> watchingLogs = new ArrayList<>();
-//        for (String key : keys) {
-//            FsCourseWatchLog watchLog = new FsCourseWatchLog();
-//            //取key中数据
-//            Long qwUserId = null;
-//            Long videoId = null;
-//            Long externalId = null;
-//            try {
-//                String[] parts = key.split(":");
-//                qwUserId = Long.parseLong(parts[3]);
-//                externalId = Long.parseLong(parts[4]);
-//                videoId = Long.parseLong(parts[5]);
-//            } catch (Exception e) {
-//                log.error("key中id为null:{}", key);
-//                continue;
-//            }
-//            // 获取最后心跳时间
-//            String lastHeartbeatStr = redisCache.getCacheObject(key);
-//            if (com.fs.common.utils.StringUtils.isEmpty(lastHeartbeatStr)) {
-//                redisCache.deleteObject(key);
-//                continue; // 如果 Redis 中没有记录,跳过
-//            }
-//            LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
-//            Duration duration = Duration.between(lastHeartbeatTime, now);
-//
-//            watchLog.setVideoId(videoId);
-//            watchLog.setQwUserId(qwUserId);
-//            watchLog.setQwExternalContactId(externalId);
-//            // 如果超过一分钟没有心跳,标记为“观看中断”
-//            if (duration.getSeconds() >= 60) {
-//                watchLog.setLogType(4);
-//                // 从 Redis 中删除该记录
-//                redisCache.deleteObject(key);
-//            } else {
-//                watchLog.setLogType(1);
-//                watchingLogs.add(watchLog);
-//            }
-//            logs.add(watchLog);
-//        }
-//        batchUpdateFsCourseWatchLog(logs, 100);
-//
-//        if (CollectionUtils.isNotEmpty(watchingLogs)) {
-//            fsTagUpdateService.onCourseWatchingBatch(watchingLogs);
-//        }
     }
 
     /**
      * 处理一个key批次
+     *
+     * @param tenantId 当前租户 ID,子线程中必须显式传入(ThreadLocal 不会自动传递)
      */
-    private void processKeyBatch(List<String> batchKeys, CourseConfig config, List<FsCourseWatchLog> logs,int type) {
+    private void processKeyBatch(Long tenantId, List<String> batchKeys, CourseConfig config,
+                                 List<FsCourseWatchLog> logs, int type, WatchKeyFamily family) {
+        if (tenantId == null) {
+            log.warn("processKeyBatch 缺少 tenantId,Redis/DB 可能无法访问租户数据");
+        }
+        boolean dsSwitched = ensureTenantDataSource(tenantId);
+        try {
+            processKeyBatchInternal(tenantId, batchKeys, config, logs, type, family);
+        } finally {
+            if (dsSwitched) {
+                clearTenantDataSource();
+            }
+        }
+    }
+
+    private void processKeyBatchInternal(Long tenantId, List<String> batchKeys, CourseConfig config,
+                                           List<FsCourseWatchLog> logs, int type, WatchKeyFamily family) {
         LocalDateTime now = LocalDateTime.now();
         for (String key : batchKeys) {
             try {
-                // 取key中数据
-                Long qwUserId = null;
-                Long videoId = null;
-                Long externalId = null;
+                // 取key中数据:parts[3]:parts[4]:parts[5]
+                long id1;
+                long id2;
+                long id3;
                 try {
                     String[] parts = key.split(":");
-                    qwUserId = Long.parseLong(parts[3]);
-                    externalId = Long.parseLong(parts[4]);
-                    videoId = Long.parseLong(parts[5]);
+                    id1 = Long.parseLong(parts[3]);
+                    id2 = Long.parseLong(parts[4]);
+                    id3 = Long.parseLong(parts[5]);
                 } catch (Exception e) {
                     log.error("key中id为null:{}", key);
                     continue;
                 }
                 FsCourseWatchLog watchLog = new FsCourseWatchLog();
                 //检查完课
-                if (type==1){
-                    String durationStr = redisCache.getCacheObject(key);
+                if (type == 1) {
+                    String durationStr = getCacheObjectAsString(tenantId, key);
                     if (com.fs.common.utils.StringUtils.isEmpty(durationStr)) {
-                        redisCache.deleteObject(key);
-                        log.error("key中数据为null:{}", key);
-                        continue;  // 如果 Redis 中没有记录,跳过
+                        deleteObjectWithTenant(tenantId, key);
+                        log.error("key中数据为null, tenantId={}, businessKey={}, redisKey={}",
+                                tenantId, key, buildTenantRedisKey(tenantId, key));
+                        continue;
                     }
                     Long duration = Long.valueOf(durationStr);
-
                     watchLog.setDuration(duration);
 
-                    // 取对应视频的时长
                     Long videoDuration;
                     try {
-                        videoDuration = getVideoDuration(videoId);
+                        if (family.isWxUser()) {
+                            videoDuration = getFsUserVideoDuration(tenantId, id2);
+                        } else {
+                            videoDuration = getVideoDuration(tenantId, id3);
+                        }
                     } catch (Exception e) {
-                        log.error("视频时长识别错误:{}", key);
+                        log.error("视频时长识别错误:{}", key, e);
                         continue;
                     }
 
-
-                    if (videoDuration != null && videoDuration != 0) {
-                        // 判断是否完课
+                    if (videoDuration != null && videoDuration != 0
+                            && config.getAnswerRate() != null) {
                         long percentage = (duration * 100 / videoDuration);
                         if (percentage >= config.getAnswerRate()) {
-                            watchLog.setLogType(2); // 设置状态为"已完成"
+                            watchLog.setLogType(2);
                             watchLog.setFinishTime(new Date());
-                            String heartbeatKey = "h5user:watch:heartbeat:" + qwUserId + ":" + externalId + ":" + videoId;
-                            // 完课删除心跳记录
-                            redisCache.deleteObject(heartbeatKey);
-                            // 完课删除看课时长记录
-                            redisCache.deleteObject(key);
+                            String heartbeatKey = family.heartbeatKey(id1, id2, id3);
+                            deleteObjectWithTenant(tenantId, heartbeatKey);
+                            deleteObjectWithTenant(tenantId, key);
                         }
                     }
-                }else{
-                    //检查看课中断
-                    // 获取最后心跳时间
-                    String lastHeartbeatStr = redisCache.getCacheObject(key);
+                } else {
+                    String lastHeartbeatStr = getCacheObjectAsString(tenantId, key);
                     if (com.fs.common.utils.StringUtils.isEmpty(lastHeartbeatStr)) {
-                        redisCache.deleteObject(key);
-                        continue; // 如果 Redis 中没有记录,跳过
+                        deleteObjectWithTenant(tenantId, key);
+                        continue;
                     }
                     LocalDateTime lastHeartbeatTime = LocalDateTime.parse(lastHeartbeatStr);
                     Duration duration = Duration.between(lastHeartbeatTime, now);
-                    // 如果超过一分钟没有心跳,标记为“观看中断”
                     if (duration.getSeconds() >= 60) {
                         watchLog.setLogType(4);
-                        // 从 Redis 中删除该记录
-                        redisCache.deleteObject(key);
+                        deleteObjectWithTenant(tenantId, key);
                     } else {
                         watchLog.setLogType(1);
                     }
                 }
-                watchLog.setVideoId(videoId);
-                watchLog.setQwUserId(qwUserId);
-                watchLog.setQwExternalContactId(externalId);
+                if (family.isWxUser()) {
+                    watchLog.setUserId(id1);
+                    watchLog.setVideoId(id2);
+                    watchLog.setCompanyUserId(id3);
+                } else {
+                    watchLog.setQwUserId(id1);
+                    watchLog.setQwExternalContactId(id2);
+                    watchLog.setVideoId(id3);
+                }
 
 
                 // 线程安全地添加到集合
@@ -1342,22 +1232,38 @@ public class FsCourseWatchLogServiceImpl extends ServiceImpl<FsCourseWatchLogMap
 
 
     public Long getVideoDuration(Long videoId) {
-        //将视频时长也存到redis
+        Long tenantId = captureRedisTenantId();
+        boolean dsSwitched = ensureTenantDataSource(tenantId);
+        try {
+            return getVideoDuration(tenantId, videoId);
+        } finally {
+            if (dsSwitched) {
+                clearTenantDataSource();
+            }
+        }
+    }
+
+    public Long getVideoDuration(Long tenantId, Long videoId) {
         String videoRedisKey = "h5user:video:duration:" + videoId;
-        Long videoDuration = 0L;
+        Long videoDuration = null;
         try {
-            videoDuration = redisCache.getCacheObject(videoRedisKey);
+            Object cached = getCacheObjectWithTenant(tenantId, videoRedisKey);
+            if (cached != null) {
+                videoDuration = Long.valueOf(String.valueOf(cached));
+            }
         } catch (Exception e) {
-            String string = redisCache.getCacheObject(videoRedisKey);
-            videoDuration = Long.parseLong(string);
-            log.error("key中id为S:{}", videoDuration);
+            log.warn("读取视频时长缓存异常 videoId={}, tenantId={}", videoId, tenantId, e);
         }
 
-
         if (videoDuration == null) {
+            // 调用方若在子线程,须已 ensureTenantDataSource(见 processKeyBatch)
             FsUserCourseVideo video = courseVideoMapper.selectFsUserCourseVideoByVideoId(videoId);
+            if (video == null) {
+                log.warn("租户库未找到视频 videoId={}, tenantId={}", videoId, tenantId);
+                return null;
+            }
             videoDuration = video.getDuration();
-            redisCache.setCacheObject(videoRedisKey, video.getDuration());
+            setCacheObjectWithTenant(tenantId, videoRedisKey, video.getDuration());
         }
         return videoDuration;
     }