yuhongqi преди 2 дни
родител
ревизия
147cb12603

+ 26 - 10
fs-live-app/src/main/java/com/fs/live/task/Task.java

@@ -169,7 +169,9 @@ public class Task {
                         redisCache.expire(key+live.getLiveId(), 1, TimeUnit.DAYS);
                     });
                 }
-                
+                String cacheKey = String.format(LiveKeysConstant.LIVE_DATA_CACHE, live.getLiveId());
+                redisCache.setCacheObject(cacheKey,live,1,TimeUnit.HOURS);
+                liveWatchUserService.clearLiveFlagCache(live.getLiveId());
                 // 将开启的直播间信息写入Redis缓存,用于打标签定时任务
                 try {
                     // 获取视频时长
@@ -216,8 +218,10 @@ public class Task {
                         redisCache.redisTemplate.opsForZSet().remove(key + live.getLiveId(), JSON.toJSONString(liveAutoTask),liveAutoTask.getAbsValue().getTime());
                     });
                 }
+                String cacheKey = String.format(LiveKeysConstant.LIVE_DATA_CACHE, live.getLiveId());
+                redisCache.deleteObject(cacheKey);
                 webSocketServer.removeLikeCountCache(live.getLiveId());
-                
+
                 // 删除打标签缓存
                 try {
                     String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, live.getLiveId());
