瀏覽代碼

1、限制观看奖励发放

yys 1 周之前
父節點
當前提交
d6b85d348f

+ 151 - 96
fs-live-app/src/main/java/com/fs/live/task/Task.java

@@ -472,7 +472,7 @@ public class Task {
         }
     }
 
-    @Scheduled(cron = "0 0/1 * * * ?")
+    @Scheduled(cron = "0/30 * * * * ?")
     @DistributeLock(key = "autoUpdateWatchReward", scene = "task")
     @Transactional
     public void autoUpdateWatchReward() {
@@ -496,124 +496,59 @@ public class Task {
         Date now = new Date();
 
         for (Live openRewardLive : openRewardLives) {
-            String configJson = openRewardLive.getConfigJson();
+            Live live = liveService.selectLiveDbByLiveId(openRewardLive.getLiveId());
+            if (live == null || StringUtils.isEmpty(live.getConfigJson())) {
+                continue;
+            }
+            String configJson = live.getConfigJson();
             LiveWatchConfig config = JSON.parseObject(configJson, LiveWatchConfig.class);
             if (!config.getEnabled() || config.getParticipateCondition() == null || config.getAction() == null) {
-                log.info("{} autoUpdateWatchReward 配置未启用或缺失: liveId={}", LOG_PREFIX, openRewardLive.getLiveId());
+                log.info("{} autoUpdateWatchReward 配置未启用或缺失: liveId={}", LOG_PREFIX, live.getLiveId());
                 continue;
             }
             // 只处理 "达到指定观看时长" 的参与条件
             if (1 != config.getParticipateCondition()) {
                 log.info("{} autoUpdateWatchReward 参与条件非观看时长: liveId={}, condition={}",
-                        LOG_PREFIX, openRewardLive.getLiveId(), config.getParticipateCondition());
+                        LOG_PREFIX, live.getLiveId(), config.getParticipateCondition());
                 continue;
             }
-            if (openRewardLive.getStartTime() != null
-                    && openRewardLive.getStartTime().isAfter(LocalDateTime.now())) {
+            if (live.getStartTime() != null && live.getStartTime().isAfter(LocalDateTime.now())) {
                 log.info("{} autoUpdateWatchReward 直播未开始,跳过: liveId={}, startTime={}",
-                        LOG_PREFIX, openRewardLive.getLiveId(), openRewardLive.getStartTime());
+                        LOG_PREFIX, live.getLiveId(), live.getStartTime());
+                continue;
+            }
+            if (config.getWatchDuration() == null || config.getWatchDuration() <= 0) {
+                log.info("{} autoUpdateWatchReward 未配置观看时长: liveId={}", LOG_PREFIX, live.getLiveId());
                 continue;
             }
 
-            List<LiveWatchUser> liveWatchUsers = liveWatchUserService.checkOnlineNoRewardUser(openRewardLive.getLiveId(), now);
+            List<LiveWatchUser> liveWatchUsers = liveWatchUserService.checkOnlineNoRewardUser(live.getLiveId(), now);
             if (liveWatchUsers == null || liveWatchUsers.isEmpty()) {
-                log.info("{} autoUpdateWatchReward 无待发放用户: liveId={}", LOG_PREFIX, openRewardLive.getLiveId());
+                log.info("{} autoUpdateWatchReward 无待发放用户: liveId={}", LOG_PREFIX, live.getLiveId());
                 continue;
             }
-            long requiredWatchSeconds = config.getWatchDuration() * 60;
-            List<LiveWatchUser> onlineUser = liveWatchUsers.stream()
-                    .filter(user -> {
-                        if (user.getUserId() == null) {
-                            return false;
-                        }
-                        Long watchSeconds = liveWatchUserService.getUserWatchDuration(
-                                openRewardLive.getLiveId(), user.getUserId());
+            long requiredWatchSeconds = config.getWatchDuration() * 60L;
+            List<Long> userIds = liveWatchUsers.stream()
+                    .map(LiveWatchUser::getUserId)
+                    .filter(Objects::nonNull)
+                    .filter(userId -> {
+                        Long watchSeconds = liveWatchUserService.getUserLiveWatchDurationSeconds(live.getLiveId(), userId);
                         return watchSeconds != null && watchSeconds >= requiredWatchSeconds;
                     })
                     .collect(Collectors.toList());
-            if (onlineUser.isEmpty()) {
-                log.info("{} autoUpdateWatchReward 无达到观看时长用户: liveId={}", LOG_PREFIX, openRewardLive.getLiveId());
+            if (userIds.isEmpty()) {
+                log.info("{} autoUpdateWatchReward 无达到观看时长用户: liveId={}, required={}s",
+                        LOG_PREFIX, live.getLiveId(), requiredWatchSeconds);
                 continue;
             }
 
-            List<Long> userIds = onlineUser.stream().map(LiveWatchUser::getUserId).collect(Collectors.toList());
-            log.info("{} autoUpdateWatchReward 准备发放: liveId={}, action={}, 用户数={}",
-                    LOG_PREFIX, openRewardLive.getLiveId(), config.getAction(), userIds.size());
+            log.info("{} autoUpdateWatchReward 准备发放: liveId={}, action={}, 用户数={}, required={}s",
+                    LOG_PREFIX, live.getLiveId(), config.getAction(), userIds.size(), requiredWatchSeconds);
 
-            // 根据 action 类型处理不同的奖励
-            Long action = config.getAction();
-            if (action == null) {
-                continue;
-            }
-
-            switch (action.intValue()) {
-                case 2: // 积分红包
-                    saveUserRewardRecord(openRewardLive, userIds, BigDecimal.valueOf(config.getScoreAmount()), 2);
-                    LiveConsoleOpLog watchPointsOpLog = liveConsoleOpLogService.saveLog(
-                            openRewardLive.getLiveId(),
-                            LiveConsoleOpLog.OP_WATCH_REWARD_POINTS,
-                            LiveConsoleOpLog.HANDLE_AUTO,
-                            openRewardLive.getLiveId(),
-                            resolveWatchRewardPointsBizName(config, userIds.size())
-                    );
-                    int pointsSuccessCount = 0;
-                    for (Long userId : userIds) {
-                        if (grantWatchRewardIntegral(openRewardLive.getLiveId(), userId, config.getScoreAmount())) {
-                            liveConsoleOpLogService.bindOpLogUser(
-                                    watchPointsOpLog.getId(), openRewardLive.getLiveId(), userId);
-                            webSocketServer.sendIntegralMessage(
-                                    openRewardLive.getLiveId(), userId, config.getScoreAmount(), watchPointsOpLog);
-                            pointsSuccessCount++;
-                        }
-                    }
-                    if (pointsSuccessCount > 0) {
-                        rewardLiveCount++;
-                        rewardedUserCount += pointsSuccessCount;
-                        log.info("{} autoUpdateWatchReward 积分发放完成: liveId={}, 成功用户数={}/{}",
-                                LOG_PREFIX, openRewardLive.getLiveId(), pointsSuccessCount, userIds.size());
-                    } else {
-                        log.warn("{} autoUpdateWatchReward 积分发放全部失败: liveId={}, 用户数={}",
-                                LOG_PREFIX, openRewardLive.getLiveId(), userIds.size());
-                    }
-                    break;
-
-                case 3: // 优惠券
-                    // 获取配置的优惠券ID
-                    String actionCouponIdStr = config.getActionCouponId();
-                    if (StringUtils.isBlank(actionCouponIdStr)) {
-                        log.warn("直播间观看奖励配置为优惠券,但未配置优惠券ID,liveId={}", openRewardLive.getLiveId());
-                        continue;
-                    }
-                    Long actionCouponId = Long.parseLong(actionCouponIdStr);
-                    LiveCoupon watchRewardCoupon = liveCouponService.selectLiveCouponById(actionCouponId);
-                    List<LiveConsoleOpLogUser> couponRelations = bindCouponToUsers(openRewardLive, userIds, actionCouponId, false);
-                    if (!couponRelations.isEmpty()) {
-                        LiveConsoleOpLog watchCouponOpLog = liveConsoleOpLogService.saveLog(
-                                openRewardLive.getLiveId(),
-                                LiveConsoleOpLog.OP_WATCH_REWARD_COUPON,
-                                LiveConsoleOpLog.HANDLE_AUTO,
-                                actionCouponId,
-                                resolveWatchRewardCouponBizName(watchRewardCoupon, actionCouponId, couponRelations.size())
-                        );
-                        liveConsoleOpLogService.bindOpLogUsers(
-                                watchCouponOpLog.getId(), openRewardLive.getLiveId(), couponRelations);
-                        couponRelations.forEach(relation -> sendCouponRewardMessage(
-                                openRewardLive.getLiveId(), relation.getUserId(), watchRewardCoupon, watchCouponOpLog));
-                        rewardLiveCount++;
-                        rewardedUserCount += couponRelations.size();
-                        log.info("{} autoUpdateWatchReward 优惠券发放完成: liveId={}, 用户数={}",
-                                LOG_PREFIX, openRewardLive.getLiveId(), couponRelations.size());
-                    } else {
-                        log.warn("{} autoUpdateWatchReward 优惠券发放无成功用户: liveId={}, couponId={}",
-                                LOG_PREFIX, openRewardLive.getLiveId(), actionCouponId);
-                    }
-                    break;
-
-                case 1: // 现金红包 - 暂不处理(现有逻辑)
-                default:
-                    log.info("{} autoUpdateWatchReward 奖励类型暂不处理: action={}, liveId={}",
-                            LOG_PREFIX, action, openRewardLive.getLiveId());
-                    break;
+            int granted = grantWatchRewardToUsers(live, config, userIds);
+            if (granted > 0) {
+                rewardLiveCount++;
+                rewardedUserCount += granted;
             }
         }
         logTaskFinish("autoUpdateWatchReward",
@@ -623,6 +558,126 @@ public class Task {
         }
     }
 
+    /**
+     * 心跳触发的观看奖励发放(15 秒节流,达到配置时长后立即发放)
+     */
+    @Transactional
+    public void tryGrantWatchRewardOnHeartbeat(Long liveId, Long userId) {
+        if (liveId == null || userId == null) {
+            return;
+        }
+        String throttleKey = "live:watch:reward:heartbeat:" + liveId + ":" + userId;
+        if (!Boolean.TRUE.equals(redisCache.setIfAbsent(throttleKey, "1", 15, TimeUnit.SECONDS))) {
+            return;
+        }
+        try {
+            Live live = liveService.selectLiveDbByLiveId(liveId);
+            if (live == null || StringUtils.isEmpty(live.getConfigJson())) {
+                return;
+            }
+            LiveWatchConfig config = JSON.parseObject(live.getConfigJson(), LiveWatchConfig.class);
+            if (!config.getEnabled() || config.getParticipateCondition() == null || config.getParticipateCondition() != 1
+                    || config.getAction() == null || config.getWatchDuration() == null || config.getWatchDuration() <= 0) {
+                return;
+            }
+            if (live.getStartTime() != null && live.getStartTime().isAfter(LocalDateTime.now())) {
+                return;
+            }
+            long requiredWatchSeconds = config.getWatchDuration() * 60L;
+            Long watchSeconds = liveWatchUserService.getUserLiveWatchDurationSeconds(liveId, userId);
+            if (watchSeconds == null || watchSeconds < requiredWatchSeconds) {
+                return;
+            }
+            List<LiveWatchUser> pendingUsers = liveWatchUserService.checkOnlineNoRewardUser(liveId, new Date());
+            boolean eligible = pendingUsers != null && pendingUsers.stream()
+                    .anyMatch(u -> userId.equals(u.getUserId()));
+            if (!eligible) {
+                return;
+            }
+            grantWatchRewardToUsers(live, config, Collections.singletonList(userId));
+        } catch (Exception e) {
+            log.error("{} tryGrantWatchRewardOnHeartbeat 失败: liveId={}, userId={}",
+                    LOG_PREFIX, liveId, userId, e);
+        }
+    }
+
+    /**
+     * 向达标用户发放观看奖励(积分 / 优惠券)
+     *
+     * @return 成功发放的用户数
+     */
+    private int grantWatchRewardToUsers(Live live, LiveWatchConfig config, List<Long> userIds) {
+        if (live == null || config == null || userIds == null || userIds.isEmpty()) {
+            return 0;
+        }
+        Long action = config.getAction();
+        if (action == null) {
+            return 0;
+        }
+        Long liveId = live.getLiveId();
+        switch (action.intValue()) {
+            case 2:
+                saveUserRewardRecord(live, userIds, BigDecimal.valueOf(config.getScoreAmount()), 2);
+                LiveConsoleOpLog watchPointsOpLog = liveConsoleOpLogService.saveLog(
+                        liveId,
+                        LiveConsoleOpLog.OP_WATCH_REWARD_POINTS,
+                        LiveConsoleOpLog.HANDLE_AUTO,
+                        liveId,
+                        resolveWatchRewardPointsBizName(config, userIds.size())
+                );
+                int pointsSuccessCount = 0;
+                for (Long uid : userIds) {
+                    if (grantWatchRewardIntegral(liveId, uid, config.getScoreAmount())) {
+                        liveConsoleOpLogService.bindOpLogUser(watchPointsOpLog.getId(), liveId, uid);
+                        webSocketServer.sendIntegralMessage(liveId, uid, config.getScoreAmount(), watchPointsOpLog);
+                        pointsSuccessCount++;
+                    }
+                }
+                if (pointsSuccessCount > 0) {
+                    log.info("{} 观看奖励积分发放完成: liveId={}, 成功用户数={}/{}",
+                            LOG_PREFIX, liveId, pointsSuccessCount, userIds.size());
+                } else {
+                    log.warn("{} 观看奖励积分发放全部失败: liveId={}, 用户数={}",
+                            LOG_PREFIX, liveId, userIds.size());
+                }
+                return pointsSuccessCount;
+
+            case 3:
+                String actionCouponIdStr = config.getActionCouponId();
+                if (StringUtils.isBlank(actionCouponIdStr)) {
+                    log.warn("直播间观看奖励配置为优惠券,但未配置优惠券ID,liveId={}", liveId);
+                    return 0;
+                }
+                Long actionCouponId = Long.parseLong(actionCouponIdStr);
+                LiveCoupon watchRewardCoupon = liveCouponService.selectLiveCouponById(actionCouponId);
+                List<LiveConsoleOpLogUser> couponRelations = bindCouponToUsers(live, userIds, actionCouponId, false);
+                if (!couponRelations.isEmpty()) {
+                    LiveConsoleOpLog watchCouponOpLog = liveConsoleOpLogService.saveLog(
+                            liveId,
+                            LiveConsoleOpLog.OP_WATCH_REWARD_COUPON,
+                            LiveConsoleOpLog.HANDLE_AUTO,
+                            actionCouponId,
+                            resolveWatchRewardCouponBizName(watchRewardCoupon, actionCouponId, couponRelations.size())
+                    );
+                    liveConsoleOpLogService.bindOpLogUsers(watchCouponOpLog.getId(), liveId, couponRelations);
+                    couponRelations.forEach(relation -> sendCouponRewardMessage(
+                            liveId, relation.getUserId(), watchRewardCoupon, watchCouponOpLog));
+                    log.info("{} 观看奖励优惠券发放完成: liveId={}, 用户数={}",
+                            LOG_PREFIX, liveId, couponRelations.size());
+                    return couponRelations.size();
+                }
+                log.warn("{} 观看奖励优惠券发放无成功用户: liveId={}, couponId={}",
+                        LOG_PREFIX, liveId, actionCouponId);
+                return 0;
+
+            case 1:
+            default:
+                log.info("{} 观看奖励类型暂不处理: action={}, liveId={}",
+                        LOG_PREFIX, action, liveId);
+                return 0;
+        }
+    }
+
     /**
      * 将优惠券绑定到用户,并收集留存关联信息
      *

+ 2 - 0
fs-live-app/src/main/java/com/fs/live/websocket/service/WebSocketServer.java

@@ -25,6 +25,7 @@ import com.fs.common.utils.spring.SpringUtils;
 import com.fs.live.domain.*;
 import com.fs.live.service.*;
 import com.fs.live.task.LiveCompletionPointsTask;
+import com.fs.live.task.Task;
 import com.fs.live.vo.LiveConsoleOpLogVo;
 import com.fs.live.vo.LiveGoodsVo;
 import lombok.extern.slf4j.Slf4j;
@@ -460,6 +461,7 @@ public class WebSocketServer {
                     sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
                     if (userType == 0) {
                         checkCompletionRewardsOnHeartbeat(liveId, userId);
+                        SpringUtils.getBean(Task.class).tryGrantWatchRewardOnHeartbeat(liveId, userId);
                     }
                     break;
                 case "sendMsg":

+ 5 - 0
fs-service/src/main/java/com/fs/live/service/ILiveWatchUserService.java

@@ -163,6 +163,11 @@ public interface ILiveWatchUserService {
      */
     Long getUserWatchDuration(Long liveId, Long userId);
 
+    /**
+     * 获取用户当前直播场次观看时长(秒),仅统计当前 liveFlag/replayFlag 对应记录 + Redis 会话
+     */
+    Long getUserLiveWatchDurationSeconds(Long liveId, Long userId);
+
     /**
      * 批量更新直播间观看用户
      * @param liveWatchUsers 需要更新的观看用户列表

+ 22 - 1
fs-service/src/main/java/com/fs/live/service/impl/LiveWatchUserServiceImpl.java

@@ -1336,8 +1336,29 @@ public class LiveWatchUserServiceImpl implements ILiveWatchUserService {
         if (!isLiveWatchDurationCountable(live)) {
             return 0L;
         }
-        long liveStartMillis = resolveLiveStartMillis(live);
         long duration = safeLong(getTotalWatchDuration(liveId, userId));
+        return appendRedisWatchDuration(live, liveId, userId, duration);
+    }
+
+    @Override
+    public Long getUserLiveWatchDurationSeconds(Long liveId, Long userId) {
+        if (liveId == null || userId == null) {
+            return 0L;
+        }
+        Live live = liveMapper.selectLiveByLiveId(liveId);
+        if (!isLiveWatchDurationCountable(live)) {
+            return 0L;
+        }
+        Map<String, Integer> flagMap = getLiveFlagWithCache(liveId);
+        LiveWatchUser record = baseMapper.selectByUniqueIndex(
+                liveId, userId, flagMap.get("liveFlag"), flagMap.get("replayFlag"));
+        long baseDuration = safeOnlineSeconds(record);
+        return appendRedisWatchDuration(live, liveId, userId, baseDuration);
+    }
+
+    private long appendRedisWatchDuration(Live live, Long liveId, Long userId, long baseDuration) {
+        long liveStartMillis = resolveLiveStartMillis(live);
+        long duration = baseDuration;
         try {
             String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
             Long entryTime = redisCache.getCacheObject(entryTimeKey);