Pārlūkot izejas kodu

1、调整是否购物车隐藏暂时发送消息
2、直播留存记录

yys 5 dienas atpakaļ
vecāks
revīzija
6c565f9a85

+ 3 - 0
fs-common/src/main/java/com/fs/common/constant/LiveKeysConstant.java

@@ -42,4 +42,7 @@ public class LiveKeysConstant {
 
     public static final String LIVE_ROOM_PASSWORD_CACHE = "live:room:password:%s";
 
+    /** 直播 WebSocket 跨服务广播频道(admin 等服务发布,live-app 订阅后推送给 App) */
+    public static final String LIVE_WS_BROADCAST_CHANNEL = "live:ws:broadcast";
+
 }

+ 7 - 0
fs-common/src/main/java/com/fs/common/core/redis/RedisCache.java

@@ -541,4 +541,11 @@ public class RedisCache
         redisTemplate.opsForSet().add(key, value);
     }
 
+    /**
+     * 发布 Redis 频道消息
+     */
+    public void publish(final String channel, final Object message) {
+        redisTemplate.convertAndSend(channel, message);
+    }
+
 }

+ 26 - 0
fs-live-app/src/main/java/com/fs/live/redis/LiveRedisPubSubConfig.java

@@ -0,0 +1,26 @@
+package com.fs.live.redis;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.listener.ChannelTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+
+/**
+ * ??? WebSocket Redis ????????????
+ */
+@Configuration
+public class LiveRedisPubSubConfig {
+
+    @Bean
+    public RedisMessageListenerContainer liveWsRedisMessageListenerContainer(
+            RedisConnectionFactory redisConnectionFactory,
+            @Lazy LiveWsBroadcastSubscriber liveWsBroadcastSubscriber) {
+        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+        container.setConnectionFactory(redisConnectionFactory);
+        container.addMessageListener(liveWsBroadcastSubscriber,
+                new ChannelTopic(LiveWsBroadcastSubscriber.channel()));
+        return container;
+    }
+}

+ 67 - 0
fs-live-app/src/main/java/com/fs/live/redis/LiveWsBroadcastSubscriber.java

@@ -0,0 +1,67 @@
+package com.fs.live.redis;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fs.common.constant.LiveKeysConstant;
+import com.fs.common.utils.StringUtils;
+import com.fs.common.utils.spring.SpringUtils;
+import com.fs.live.websocket.bean.SendMsgVo;
+import com.fs.live.websocket.service.WebSocketServer;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * 订阅 admin 等服务发布的直播 WebSocket 广播,推送给 App 端
+ */
+@Slf4j
+@Component
+public class LiveWsBroadcastSubscriber implements MessageListener {
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        if (message == null || message.getBody() == null) {
+            return;
+        }
+        String body = new String(message.getBody(), StandardCharsets.UTF_8);
+        if (StringUtils.isEmpty(body)) {
+            return;
+        }
+        try {
+            JSONObject payload = JSONObject.parseObject(body);
+            Long liveId = payload.getLong("liveId");
+            String cmd = payload.getString("cmd");
+            if (liveId == null || StringUtils.isEmpty(cmd)) {
+                log.warn("[LiveWsBroadcast] 忽略无效消息: {}", body);
+                return;
+            }
+            SendMsgVo sendMsgVo = new SendMsgVo();
+            sendMsgVo.setLiveId(liveId);
+            sendMsgVo.setCmd(cmd);
+            if (payload.containsKey("status")) {
+                sendMsgVo.setStatus(payload.getInteger("status"));
+            }
+            if (payload.containsKey("userId")) {
+                sendMsgVo.setUserId(payload.getLong("userId"));
+            }
+            if (payload.containsKey("msg")) {
+                sendMsgVo.setMsg(payload.getString("msg"));
+            }
+            if (payload.containsKey("data")) {
+                sendMsgVo.setData(payload.getString("data"));
+            }
+            WebSocketServer webSocketServer = SpringUtils.getBean(WebSocketServer.class);
+            webSocketServer.broadcastLiveCmd(sendMsgVo);
+            log.info("[LiveWsBroadcast] 已推送指令, liveId={}, cmd={}, status={}",
+                    liveId, cmd, sendMsgVo.getStatus());
+        } catch (Exception e) {
+            log.error("[LiveWsBroadcast] 处理广播消息失败: {}", body, e);
+        }
+    }
+
+    public static String channel() {
+        return LiveKeysConstant.LIVE_WS_BROADCAST_CHANNEL;
+    }
+}