@@ -679,7 +683,8 @@ public class Task {
                         queryUser.setLiveId(liveId);
                         queryUser.setLiveFlag(1);
                         queryUser.setReplayFlag(0);
-                        List<LiveWatchUser> liveUsers = liveWatchUserService.selectLiveWatchUserList(queryUser);
+                        queryUser.setOnline(0);
+                        List<LiveWatchUser> liveUsers = liveWatchUserService.selectAllWatchUser(queryUser);
 
                         if (liveUsers != null && !liveUsers.isEmpty()) {
 
@@ -725,6 +730,7 @@ public class Task {
                                 // 更新直播用户的在线时长
                                 liveUser.setOnlineSeconds(totalOnlineSeconds);
                                 liveUser.setUpdateTime(nowDate);
+                                liveUser.setOnline(1);
                                 updateLiveUsers.add(liveUser);
 
                                 // 2. 生成回放用户数据(liveFlag = 0, replayFlag = 1),在线时长从0开始
@@ -732,7 +738,7 @@ public class Task {
                                 replayUser.setLiveId(liveUser.getLiveId());
                                 replayUser.setUserId(liveUser.getUserId());
                                 replayUser.setMsgStatus(liveUser.getMsgStatus());
-                                replayUser.setOnline(liveUser.getOnline());
+                                replayUser.setOnline(0);
                                 replayUser.setOnlineSeconds(0L); // 回放观看时长从0开始,重新计时
                                 replayUser.setGlobalVisible(liveUser.getGlobalVisible());
                                 replayUser.setSingleVisible(liveUser.getSingleVisible());
@@ -742,6 +748,7 @@ public class Task {
                                 replayUser.setCreateTime(nowDate);
                                 replayUser.setUpdateTime(nowDate);
                                 replayUsers.add(replayUser);
+                                redisCache.setCacheObject(entryTimeKey,now);
                             }
 
                             // 批量更新直播用户的在线时长
@@ -824,8 +831,7 @@ public class Task {
                     queryUser.setLiveId(liveId);
                     queryUser.setLiveFlag(1);
                     queryUser.setReplayFlag(0);
-                    queryUser.setOnline(0); // 在线用户
-                    List<LiveWatchUser> onlineUsers = liveWatchUserService.selectLiveWatchUserList(queryUser);
+                    List<LiveWatchUser> onlineUsers = liveWatchUserService.selectAllWatchUser(queryUser);
                     if (onlineUsers == null || onlineUsers.isEmpty()) {
                         continue;
                     }
@@ -840,6 +846,7 @@ public class Task {
                     }
 
                     // 处理每个在线用户
+                    List<LiveWatchLog> updateLog = new ArrayList<>();
                     for (LiveWatchUser user : onlineUsers) {
                         try {
                             Long userId = user.getUserId();
@@ -869,13 +876,22 @@ public class Task {
 
                             // 使用 updateLiveWatchLogTypeByDuration 的逻辑更新观看记录状态
                             updateLiveWatchLogTypeByDuration(liveId, userId, qwUserId, externalContactId,
-                                    onlineSeconds, totalVideoDuration);
+                                    onlineSeconds, totalVideoDuration, updateLog);
                             
                         } catch (Exception e) {
                             log.error("处理用户观看记录状态异常: liveId={}, userId={}, error={}",
                                     liveId, user.getUserId(), e.getMessage(), e);
                         }
                     }
+                    // 批量插入回放用户数据
+                    if (!updateLog.isEmpty()) {
+                        int batchSize = 500;
+                        for (int i = 0; i < updateLog.size(); i += batchSize) {
+                            int end = Math.min(i + batchSize, updateLog.size());
+                            List<LiveWatchLog> batch = updateLog.subList(i, end);
+                            liveWatchLogService.batchUpdateLiveWatchLog(batch);
+                        }
+                    }
                     
                 } catch (Exception e) {
                     log.error("处理直播间观看记录状态异常: liveId={}, error={}",
@@ -897,7 +913,7 @@ public class Task {
      * @param totalVideoDuration 视频总时长(秒)
      */
     private void updateLiveWatchLogTypeByDuration(Long liveId, Long userId, Long qwUserId,
-                                                   Long exId, Long onlineSeconds, long totalVideoDuration) {
+                                                   Long exId, Long onlineSeconds, long totalVideoDuration, List<LiveWatchLog> updateLog) {
         try {
             // 查询 LiveWatchLog
             LiveWatchLog queryLog = new LiveWatchLog();
@@ -906,7 +922,7 @@ public class Task {
             queryLog.setQwUserId(String.valueOf(qwUserId));
             queryLog.setExternalContactId(exId);
 
-            List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogList(queryLog);
+            List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogByLogIdWithCache(queryLog);
             if (logs == null || logs.isEmpty()) {
                 return;
             }
@@ -940,7 +956,7 @@ public class Task {
                 // 如果 logType 已经是 2(完课),不再更新
                 if (needUpdate) {
                     log.setLogType(newLogType);
-                    liveWatchLogService.updateLiveWatchLog(log);
+                    updateLog.add(log);
                 }
             }
         } catch (Exception e) {

+ 2 - 2
fs-live-app/src/main/java/com/fs/live/websocket/service/WebSocketServer.java

@@ -128,7 +128,7 @@ public class WebSocketServer {
 
         // 记录连接信息 管理员不记录
         if (userType == 0) {
-            FsUserScrm fsUser = fsUserService.selectFsUserByUserId(userId);
+            FsUserScrm fsUser = fsUserService.selectFsUserById(userId);
             if (Objects.isNull(fsUser)) {
                 throw new BaseException("用户信息错误");
             }
@@ -275,7 +275,7 @@ public class WebSocketServer {
         ConcurrentHashMap<Long, Session> room = getRoom(liveId);
         List<Session> adminRoom = getAdminRoom(liveId);
         if (userType == 0) {
-            FsUserScrm fsUser = fsUserService.selectFsUserByUserId(userId);
+            FsUserScrm fsUser = fsUserService.selectFsUserById(userId);
             if (Objects.isNull(fsUser)) {
                 throw new BaseException("用户信息错误");
             }

+ 7 - 0
fs-service/src/main/java/com/fs/live/mapper/LiveWatchLogMapper.java

@@ -74,4 +74,11 @@ public interface LiveWatchLogMapper extends BaseMapper<LiveWatchLog> {
 
     List<LiveWatchLogListVO> selectLiveWatchLogListInfo(LiveWatchLog liveWatchLog);
 
+    /**
+     * 批量更新直播看课记录
+     * @param liveWatchLogs 需要更新的直播看课记录列表
+     * @return 更新的记录数
+     */
+    int batchUpdateLiveWatchLog(@Param("list") List<LiveWatchLog> liveWatchLogs);
+
 }

+ 14 - 0
fs-service/src/main/java/com/fs/live/service/ILiveWatchLogService.java

@@ -66,4 +66,18 @@ public interface ILiveWatchLogService extends IService<LiveWatchLog>{
      * @return
      */
     List<LiveWatchLogListVO> selectLiveWatchLogListInfo(LiveWatchLog liveWatchLog);
+
+    /**
+     * 批量更新直播看课记录
+     * @param liveWatchLogs 需要更新的直播看课记录列表
+     * @return 更新的记录数
+     */
+    int batchUpdateLiveWatchLog(List<LiveWatchLog> liveWatchLogs);
+
+    /**
+     * 查询直播看课记录并缓存1小时
+     * @param logId 直播看课记录主键
+     * @return 直播看课记录
+     */
+    List<LiveWatchLog> selectLiveWatchLogByLogIdWithCache(LiveWatchLog logId);
 }

+ 2 - 0
fs-service/src/main/java/com/fs/live/service/ILiveWatchUserService.java

@@ -177,4 +177,6 @@ public interface ILiveWatchUserService {
      * @param liveId 直播间ID
      */
     void clearLiveFlagCache(Long liveId);
+
+    List<LiveWatchUser> selectAllWatchUser(LiveWatchUser queryUser);
 }

+ 66 - 0
fs-service/src/main/java/com/fs/live/service/impl/LiveWatchLogServiceImpl.java

@@ -1,7 +1,11 @@
 package com.fs.live.service.impl;
 
+import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import com.fs.common.core.redis.RedisCache;
 import com.fs.common.utils.DateUtils;
+import com.fs.common.utils.spring.SpringUtils;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.fs.live.vo.LiveWatchLogListVO;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -21,6 +25,10 @@ public class LiveWatchLogServiceImpl extends ServiceImpl<LiveWatchLogMapper, Liv
 
     @Autowired
     private LiveWatchLogMapper liveWatchLogMapper;
+    
+    private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
+    
+    private static final String LIVE_WATCH_LOG_CACHE_KEY = "live:watch:log:cache:%s"; // logId
 
     /**
      * 查询直播看课记录
@@ -101,4 +109,62 @@ public class LiveWatchLogServiceImpl extends ServiceImpl<LiveWatchLogMapper, Liv
     {
         return baseMapper.deleteLiveWatchLogByLogId(logId);
     }
+
+    /**
+     * 批量更新直播看课记录
+     * @param liveWatchLogs 需要更新的直播看课记录列表
+     * @return 更新的记录数
+     */
+    @Override
+    public int batchUpdateLiveWatchLog(List<LiveWatchLog> liveWatchLogs) {
+        if (liveWatchLogs == null || liveWatchLogs.isEmpty()) {
+            return 0;
+        }
+        Date now = DateUtils.getNowDate();
+        // 设置统一的更新时间
+        for (LiveWatchLog log : liveWatchLogs) {
+            if (log.getUpdateTime() == null) {
+                log.setUpdateTime(now);
+            }
+        }
+        int result = baseMapper.batchUpdateLiveWatchLog(liveWatchLogs);
+        
+        // 更新后清除相关缓存
+        for (LiveWatchLog log : liveWatchLogs) {
+            if (log.getLogId() != null) {
+                String cacheKey = String.format(LIVE_WATCH_LOG_CACHE_KEY, log.getLogId());
+                redisCache.deleteObject(cacheKey);
+            }
+        }
+        
+        return result;
+    }
+
+    /**
+     * 查询直播看课记录并缓存1小时
+     * @param logId 直播看课记录主键
+     * @return 直播看课记录
+     */
+    @Override
+    public List<LiveWatchLog> selectLiveWatchLogByLogIdWithCache(LiveWatchLog logId) {
+        if (logId == null) {
+            return null;
+        }
+        
+        // 先从缓存中获取
+        String cacheKey = String.format(LIVE_WATCH_LOG_CACHE_KEY, logId);
+        List<LiveWatchLog> cachedLog = redisCache.getCacheObject(cacheKey);
+        if (cachedLog != null) {
+            return cachedLog;
+        }
+        
+        // 缓存中没有,从数据库查询
+        List<LiveWatchLog> log = baseMapper.selectLiveWatchLogList(logId);
+        if (log != null) {
+            // 将查询结果缓存1小时
+            redisCache.setCacheObject(cacheKey, log, 1, TimeUnit.HOURS);
+        }
+        
+        return log;
+    }
 }

+ 5 - 0
fs-service/src/main/java/com/fs/live/service/impl/LiveWatchUserServiceImpl.java

@@ -220,6 +220,11 @@ public class LiveWatchUserServiceImpl implements ILiveWatchUserService {
         redisCache.deleteObject(cacheKey);
     }
 
+    @Override
+    public List<LiveWatchUser> selectAllWatchUser(LiveWatchUser queryUser) {
+        return baseMapper.selectLiveWatchUserList(queryUser);
+    }
+
     /**
      * 批量删除直播间观看用户
      *

+ 91 - 0
fs-service/src/main/resources/mapper/live/LiveWatchLogMapper.xml

@@ -245,4 +245,95 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="replayBuy != null">and t1.replay_buy = #{replayBuy} </if>
         </where>
     </select>
+
+    <!-- 批量更新直播看课记录 -->
+    <update id="batchUpdateLiveWatchLog" parameterType="java.util.List">
+        UPDATE live_watch_log
+        <set>
+            <if test="list != null and list.size() > 0 and list[0].logType != null">
+                log_type = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.logType}
+                </foreach>
+                ELSE log_type
+                END,
+            </if>
+            <if test="list != null and list.size() > 0 and list[0].updateTime != null">
+                update_time = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.updateTime}
+                </foreach>
+                ELSE update_time
+                END,
+            </if>
+            <if test="list != null and list.size() > 0 and list[0].finishTime != null">
+                finish_time = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.finishTime}
+                </foreach>
+                ELSE finish_time
+                END,
+            </if>
+            <if test="list != null and list.size() > 0 and list[0].sopCreateTime != null">
+                sop_create_time = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.sopCreateTime}
+                </foreach>
+                ELSE sop_create_time
+                END,
+            </if>
+            <if test="list != null and list.size() > 0 and list[0].sendAppId != null">
+                send_app_id = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.sendAppId}
+                </foreach>
+                ELSE send_app_id
+                END,
+            </if>
+            <if test="list != null and list.size() > 0 and list[0].logSource != null">
+                log_source = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.logSource}
+                </foreach>
+                ELSE log_source
+                END,
+            </if>
+            <if test="list != null and list.size() > 0 and list[0].watchType != null">
+                watch_type = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.watchType}
+                </foreach>
+                ELSE watch_type
+                END,
+            </if>
+            <if test="list != null and list.size() > 0 and list[0].corpId != null">
+                corp_id = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.corpId}
+                </foreach>
+                ELSE corp_id
+                END,
+            </if>
+            <if test="list != null and list.size() > 0 and list[0].liveBuy != null">
+                live_buy = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.liveBuy}
+                </foreach>
+                ELSE live_buy
+                END,
+            </if>
+            <if test="list != null and list.size() > 0 and list[0].replayBuy != null">
+                replay_buy = CASE log_id
+                <foreach collection="list" item="item">
+                    WHEN #{item.logId} THEN #{item.replayBuy}
+                </foreach>
+                ELSE replay_buy
+                END
+            </if>
+        </set>
+        WHERE log_id IN
+        <foreach collection="list" item="item" open="(" separator="," close=")">
+            #{item.logId}
+        </foreach>
+    </update>
 </mapper>