Browse Source

利用redis计算用户在线时长,更新用户看课记录

yuhongqi 1 week ago
parent
commit
a4290aad1d

+ 1 - 1
fs-admin/src/main/java/com/fs/hisStore/task/LiveTask.java

@@ -172,7 +172,7 @@ public class LiveTask {
 
     // 订单银行回调数据丢失补偿
     public void recoveryBankOrder() {
-        // 查询出来最近三十分钟的订单 待支付 未退款
+        // 查询出来最近15分钟的订单 待支付 未退款
         List<LiveOrder> list = liveOrderService.selectBankOrder();
         if(list == null || list.isEmpty()) return;
         for (LiveOrder liveOrder : list) {

+ 1 - 1
fs-common/src/main/java/com/fs/common/constant/LiveKeysConstant.java

@@ -28,7 +28,7 @@ public class LiveKeysConstant {
     public static final String TOP_MSG = "topMsg"; //抽奖记录
 
     public static final String LIVE_FLAG_CACHE = "live:flag:%s"; //直播间直播/回放状态缓存
-    public static final Integer LIVE_FLAG_CACHE_EXPIRE = 300; //直播间状态缓存过期时间(秒)
+    public static final Integer LIVE_FLAG_CACHE_EXPIRE = 60; //直播间状态缓存过期时间(秒)
 
     public static final String LIVE_DATA_CACHE = "live:data:%s"; //直播间数据缓存
     public static final Integer LIVE_DATA_CACHE_EXPIRE = 300; //直播间数据缓存过期时间(秒)

+ 284 - 23
fs-live-app/src/main/java/com/fs/live/websocket/service/WebSocketServer.java

@@ -11,6 +11,9 @@ import com.fs.hisStore.domain.FsUserScrm;
 import com.fs.hisStore.service.IFsUserScrmService;
 import com.fs.live.config.ProductionWordFilter;
 import com.fs.live.mapper.LiveCouponMapper;
+import com.fs.live.vo.LiveWatchUserEntry;
+import com.fs.live.domain.LiveWatchLog;
+import com.fs.live.domain.LiveVideo;
 import com.fs.live.websocket.auth.WebSocketConfigurator;
 import com.fs.live.websocket.bean.SendMsgVo;
 import com.fs.common.core.domain.R;
@@ -55,7 +58,7 @@ public class WebSocketServer {
     // 心跳超时缓存:key=sessionId,value=最后心跳时间戳
     private final static ConcurrentHashMap<String, Long> heartbeatCache = new ConcurrentHashMap<>();
     // 心跳超时时间(毫秒):3分钟无心跳则认为超时
-    private final static long HEARTBEAT_TIMEOUT = 3 * 60 * 1000;
+    private final static long HEARTBEAT_TIMEOUT = 1 * 60 * 1000;
     // admin房间消息发送线程池(单线程,保证串行化)
     private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
 
@@ -72,7 +75,12 @@ public class WebSocketServer {
     private final ILiveUserFirstEntryService liveUserFirstEntryService =  SpringUtils.getBean(ILiveUserFirstEntryService.class);
     private final ILiveCouponIssueService liveCouponIssueService =  SpringUtils.getBean(ILiveCouponIssueService.class);
     private final LiveCouponMapper liveCouponMapper = SpringUtils.getBean(LiveCouponMapper.class);
+    private final ILiveWatchLogService liveWatchLogService = SpringUtils.getBean(ILiveWatchLogService.class);
+    private final ILiveVideoService liveVideoService = SpringUtils.getBean(ILiveVideoService.class);
     private static Random random = new Random();
+    
+    // Redis key 前缀:用户进入直播间时间
+    private static final String USER_ENTRY_TIME_KEY = "live:user:entry:time:%s:%s"; // liveId:userId
 
     // 直播间在线用户缓存
 //    private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
@@ -114,6 +122,11 @@ public class WebSocketServer {
 
             LiveWatchUser liveWatchUserVO = liveWatchUserService.join(fsUser,liveId, userId, location);
             room.put(userId, session);
+            
+            // 存储用户进入直播间的时间到 Redis(用于计算在线时长)
+            String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
+            redisCache.setCacheObject(entryTimeKey, System.currentTimeMillis(), 24, TimeUnit.HOURS);
+            
             // 直播间浏览量 +1
             redisCache.incr(PAGE_VIEWS_KEY + liveId, 1);
 
@@ -161,6 +174,18 @@ public class WebSocketServer {
             }
 
             LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
+            // 如果用户连上了 socket,并且公司ID和销售ID大于0,更新 LiveWatchLog 的 logType
+
+            if ((companyId > 0 && companyUserId > 0) || (liveUserFirstEntry != null && liveUserFirstEntry.getCompanyId() > 0 && liveUserFirstEntry.getCompanyUserId() > 0 )) {
+                // 获取当前直播/回放状态
+                Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
+                Integer currentLiveFlag = flagMap.get("liveFlag");
+
+                // 如果当前是直播状态(liveFlag = 1),更新 logType
+                if (currentLiveFlag != null && currentLiveFlag == 1) {
+                    updateLiveWatchLogTypeOnConnect(liveId, userId, companyId, companyUserId);
+                }
+            }
             if (liveUserFirstEntry != null) {
                 // 处理第一次自己进入,第二次扫码销售进入
                 if (liveUserFirstEntry.getCompanyUserId() == -1L && companyUserId != -1L) {
@@ -205,6 +230,15 @@ public class WebSocketServer {
     @OnClose
     public void onClose(Session session) {
         Map<String, Object> userProperties = session.getUserProperties();
+        // 获取公司ID和销售ID
+        long companyId = -1L;
+        long companyUserId = -1L;
+        if (!Objects.isNull(userProperties.get("companyId"))) {
+            companyId = (long) userProperties.get("companyId");
+        }
+        if (!Objects.isNull(userProperties.get("companyUserId"))) {
+            companyUserId = (long) userProperties.get("companyUserId");
+        }
 
         long liveId = (long) userProperties.get("liveId");
         long userId = (long) userProperties.get("userId");
@@ -217,6 +251,8 @@ public class WebSocketServer {
             if (Objects.isNull(fsUser)) {
                 throw new BaseException("用户信息错误");
             }
+            // 计算并更新用户在线时长
+            updateUserOnlineDuration(liveId, userId, companyId, companyUserId);
             room.remove(userId);
             if (room.isEmpty()) {
                 rooms.remove(liveId);
@@ -228,6 +264,7 @@ public class WebSocketServer {
             // 从在线用户Set中移除用户ID
             String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveId;
             redisCache.redisTemplate.opsForSet().remove(onlineUsersSetKey, String.valueOf(userId));
+
             LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
 
 
@@ -565,7 +602,12 @@ public class WebSocketServer {
         sendMsgVo.setData(String.valueOf(scoreAmount));
 
         if(Objects.isNull( session)) return;
-        session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+        // 使用带锁的sendMessage方法,保证线程安全
+        try {
+            sendMessage(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+        } catch (IOException e) {
+            log.error("发送积分消息失败: liveId={}, userId={}, error={}", liveId, userId, e.getMessage(), e);
+        }
     }
 
     private void sendBlockMessage(Long liveId, Long userId) {
@@ -585,40 +627,56 @@ public class WebSocketServer {
         sendMsgVo.setData(null);
 
         if(Objects.isNull( session)) return;
-        session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+        // 使用带锁的sendMessage方法,保证线程安全
+        try {
+            sendMessage(session, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+        } catch (IOException e) {
+            log.error("发送封禁消息失败: liveId={}, userId={}, error={}", liveId, userId, e.getMessage(), e);
+        }
     }
 
     /**
      * 广播消息
      * @param liveId   直播间ID
      * @param message  消息内容
+     * 优化:使用快照遍历,避免在遍历过程中修改集合
      */
     public void broadcastWebMessage(Long liveId, String message) {
         ConcurrentHashMap<Long, Session> room = getRoom(liveId);
+        
+        if (room.isEmpty()) {
+            return;
+        }
 
-        // 普通用户房间:并行发送
-        room.forEach((k, v) -> {
-            if (v.isOpen()) {
-                sendWithRetry(v,message,1);
+        // 普通用户房间:并行发送(使用快照遍历,避免并发修改)
+        // ConcurrentHashMap 的 entrySet() 是弱一致性的,但为了更安全,我们显式创建快照
+        for (Map.Entry<Long, Session> entry : room.entrySet()) {
+            Session session = entry.getValue();
+            if (session != null && session.isOpen()) {
+                sendWithRetry(session, message, 1);
             }
-        });
+        }
     }
 
     /**
      * 广播消息
      * @param liveId   直播间ID
      * @param message  消息内容
+     * 优化:使用快照遍历,避免在遍历过程中修改集合
      */
     public void broadcastMessage(Long liveId, String message) {
         ConcurrentHashMap<Long, Session> room = getRoom(liveId);
         List<Session> adminRoom = getAdminRoom(liveId);
 
-        // 普通用户房间:并行发送
-        room.forEach((k, v) -> {
-            if (v.isOpen()) {
-                sendWithRetry(v,message,1);
+        // 普通用户房间:并行发送(使用快照遍历,避免并发修改)
+        if (!room.isEmpty()) {
+            for (Map.Entry<Long, Session> entry : room.entrySet()) {
+                Session session = entry.getValue();
+                if (session != null && session.isOpen()) {
+                    sendWithRetry(session, message, 1);
+                }
             }
-        });
+        }
 
         // admin房间:串行发送,使用单线程执行器
         if (!adminRoom.isEmpty()) {
@@ -696,23 +754,39 @@ public class WebSocketServer {
         }
     }
 
+
     /**
      * 定时清理无效会话(每分钟执行一次)
      * 检查心跳超时的会话并关闭
+     * 优化:使用快照遍历,避免在遍历过程中修改集合
      */
     @Scheduled(fixedRate = 60000) // 每分钟执行一次
     public void cleanInactiveSessions() {
         long currentTime = System.currentTimeMillis();
         int cleanedCount = 0;
 
-        // 遍历所有直播间
+        // 遍历所有直播间(使用快照,避免在遍历过程中被修改影响)
         for (Map.Entry<Long, ConcurrentHashMap<Long, Session>> roomEntry : rooms.entrySet()) {
             Long liveId = roomEntry.getKey();
             ConcurrentHashMap<Long, Session> room = roomEntry.getValue();
+            
+            // 如果房间为空,跳过
+            if (room.isEmpty()) {
+                continue;
+            }
 
-            // 检查普通用户会话
+            // 检查普通用户会话(使用快照遍历,避免并发修改异常)
             List<Long> toRemove = new ArrayList<>();
-            room.forEach((userId, session) -> {
+            // 创建快照,避免在遍历过程中修改原集合
+            for (Map.Entry<Long, Session> userEntry : room.entrySet()) {
+                Long userId = userEntry.getKey();
+                Session session = userEntry.getValue();
+                
+                if (session == null) {
+                    toRemove.add(userId);
+                    continue;
+                }
+                
                 Long lastHeartbeat = heartbeatCache.get(session.getId());
                 if (lastHeartbeat != null && (currentTime - lastHeartbeat) > HEARTBEAT_TIMEOUT) {
                     toRemove.add(userId);
@@ -720,16 +794,35 @@ public class WebSocketServer {
                         if (session.isOpen()) {
                             session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
                         }
+                        
+                        // 计算并更新用户在线时长(心跳超时断开连接)
+                        Map<String, Object> userProperties = session.getUserProperties();
+                        long companyId = -1L;
+                        long companyUserId = -1L;
+                        if (!Objects.isNull(userProperties.get("companyId"))) {
+                            companyId = (long) userProperties.get("companyId");
+                        }
+                        if (!Objects.isNull(userProperties.get("companyUserId"))) {
+                            companyUserId = (long) userProperties.get("companyUserId");
+                        }
+                        updateUserOnlineDuration(liveId, userId, companyId, companyUserId);
                     } catch (Exception e) {
                         log.error("关闭超时会话失败: sessionId={}, liveId={}, userId={}",
                                 session.getId(), liveId, userId, e);
                     }
                 }
-            });
+            }
 
             // 移除超时的会话
-            toRemove.forEach(room::remove);
-            cleanedCount += toRemove.size();
+            if (!toRemove.isEmpty()) {
+                String hashKey = String.format(LiveKeysConstant.LIVE_WATCH_USERS, liveId);
+                for (Long userId : toRemove) {
+                    room.remove(userId);
+                    // 从 Redis hash 中删除无效用户
+                    redisCache.hashDelete(hashKey, String.valueOf(userId));
+                }
+                cleanedCount += toRemove.size();
+            }
         }
 
         // 检查admin房间
@@ -770,14 +863,22 @@ public class WebSocketServer {
      * 广播点赞消息
      * @param liveId   直播间ID
      * @param message  消息内容
+     * 优化:使用快照遍历,避免在遍历过程中修改集合
      */
     public void broadcastLikeMessage(Long liveId, String message) {
         ConcurrentHashMap<Long, Session> room = getRoom(liveId);
-        room.forEach((k, v) -> {
-            if (v.isOpen()) {
-                sendWithRetry(v,message,1);
+        
+        if (room.isEmpty()) {
+            return;
+        }
+        
+        // 使用快照遍历,避免并发修改
+        for (Map.Entry<Long, Session> entry : room.entrySet()) {
+            Session session = entry.getValue();
+            if (session != null && session.isOpen()) {
+                sendWithRetry(session, message, 1);
             }
-        });
+        }
     }
 
     private void sendWithRetry(Session session, String message, int maxRetries) {
@@ -919,5 +1020,165 @@ public class WebSocketServer {
         redisCache.redisTemplate.opsForZSet().removeRangeByScore(key + liveId, data, data);
     }
 
+    /**
+     * 计算并更新用户在线时长
+     * @param liveId 直播间ID
+     * @param userId 用户ID
+     * @param companyId 公司ID
+     * @param companyUserId 销售ID
+     */
+    private void updateUserOnlineDuration(Long liveId, Long userId, Long companyId, Long companyUserId) {
+        try {
+            // 从 Redis 获取用户进入时间
+            String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
+            Long entryTime = redisCache.getCacheObject(entryTimeKey);
+            
+            if (entryTime == null) {
+                // 如果没有进入时间记录,可能是旧数据,跳过
+                return;
+            }
+            
+            long currentTimeMillis = System.currentTimeMillis();
+            Date now = new Date();
+            
+            // 计算在线时长(秒)
+            long durationSeconds = (currentTimeMillis - entryTime) / 1000;
+            
+            if (durationSeconds <= 0) {
+                return;
+            }
+            
+            // 获取当前直播/回放状态
+            Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
+            Integer currentLiveFlag = flagMap.get("liveFlag");
+            Integer currentReplayFlag = flagMap.get("replayFlag");
+            
+            // 查询用户记录
+            LiveWatchUserEntry liveWatchUser = liveWatchUserService.selectLiveWatchAndCompanyUserByFlag(
+                    liveId, userId, currentLiveFlag, currentReplayFlag);
+            
+            if (liveWatchUser != null) {
+                // 累加在线时长
+                Long onlineSeconds = liveWatchUser.getOnlineSeconds();
+                if (onlineSeconds == null) {
+                    onlineSeconds = 0L;
+                }
+                liveWatchUser.setOnlineSeconds(onlineSeconds + durationSeconds);
+                liveWatchUser.setUpdateTime(now);
+                
+                // 更新数据库
+                liveWatchUserService.updateLiveWatchUserEntry(liveWatchUser);
+                
+                // 如果 LiveWatchUserEntry 存在,并且当前是直播状态(liveFlag = 1),更新 LiveWatchLog
+                if (currentLiveFlag != null && currentLiveFlag == 1 
+                        && liveWatchUser.getCompanyId() != null && liveWatchUser.getCompanyId() > 0
+                        && liveWatchUser.getCompanyUserId() != null && liveWatchUser.getCompanyUserId() > 0) {
+                    updateLiveWatchLogTypeByDuration(liveId, userId, 
+                            liveWatchUser.getCompanyId(), liveWatchUser.getCompanyUserId(), 
+                            liveWatchUser.getOnlineSeconds());
+                }
+            }
+            
+            // 删除 Redis 中的进入时间记录
+            redisCache.deleteObject(entryTimeKey);
+        } catch (Exception e) {
+            log.error("更新用户在线时长异常:liveId={}, userId={}, error={}", 
+                    liveId, userId, e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 在连接时更新 LiveWatchLog 的 logType
+     * 如果 logType 类型不是 2,修改 logType 类型为 1(看课中)
+     */
+    private void updateLiveWatchLogTypeOnConnect(Long liveId, Long userId, Long companyId, Long companyUserId) {
+        try {
+            LiveWatchLog queryLog = new LiveWatchLog();
+            queryLog.setLiveId(liveId);
+            queryLog.setUserId(userId);
+            queryLog.setCompanyId(companyId);
+            queryLog.setCompanyUserId(companyUserId);
+            
+            List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogList(queryLog);
+            if (logs != null && !logs.isEmpty()) {
+                for (LiveWatchLog log : logs) {
+                    // 如果 logType 不是 2(完课),则更新为 1(看课中)
+                    if (log.getLogType() == null || log.getLogType() != 2) {
+                        log.setLogType(1);
+                        liveWatchLogService.updateLiveWatchLog(log);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("更新 LiveWatchLog logType 异常(连接时):liveId={}, userId={}, error={}", 
+                    liveId, userId, e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 根据在线时长更新 LiveWatchLog 的 logType
+     * @param liveId 直播间ID
+     * @param userId 用户ID
+     * @param companyId 公司ID
+     * @param companyUserId 销售ID
+     * @param onlineSeconds 在线时长(秒)
+     */
+    private void updateLiveWatchLogTypeByDuration(Long liveId, Long userId, Long companyId, 
+                                                   Long companyUserId, Long onlineSeconds) {
+        try {
+            // 获取直播视频总时长(videoType = 1 的视频)
+            List<LiveVideo> videos = liveVideoService.listByLiveId(liveId, 1);
+            long totalVideoDuration = 0L;
+            if (videos != null && !videos.isEmpty()) {
+                totalVideoDuration = videos.stream()
+                        .filter(v -> v.getDuration() != null)
+                        .mapToLong(LiveVideo::getDuration)
+                        .sum();
+            }
+            
+            // 查询 LiveWatchLog
+            LiveWatchLog queryLog = new LiveWatchLog();
+            queryLog.setLiveId(liveId);
+            queryLog.setUserId(userId);
+            queryLog.setCompanyId(companyId);
+            queryLog.setCompanyUserId(companyUserId);
+            
+            List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogList(queryLog);
+            if (logs == null || logs.isEmpty()) {
+                return;
+            }
+            
+            for (LiveWatchLog log : logs) {
+                boolean needUpdate = false;
+                Integer newLogType = log.getLogType();
+                
+                // ① 如果在线时长 <= 3分钟,修改 logType 为 4(看课中断)
+                if (onlineSeconds <= 180) { // 3分钟 = 180秒
+                    newLogType = 4;
+                    needUpdate = true;
+                }
+                // ③ 如果直播视频 >= 40分钟,在线时长 >= 30分钟,logType 设置为 2(完课)
+                else if (totalVideoDuration >= 2400 && onlineSeconds >= 1800) { // 40分钟 = 2400秒,30分钟 = 1800秒
+                    newLogType = 2;
+                    needUpdate = true;
+                }
+                // 如果直播视频 >= 20分钟且 < 40分钟,在线时长 >= 20分钟,logType 设置为 2(完课)
+                else if (totalVideoDuration >= 1200 && totalVideoDuration < 2400 && onlineSeconds >= 1200) { // 20分钟 = 1200秒
+                    newLogType = 2;
+                    needUpdate = true;
+                }
+                
+                // 如果 logType 已经是 2(完课),不再更新
+                if (needUpdate && (log.getLogType() == null || log.getLogType() != 2)) {
+                    log.setLogType(newLogType);
+                    liveWatchLogService.updateLiveWatchLog(log);
+                }
+            }
+        } catch (Exception e) {
+            log.error("根据在线时长更新 LiveWatchLog logType 异常:liveId={}, userId={}, error={}", 
+                    liveId, userId, e.getMessage(), e);
+        }
+    }
+
 }
 

+ 12 - 0
fs-service/src/main/java/com/fs/hisStore/mapper/FsStoreProductAttrValueScrmMapper.java

@@ -147,4 +147,16 @@ public interface FsStoreProductAttrValueScrmMapper
     void updateFsStoreProductAttrValuePrice(List<Long> ids, double v);
 
     List<FsStoreProductAttrValueScrm> getFsStoreProductAttrValueListInProductId(List<Long> productIds);
+
+    @Update({"<script> " +
+            " UPDATE fs_store_product_attr_value_scrm" +
+            " SET stock = stock + CAST(#{totalNum} AS SIGNED)" +
+            " WHERE product_id = #{productId}" +
+            " AND bar_code IN",
+            "<foreach collection='barCodeList' item='barCode' open='(' separator=',' close=')'>" +
+            "#{barCode}" +
+            "</foreach>" +
+            "</script>"
+    })
+    void incStock(@Param("productId") Long productId,@Param("barCodeList") List<String> barCodeList,@Param("totalNum") String totalNum);
 }

+ 10 - 0
fs-service/src/main/java/com/fs/live/domain/LiveWatchLog.java

@@ -76,4 +76,14 @@ public class LiveWatchLog extends BaseEntity{
      */
     private String corpId;
 
+    /**
+     * 直播购买
+     */
+    private Integer liveBuy;
+
+    /**
+     * 回放购买
+     */
+    private Integer replayBuy;
+
 }

+ 6 - 0
fs-service/src/main/java/com/fs/live/mapper/LiveWatchUserMapper.java

@@ -3,6 +3,7 @@ package com.fs.live.mapper;
 
 import com.fs.live.domain.LiveWatchUser;
 import com.fs.live.vo.LiveDashBoardDataVo;
+import com.fs.live.vo.LiveWatchUserEntry;
 import com.fs.live.vo.LiveWatchUserStatistics;
 import com.fs.live.vo.LiveWatchUserVO;
 import org.apache.ibatis.annotations.Param;
@@ -143,4 +144,9 @@ public interface LiveWatchUserMapper {
 
     @Select("select * from live_watch_user where live_id = #{liveId}")
     List<LiveWatchUser> selectLiveWatchUserListByLiveId(@Param("liveId") Long liveId);
+
+    @Select("select lufe.company_id,lufe.company_user_id,lwu.* from live_watch_user lwu" +
+            " left join live_user_first_entry lufe on lwu.live_id = lufe.live_id and lwu.user_id = lufe.user_id" +
+            " where lwu.live_id = #{liveId} and lwu.user_id = #{userId} and lwu.live_flag = #{liveFlag} and lwu.replay_flag = #{replayFlag} limit 1 ")
+    LiveWatchUserEntry selectLiveWatchAndCompanyUserByFlag(@Param("liveId") Long liveId,@Param("userId") Long userId,@Param("liveFlag") Integer liveFlag,@Param("replayFlag") Integer replayFlag);
 }

+ 5 - 0
fs-service/src/main/java/com/fs/live/service/ILiveWatchUserService.java

@@ -5,6 +5,7 @@ import com.fs.common.core.domain.R;
 import com.fs.hisStore.domain.FsUserScrm;
 import com.fs.live.domain.LiveWatchUser;
 import com.fs.live.param.LiveIsAddKfParam;
+import com.fs.live.vo.LiveWatchUserEntry;
 import com.fs.live.vo.LiveWatchUserVO;
 
 import java.util.Date;
@@ -129,4 +130,8 @@ public interface ILiveWatchUserService {
     LiveWatchUser selectLiveWatchUserByFlag(Long liveId, Long userId, Integer liveFlag, Integer replayFlag);
 
     R liveIsAddKf(LiveIsAddKfParam param);
+
+    LiveWatchUserEntry selectLiveWatchAndCompanyUserByFlag(Long liveId, Long userId, Integer liveFlag, Integer replayFlag);
+
+    void updateLiveWatchUserEntry(LiveWatchUserEntry liveWatchUser);
 }

+ 62 - 6
fs-service/src/main/java/com/fs/live/service/impl/LiveOrderServiceImpl.java

@@ -749,6 +749,7 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
             order.setIsPay("1");
             baseMapper.updateLiveOrder(order);
             try {
+                this.updateLiveWatchLog(order);
                 this.createOmsOrderCall(order);
             } catch (Exception e) {
                 log.error("推送erp失败:{}",e.getMessage());
@@ -767,6 +768,38 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
         }
         return "SUCCESS";
     }
+    @Autowired
+    private ILiveWatchUserService liveWatchUserService;
+    @Autowired
+    private ILiveWatchLogService liveWatchLogService;
+
+    private void updateLiveWatchLog(LiveOrder order) {
+        if (order.getCompanyId() != null && order.getCompanyUserId() != null && order.getCompanyId() > 0 && order.getCompanyUserId() > 0) {
+            Map<String, Integer> liveFlagWithCache = liveWatchUserService.getLiveFlagWithCache(order.getLiveId());
+            if (liveFlagWithCache != null && liveFlagWithCache.containsKey("liveFlag") && 1 == liveFlagWithCache.get("liveFlag")) {
+                try {
+                    LiveWatchLog queryLog = new LiveWatchLog();
+                    queryLog.setLiveId(order.getLiveId());
+                    queryLog.setUserId(Long.valueOf(order.getUserId()));
+                    queryLog.setCompanyId(order.getCompanyId());
+                    queryLog.setCompanyUserId(order.getCompanyUserId());
+
+                    List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogList(queryLog);
+                    if (logs != null && !logs.isEmpty()) {
+                        for (LiveWatchLog log : logs) {
+                            if (log.getLogType() == null || log.getLogType() != 2) {
+                                log.setLiveBuy(1);
+                                liveWatchLogService.updateLiveWatchLog(log);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("更新 updateLiveWatchLog LiveWatchLog logType 异常(连接时):liveId={}, userId={}, error={}",
+                            order.getLiveId(), order.getUserId(), e.getMessage(), e);
+                }
+            }
+        }
+    }
 
     @Override
     @Transactional(rollbackFor = Throwable.class,propagation = Propagation.REQUIRED)
@@ -1916,12 +1949,23 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
         FsStoreProductScrm fsStoreProduct = fsStoreProductService.selectFsStoreProductById(liveOrder.getProductId());
         LiveGoods goods = liveGoodsMapper.selectLiveGoodsByProductId(liveOrder.getLiveId(), liveOrder.getProductId());
         if(goods == null) return R.error("当前商品不存在");
+        FsStoreProductAttrValueScrm attrValue = null;
+        if (!Objects.isNull(liveOrder.getAttrValueId())) {
+            attrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueById(liveOrder.getAttrValueId());
+        }
+        if (attrValue != null) {
+            attrValue.setStock(attrValue.getStock() - Integer.parseInt(liveOrder.getTotalNum()));
+            attrValue.setSales(attrValue.getSales() + Integer.parseInt(liveOrder.getTotalNum()));
+            fsStoreProductAttrValueMapper.updateFsStoreProductAttrValue(attrValue);
+        }
 
         // 更改店铺库存
         fsStoreProduct.setStock(fsStoreProduct.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
+        fsStoreProduct.setSales(fsStoreProduct.getSales()+Integer.parseInt(liveOrder.getTotalNum()));
         fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
         // 更新直播间库存
         goods.setStock(goods.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
+        goods.setSales(goods.getSales()+Integer.parseInt(liveOrder.getTotalNum()));
         liveGoodsMapper.updateLiveGoods(goods);
 
         //判断是否是三种特定产品
@@ -1979,7 +2023,7 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
                 dto.setImage(fsStoreProduct.getImage());
                 dto.setSku(String.valueOf(fsStoreProduct.getStock()));
                 if (StringUtils.isEmpty(fsStoreProduct.getBarCode())) {
-                    FsStoreProductAttrValueScrm fsStoreProductAttrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueByProductId(fsStoreProduct.getProductId()).stream().filter(attrValue -> StringUtils.isNotEmpty(attrValue.getBarCode())).findFirst().orElse(null);
+                    FsStoreProductAttrValueScrm fsStoreProductAttrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueByProductId(fsStoreProduct.getProductId()).stream().filter(attr -> StringUtils.isNotEmpty(attr.getBarCode())).findFirst().orElse(null);
                     if (fsStoreProductAttrValue != null) {
                         dto.setBarCode(fsStoreProductAttrValue.getBarCode());
                     }
@@ -3516,12 +3560,11 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
             attrValue.setStock(attrValue.getStock() - Integer.parseInt(liveOrder.getTotalNum()));
             attrValue.setSales(attrValue.getSales() + Integer.parseInt(liveOrder.getTotalNum()));
             fsStoreProductAttrValueMapper.updateFsStoreProductAttrValue(attrValue);
-        } else {
-            // 更改店铺库存
-            fsStoreProduct.setStock(fsStoreProduct.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
-            fsStoreProduct.setSales(fsStoreProduct.getSales()+Integer.parseInt(liveOrder.getTotalNum()));
-            fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
         }
+        // 更改店铺库存
+        fsStoreProduct.setStock(fsStoreProduct.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
+        fsStoreProduct.setSales(fsStoreProduct.getSales()+Integer.parseInt(liveOrder.getTotalNum()));
+        fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
 
         // 更新直播间库存
         goods.setStock(goods.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
@@ -3771,6 +3814,19 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
             FsStoreProductScrm fsStoreProduct = fsStoreProductService.selectFsStoreProductById(liveOrder.getProductId());
             LiveGoods goods = liveGoodsMapper.selectLiveGoodsByProductId(liveOrder.getLiveId(), liveOrder.getProductId());
             fsStoreProduct.setStock(fsStoreProduct.getStock()+Integer.parseInt(liveOrder.getTotalNum()));
+            List<LiveOrderItem> liveOrderItems = liveOrderItemMapper.selectCheckedByOrderId(order.getOrderId());
+            List<String> barCodeList = new ArrayList<>();
+            //更新规格库存
+            for (LiveOrderItem item : liveOrderItems) {
+                FsStoreProduct cartDTO = JSONUtil.toBean(item.getJsonInfo(), FsStoreProduct.class);
+                if (StringUtils.isNotEmpty(cartDTO.getBarCode())) {
+                    barCodeList.add(cartDTO.getBarCode());
+                }
+            }
+            if (!barCodeList.isEmpty()) {
+                attrValueScrmMapper.incStock(fsStoreProduct.getProductId(), barCodeList, liveOrder.getTotalNum());
+            }
+
             // 更新商品库存
             fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
             goods.setStock(goods.getStock()+Long.parseLong(liveOrder.getTotalNum()));

+ 13 - 8
fs-service/src/main/java/com/fs/live/service/impl/LiveWatchUserServiceImpl.java

@@ -28,6 +28,7 @@ import com.fs.live.mapper.LiveMapper;
 import com.fs.live.mapper.LiveVideoMapper;
 import com.fs.live.param.LiveIsAddKfParam;
 import com.fs.live.service.ILiveWatchUserService;
+import com.fs.live.vo.LiveWatchUserEntry;
 import com.fs.live.vo.LiveWatchUserStatistics;
 import com.fs.live.vo.LiveWatchUserVO;
 import com.fs.qw.domain.QwExternalContact;
@@ -339,14 +340,6 @@ public class LiveWatchUserServiceImpl implements ILiveWatchUserService {
 
         // 使用唯一索引查询:live_id, user_id, live_flag, replay_flag
         LiveWatchUser liveWatchUser = baseMapper.selectByUniqueIndex(liveId, userId, liveFlag, replayFlag);
-        // 设置在线时长
-        try {
-            Long onlineSeconds = liveWatchUser.getOnlineSeconds();
-            if(onlineSeconds == null) onlineSeconds = 0L;
-            liveWatchUser.setOnlineSeconds(onlineSeconds + (System.currentTimeMillis() - liveWatchUser.getUpdateTime().getTime()) / 1000);
-        } catch (Exception e) {
-            log.error("设置在线时长异常:{}", e.getMessage());
-        }
         liveWatchUser.setUpdateTime(DateUtils.getNowDate());
         liveWatchUser.setOnline(1);
         baseMapper.updateLiveWatchUser(liveWatchUser);
@@ -570,6 +563,18 @@ public class LiveWatchUserServiceImpl implements ILiveWatchUserService {
 
     }
 
+    @Override
+    public LiveWatchUserEntry selectLiveWatchAndCompanyUserByFlag(Long liveId, Long userId, Integer liveFlag, Integer replayFlag) {
+        return baseMapper.selectLiveWatchAndCompanyUserByFlag(liveId,userId,liveFlag,replayFlag);
+    }
+
+    @Override
+    public void updateLiveWatchUserEntry(LiveWatchUserEntry liveWatchUser) {
+        LiveWatchUser updateEntity = new LiveWatchUser();
+        BeanUtil.copyProperties(updateEntity, liveWatchUser);
+        baseMapper.updateLiveWatchUser(updateEntity);
+    }
+
     /**
      * 处理发送群聊逻辑
      * @param param

+ 72 - 0
fs-service/src/main/java/com/fs/live/vo/LiveWatchUserEntry.java

@@ -0,0 +1,72 @@
+package com.fs.live.vo;
+
+
+import com.fs.common.annotation.Excel;
+import com.fs.common.core.domain.BaseEntity;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * 直播间观看用户对象 live_watch_user
+ *
+ * @author fs
+ * @date 2025-01-18
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class LiveWatchUserEntry extends BaseEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    private Long id;
+
+    /** 直播ID */
+    @Excel(name = "直播ID")
+    private Long liveId;
+
+    /** 用户ID */
+    @Excel(name = "用户ID")
+    private Long userId;
+
+    @Excel(name = "用户头像")
+    private String avatar;
+
+    /** 消息状态;0正常1禁言 */
+    @Excel(name = "消息状态;0正常1禁言")
+    private Integer msgStatus;
+
+    /** 在线状态;0在线1离线 */
+    @Excel(name = "在线状态;0在线1离线")
+    private Integer online = 0;
+    /** 全局用户自见 */
+    private Integer globalVisible = 0;
+    /** 用户自见 */
+    private Integer singleVisible = 0;
+
+    private Long onlineSeconds;
+
+    /** 用户名字 */
+
+    private String nickName;
+    private String tabName;
+
+    /** 直播进入标记:0-否 1-是 */
+    @Excel(name = "直播进入标记")
+    private Integer liveFlag = 0;
+
+    /** 回放进入标记:0-否 1-是 */
+    @Excel(name = "回放进入标记")
+    private Integer replayFlag = 0;
+
+    /** 用户所在位置 */
+    @Excel(name = "用户所在位置")
+    private String location;
+
+
+    private Integer pageNum;
+    private Integer pageSize;
+
+    private Long companyId;
+    private Long companyUserId;
+
+}

+ 16 - 1
fs-service/src/main/resources/mapper/live/LiveWatchLogMapper.xml

@@ -23,10 +23,13 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         <result property="qwUserId"    column="qw_user_id"    />
         <result property="watchType"    column="watch_type"    />
         <result property="corpId"    column="corp_id"    />
+        <result property="liveBuy"    column="live_buy"    />
+        <result property="replayBuy"    column="replay_buy"    />
     </resultMap>
 
     <sql id="selectLiveWatchLogVo">
-        select log_id, user_id, live_id, log_type, create_time, update_time, external_contact_id, company_user_id, company_id, finish_time, create_by, sop_create_time, send_app_id, log_source, qw_user_id,watch_type,corp_id from live_watch_log
+        select log_id, user_id, live_id, log_type, create_time, update_time, external_contact_id, company_user_id, company_id, finish_time, create_by, sop_create_time,live_buy,replay_buy,
+               send_app_id, log_source, qw_user_id,watch_type,corp_id from live_watch_log
     </sql>
 
     <select id="selectLiveWatchLogList" parameterType="LiveWatchLog" resultMap="LiveWatchLogResult">
@@ -45,6 +48,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="qwUserId != null  and qwUserId != ''"> and qw_user_id = #{qwUserId}</if>
             <if test="watchType != null">and watch_type = #{watchType} </if>
             <if test="corpId != null">and corp_id = #{corpId} </if>
+            <if test="liveBuy != null">and live_buy = #{liveBuy} </if>
+            <if test="replayBuy != null">and replay_buy = #{replayBuy} </if>
         </where>
     </select>
 
@@ -72,6 +77,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="qwUserId != null">qw_user_id,</if>
             <if test="watchType != null">watch_type,</if>
             <if test="corpId != null">corp_id,</if>
+            <if test="liveBuy != null">live_buy,</if>
+            <if test="replayBuy != null">replay_buy,</if>
         </trim>
         <trim prefix="values (" suffix=")" suffixOverrides=",">
             <if test="userId != null">#{userId},</if>
@@ -90,6 +97,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="qwUserId != null">#{qwUserId},</if>
             <if test="watchType != null">#{watchType},</if>
             <if test="corpId != null">#{corpId},</if>
+            <if test="liveBuy != null">#{liveBuy},</if>
+            <if test="replayBuy != null">#{replayBuy},</if>
         </trim>
     </insert>
 
@@ -112,6 +121,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="qwUserId != null">qw_user_id = #{qwUserId},</if>
             <if test="watchType != null">watch_type = #{watchType},</if>
             <if test="corpId != null">corp_id = #{corpId},</if>
+            <if test="liveBuy != null">live_buy = #{liveBuy},</if>
+            <if test="replayBuy != null">replay_buy = #{replayBuy},</if>
         </trim>
         where log_id = #{logId}
     </update>
@@ -149,6 +160,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
                         <if test="item.qwUserId != null">qw_user_id,</if>
                         <if test="item.watchType != null">watch_type,</if>
                         <if test="item.corpId != null">corp_id,</if>
+                        <if test="item.liveBuy != null">live_buy,</if>
+                        <if test="item.replayBuy != null">replay_buy,</if>
                     </if>
                 </foreach>
             </if>
@@ -172,6 +185,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
                 <if test="item.qwUserId != null">#{item.qwUserId},</if>
                 <if test="item.watchType != null">#{item.watchType},</if>
                 <if test="item.corpId != null">#{item.corpId},</if>
+                <if test="item.liveBuy != null">#{item.liveBuy},</if>
+                <if test="item.replayBuy != null">#{item.replayBuy},</if>
             </trim>)
             </foreach>
         </trim>