+ 21 - 1
fs-live-app/src/main/java/com/fs/live/websocket/service/WebSocketServer.java

@@ -487,6 +487,9 @@ public class WebSocketServer {
                                 sendMessage(session, JSONObject.toJSONString(R.error("你已被禁言")));
                                 return;
                             }
+                            if (!liveWatchUser.isEmpty() && liveWatchUser.get(0).getSingleVisible() != null) {
+                                liveMsg.setSingleVisible(liveWatchUser.get(0).getSingleVisible());
+                            }
                         } catch (Exception e) {
                             log.error("[WebSocket-sendMsg] 检查禁言状态失败, liveId={}, userId={}, error={}",
                                     liveId, msg.getUserId(), e.getMessage(), e);
@@ -596,10 +599,16 @@ public class WebSocketServer {
                     break;
                 case "showCart":
                     msg.setOn(true);
+                    if (msg.getStatus() != null) {
+                        liveService.updateShowCartWithoutBroadcast(liveId, msg.getStatus());
+                    }
                     enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
                     break;
                 case "singleVisible":
-                    liveWatchUserService.updateSingleVisible(liveId, msg.getStatus(),msg.getUserId());
+                    msg.setOn(true);
+                    if (msg.getStatus() != null && msg.getUserId() != null) {
+                        liveWatchUserService.updateSingleVisible(liveId, msg.getStatus(), msg.getUserId());
+                    }
                     // 管理员消息插队
                     enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
                     break;
@@ -1149,6 +1158,17 @@ public class WebSocketServer {
         }
     }
 
+    /**
+     * 广播中控台指令到 App 端(管理员消息插队)
+     */
+    public void broadcastLiveCmd(SendMsgVo msg) {
+        if (msg == null || msg.getLiveId() == null || StringUtils.isEmpty(msg.getCmd())) {
+            return;
+        }
+        msg.setOn(true);
+        enqueueMessage(msg.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)), true);
+    }
+
     /**
      * 广播消息
      * @param liveId   直播间ID

+ 0 - 6
fs-service/src/main/java/com/fs/live/service/ILiveConsoleOpLogService.java

@@ -4,7 +4,6 @@ import com.fs.live.domain.LiveConsoleOpLog;
 import com.fs.live.domain.LiveConsoleOpLogUser;
 import com.fs.live.domain.LiveCoupon;
 import com.fs.live.vo.LiveConsoleOpLogRecordVo;
-import com.fs.live.vo.LiveUserRewardRecordsVo;
 
 import java.util.List;
 
@@ -41,9 +40,4 @@ public interface ILiveConsoleOpLogService {
      * 查询用户在指定直播间的留存记录(含领取/结束状态)
      */
     List<LiveConsoleOpLogRecordVo> listUserOpLogRecords(Long liveId, Long userId);
-
-    /**
-     * 查询用户在指定直播间的留存记录(含时长信息)
-     */
-    LiveUserRewardRecordsVo getUserRewardRecords(Long liveId, Long userId);
 }

+ 5 - 0
fs-service/src/main/java/com/fs/live/service/ILiveService.java

@@ -213,6 +213,11 @@ public interface ILiveService
 
     R updateShowCart(Long liveId, Integer showCart);
 
+    /**
+     * 仅更新购物车显示状态(不触发 Redis 广播,供 WebSocket 中控台指令使用)
+     */
+    void updateShowCartWithoutBroadcast(Long liveId, Integer showCart);
+
     String getGotoWxAppLiveLink(String linkStr, String appid);
 
     R liveListAll(PageRequest pageRequest);

+ 0 - 43
fs-service/src/main/java/com/fs/live/service/impl/LiveConsoleOpLogServiceImpl.java

@@ -5,7 +5,6 @@ import com.fs.common.utils.DateUtils;
 import com.fs.common.utils.DictUtils;
 import com.fs.common.utils.SecurityUtils;
 import com.fs.common.utils.StringUtils;
-import com.fs.live.domain.Live;
 import com.fs.live.domain.LiveConsoleOpLog;
 import com.fs.live.domain.LiveConsoleOpLogUser;
 import com.fs.live.domain.LiveCoupon;
@@ -20,10 +19,7 @@ import com.fs.live.service.ILiveConsoleOpLogService;
 import com.fs.live.service.ILiveCouponIssueService;
 import com.fs.live.service.ILiveLotteryConfService;
 import com.fs.live.service.ILiveRedConfService;
-import com.fs.live.service.ILiveService;
-import com.fs.live.service.ILiveWatchUserService;
 import com.fs.live.vo.LiveConsoleOpLogRecordVo;
-import com.fs.live.vo.LiveUserRewardRecordsVo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
@@ -37,8 +33,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
-import java.time.LocalDateTime;
-import java.time.temporal.ChronoUnit;
 
 /**
  * 直播中控台操作留存 Service 实现
@@ -69,12 +63,6 @@ public class LiveConsoleOpLogServiceImpl implements ILiveConsoleOpLogService {
     @Autowired
     private LiveCouponMapper liveCouponMapper;
 
-    @Autowired
-    private ILiveService liveService;
-
-    @Autowired
-    private ILiveWatchUserService liveWatchUserService;
-
     @Override
     public List<LiveConsoleOpLog> selectLiveConsoleOpLogList(LiveConsoleOpLog liveConsoleOpLog) {
         return liveConsoleOpLogMapper.selectLiveConsoleOpLogList(liveConsoleOpLog);
@@ -222,37 +210,6 @@ public class LiveConsoleOpLogServiceImpl implements ILiveConsoleOpLogService {
         return result;
     }
 
-    @Override
-    public LiveUserRewardRecordsVo getUserRewardRecords(Long liveId, Long userId) {
-        LiveUserRewardRecordsVo vo = new LiveUserRewardRecordsVo();
-        vo.setRecords(listUserOpLogRecords(liveId, userId));
-        vo.setLiveDuration(resolveLiveDuration(liveId));
-        vo.setWatchDuration(liveWatchUserService.getUserWatchDuration(liveId, userId));
-        return vo;
-    }
-
-    private Long resolveLiveDuration(Long liveId) {
-        if (liveId == null) {
-            return 0L;
-        }
-        Live live = liveService.selectLiveByLiveId(liveId);
-        if (live == null) {
-            return 0L;
-        }
-        if (live.getDuration() != null && live.getDuration() > 0) {
-            return live.getDuration();
-        }
-        if (live.getVideoDuration() != null && live.getVideoDuration() > 0) {
-            return live.getVideoDuration();
-        }
-        if (live.getStartTime() != null) {
-            LocalDateTime endTime = live.getFinishTime() != null ? live.getFinishTime() : LocalDateTime.now();
-            long seconds = ChronoUnit.SECONDS.between(live.getStartTime(), endTime);
-            return Math.max(seconds, 0L);
-        }
-        return 0L;
-    }
-
     /**
      * 状态优先级:已领取 > 已结束 > 待领取
      */

+ 22 - 1
fs-service/src/main/java/com/fs/live/service/impl/LiveServiceImpl.java

@@ -605,9 +605,30 @@ public class LiveServiceImpl implements ILiveService
         if (liveId == null || showCart == null) {
             return R.error("参数错误");
         }
+        updateShowCartWithoutBroadcast(liveId, showCart);
+        publishShowCartWsMessage(liveId, showCart);
+        return R.ok();
+    }
+
+    @Override
+    public void updateShowCartWithoutBroadcast(Long liveId, Integer showCart) {
+        if (liveId == null || showCart == null) {
+            return;
+        }
         baseMapper.updateShowCart(liveId, showCart);
         clearLiveCache(liveId);
-        return R.ok();
+    }
+
+    private void publishShowCartWsMessage(Long liveId, Integer showCart) {
+        try {
+            JSONObject payload = new JSONObject();
+            payload.put("liveId", liveId);
+            payload.put("cmd", "showCart");
+            payload.put("status", showCart);
+            redisCache.publish(LiveKeysConstant.LIVE_WS_BROADCAST_CHANNEL, payload.toJSONString());
+        } catch (Exception e) {
+            log.warn("发布购物车显示状态 WebSocket 广播失败, liveId={}, showCart={}", liveId, showCart, e);
+        }
     }
 
     @Override

+ 2 - 2
fs-service/src/main/resources/application-config-druid-tyt.yml

@@ -37,8 +37,8 @@ wx:
       port: 6379
       timeout: 2000
     configs:
-      - appId: wx6ee517a8d8743f88  # 第一个公众号的appid
-        secret: 1fac75465a61f9259a0fe19795d9e80d # 公众号的appsecret
+      - appId: wx21ba7ccef1b72fe5  # 第一个公众号的appid
+        secret: a6b16c72b26081990b9f77905d7f4038 # 公众号的appsecret
         token: PPKOdAlCoMO # 接口配置里的Token值
         aesKey: Eswa6VjwtVMCcw03qZy6fWllgrv5aytIA1SZPEU0kU2 # 接口配置里的EncodingAESKey值
   open:

+ 1 - 1
fs-service/src/main/resources/application-druid-tyt.yml

@@ -228,7 +228,7 @@ rocketmq:
 openIM:
     secret: openIM123
     userID: imAdmin
-    url: https://web.tytim.ylrzfs.com/api
+    url: https://tytwebim.ylrzcloud.com/api
 #是否使用新im
 im:
     type: OPENIM

+ 1 - 4
fs-user-app/src/main/java/com/fs/app/controller/live/LiveController.java

@@ -98,9 +98,6 @@ public class LiveController extends AppBaseController {
     private ILiveCouponUserService liveCouponUserService;
     @Autowired
     private ILiveUserRedRecordService liveUserRedRecordService;
-    @Autowired
-    private ILiveConsoleOpLogService liveConsoleOpLogService;
-
 	/**
 	 * 查询未结束直播间(销售专用)
 	 */
@@ -420,7 +417,7 @@ public class LiveController extends AppBaseController {
 	public R myRewardRecords(@RequestParam Long liveId) {
 		try {
 			Long userId = Long.valueOf(getUserId());
-			return R.ok().put("data", liveConsoleOpLogService.getUserRewardRecords(liveId, userId));
+			return R.ok().put("data", liveFacadeService.getUserRewardRecords(liveId, userId));
 		} catch (Exception e) {
 			log.error("查询奖品留存记录失败, liveId={}, userId={}", liveId, getUserId(), e);
 			return R.error("查询奖品记录失败: " + e.getMessage());

+ 6 - 0
fs-user-app/src/main/java/com/fs/app/facade/LiveFacadeService.java

@@ -7,6 +7,7 @@ import com.fs.live.domain.LiveWatchUser;
 import com.fs.live.param.CouponPO;
 import com.fs.live.param.LotteryPO;
 import com.fs.live.param.RedPO;
+import com.fs.live.vo.LiveUserRewardRecordsVo;
 
 public interface LiveFacadeService {
     R liveList(PageRequest pageRequest);
@@ -23,4 +24,9 @@ public interface LiveFacadeService {
 
     R couponClaim(CouponPO coupon);
 
+    /**
+     * 查询用户在指定直播间的奖品留存记录(含开播/观看时长)
+     */
+    LiveUserRewardRecordsVo getUserRewardRecords(Long liveId, Long userId);
+
 }

+ 33 - 0
fs-user-app/src/main/java/com/fs/app/facade/impl/LiveFacadeServiceImpl.java

@@ -23,6 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -263,4 +265,35 @@ public class LiveFacadeServiceImpl extends BaseController implements LiveFacadeS
         liveConsoleOpLogService.bindOpLogUser(lottery.getOpLogId(), lottery.getLiveId(), lottery.getUserId());
         return R.ok("参与抽奖成功!请在直播间等待开奖");
     }
+
+    @Override
+    public LiveUserRewardRecordsVo getUserRewardRecords(Long liveId, Long userId) {
+        LiveUserRewardRecordsVo vo = new LiveUserRewardRecordsVo();
+        vo.setRecords(liveConsoleOpLogService.listUserOpLogRecords(liveId, userId));
+        vo.setLiveDuration(resolveLiveDuration(liveId));
+        vo.setWatchDuration(liveWatchUserService.getUserWatchDuration(liveId, userId));
+        return vo;
+    }
+
+    private Long resolveLiveDuration(Long liveId) {
+        if (liveId == null) {
+            return 0L;
+        }
+        Live live = liveService.selectLiveByLiveId(liveId);
+        if (live == null) {
+            return 0L;
+        }
+        if (live.getDuration() != null && live.getDuration() > 0) {
+            return live.getDuration();
+        }
+        if (live.getVideoDuration() != null && live.getVideoDuration() > 0) {
+            return live.getVideoDuration();
+        }
+        if (live.getStartTime() != null) {
+            LocalDateTime endTime = live.getFinishTime() != null ? live.getFinishTime() : LocalDateTime.now();
+            long seconds = ChronoUnit.SECONDS.between(live.getStartTime(), endTime);
+            return Math.max(seconds, 0L);
+        }
+        return 0L;
+    }
 }