|
|
@@ -47,6 +47,17 @@ import static com.fs.live.websocket.service.WebSocketServer.USER_ENTRY_TIME_KEY;
|
|
|
public class Task {
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(Task.class);
|
|
|
+ private static final String LOG_PREFIX = "[LiveScheduled]";
|
|
|
+
|
|
|
+
|
|
|
+ private void logTaskFinish(String taskName, String summary) {
|
|
|
+ log.info("{} {} 完成, {}", LOG_PREFIX, taskName, summary);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void logTaskError(String taskName, Exception e) {
|
|
|
+ log.error("{} {} 异常: {}", LOG_PREFIX, taskName, e.getMessage(), e);
|
|
|
+ }
|
|
|
+
|
|
|
private final ILiveService liveService;
|
|
|
|
|
|
private final ILiveDataService liveDataService;
|
|
|
@@ -93,9 +104,12 @@ public class Task {
|
|
|
@Scheduled(cron = "0 0/1 * * * ?")
|
|
|
@DistributeLock(key = "updateLiveStatusByTime", scene = "task")
|
|
|
public void updateLiveStatusByTime() {
|
|
|
+ try {
|
|
|
List<Live> list = liveService.selectNoEndLiveList();
|
|
|
- if (list.isEmpty())
|
|
|
+ if (list.isEmpty()) {
|
|
|
+ logTaskFinish("updateLiveStatusByTime", "无未结束直播间,跳过");
|
|
|
return;
|
|
|
+ }
|
|
|
List<Long> liveIdLists = list.stream().map(Live::getLiveId).collect(Collectors.toList());
|
|
|
List<LiveAutoTask> liveAutoTasks = liveAutoTaskService.selectLiveAutoTaskByLiveIds(liveIdLists);
|
|
|
List<Live> liveList = new ArrayList<>();
|
|
|
@@ -185,14 +199,7 @@ public class Task {
|
|
|
// 将开启的直播间信息写入Redis缓存,用于打标签定时任务
|
|
|
try {
|
|
|
// 获取视频时长
|
|
|
- Long videoDuration = 0L;
|
|
|
- List<LiveVideo> videos = liveVideoService.listByLiveId(live.getLiveId(), 1);
|
|
|
- if (CollUtil.isNotEmpty(videos)) {
|
|
|
- videoDuration = videos.stream()
|
|
|
- .filter(v -> v.getDuration() != null)
|
|
|
- .mapToLong(LiveVideo::getDuration)
|
|
|
- .sum();
|
|
|
- }
|
|
|
+ Long videoDuration = sumLiveVideoDurationSeconds(live.getLiveId());
|
|
|
|
|
|
// 如果视频时长大于0,将直播间信息存入Redis
|
|
|
if (videoDuration > 0 && live.getStartTime() != null) {
|
|
|
@@ -245,11 +252,23 @@ public class Task {
|
|
|
liveService.asyncToCache();
|
|
|
}
|
|
|
|
|
|
+ logTaskFinish("updateLiveStatusByTime",
|
|
|
+ String.format("扫描=%d, 状态更新=%d, 开播=%d, 结束=%d, 加载自动任务缓存=%d",
|
|
|
+ list.size(), liveList.size(), startLiveList.size(), endLiveList.size(),
|
|
|
+ startLiveList.stream().mapToInt(live -> (int) liveAutoTasks.stream()
|
|
|
+ .filter(t -> t.getLiveId().equals(live.getLiveId())).count()).sum()));
|
|
|
+ } catch (Exception e) {
|
|
|
+ logTaskError("updateLiveStatusByTime", e);
|
|
|
+ }
|
|
|
}
|
|
|
@Scheduled(cron = "0/1 * * * * ?")
|
|
|
@DistributeLock(key = "liveLotteryTask", scene = "task")
|
|
|
public void liveLotteryTask() {
|
|
|
- long currentTime = Instant.now().toEpochMilli(); // 当前时间戳(毫秒)
|
|
|
+ long startMs = System.currentTimeMillis();
|
|
|
+ int lotteryProcessed = 0;
|
|
|
+ int redProcessed = 0;
|
|
|
+ try {
|
|
|
+ long currentTime = Instant.now().toEpochMilli();
|
|
|
String lotteryKey = "live:lottery_task:*";
|
|
|
Set<String> allLiveKeys = redisCache.redisTemplate.keys(lotteryKey);
|
|
|
if (allLiveKeys != null && !allLiveKeys.isEmpty()) {
|
|
|
@@ -258,7 +277,9 @@ public class Task {
|
|
|
if (range == null || range.isEmpty()) {
|
|
|
continue;
|
|
|
}
|
|
|
+ log.info("{} liveLotteryTask 处理抽奖: liveKey={}, 待执行数={}", LOG_PREFIX, liveKey, range.size());
|
|
|
processLotteryTask(range);
|
|
|
+ lotteryProcessed += range.size();
|
|
|
redisCache.redisTemplate.opsForZSet()
|
|
|
.removeRangeByScore(liveKey, 0, currentTime);
|
|
|
}
|
|
|
@@ -266,34 +287,34 @@ public class Task {
|
|
|
|
|
|
String redKey = "live:red_task:*";
|
|
|
allLiveKeys = redisCache.redisTemplate.keys(redKey);
|
|
|
- if (allLiveKeys == null || allLiveKeys.isEmpty()) {
|
|
|
- return;
|
|
|
- }
|
|
|
- for (String liveKey : allLiveKeys) {
|
|
|
- Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
- if (range == null || range.isEmpty()) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- List<LiveConsoleOpLog> opLogs = updateRedStatus(range);
|
|
|
- redisCache.redisTemplate.opsForZSet()
|
|
|
- .removeRangeByScore(liveKey, 0, currentTime);
|
|
|
- try {
|
|
|
- Long liveId = Long.parseLong(liveKey.substring("live:red_task:".length()));
|
|
|
- // 广播红包关闭消息
|
|
|
- SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
- sendMsgVo.setLiveId(liveId);
|
|
|
- sendMsgVo.setCmd("red");
|
|
|
- sendMsgVo.setStatus(-1);
|
|
|
- liveService.asyncToCacheLiveConfig(liveId);
|
|
|
- if (!opLogs.isEmpty()) {
|
|
|
- WebSocketServer.attachOpLog(sendMsgVo, opLogs.get(opLogs.size() - 1));
|
|
|
+ if (allLiveKeys != null && !allLiveKeys.isEmpty()) {
|
|
|
+ for (String liveKey : allLiveKeys) {
|
|
|
+ Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
+ if (range == null || range.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ log.info("{} liveLotteryTask 处理红包: liveKey={}, 待执行数={}", LOG_PREFIX, liveKey, range.size());
|
|
|
+ updateRedStatus(range);
|
|
|
+ redProcessed += range.size();
|
|
|
+ redisCache.redisTemplate.opsForZSet()
|
|
|
+ .removeRangeByScore(liveKey, 0, currentTime);
|
|
|
+ try {
|
|
|
+ Long liveId = Long.parseLong(liveKey.substring("live:red_task:".length()));
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setCmd("red");
|
|
|
+ sendMsgVo.setStatus(-1);
|
|
|
+ liveService.asyncToCacheLiveConfig(liveId);
|
|
|
+ webSocketServer.broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ log.info("{} liveLotteryTask 红包关闭广播完成: liveId={}, redIds={}", LOG_PREFIX, liveId, range);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("{} liveLotteryTask 红包关闭广播失败: liveKey={}, error={}", LOG_PREFIX, liveKey, e.getMessage(), e);
|
|
|
}
|
|
|
- webSocketServer.broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("更新红包状态异常", e);
|
|
|
}
|
|
|
}
|
|
|
+ } catch (Exception e) {
|
|
|
+ logTaskError("liveLotteryTask", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private List<LiveConsoleOpLog> updateRedStatus(Set<String> range) {
|
|
|
@@ -302,13 +323,21 @@ public class Task {
|
|
|
|
|
|
private void processLotteryTask(Set<String> range) {
|
|
|
List<LiveLotteryConfVo> liveLotteries = liveLotteryConfService.selectVoListByLotteryIds(range);
|
|
|
- if(liveLotteries.isEmpty()) return;
|
|
|
+ if (liveLotteries.isEmpty()) {
|
|
|
+ log.warn("{} processLotteryTask 未查到抽奖配置: lotteryIds={}", LOG_PREFIX, range);
|
|
|
+ return;
|
|
|
+ }
|
|
|
Date now = new Date();
|
|
|
for (LiveLotteryConfVo liveLottery : liveLotteries) {
|
|
|
+ log.info("{} processLotteryTask 开始开奖: lotteryId={}, liveId={}",
|
|
|
+ LOG_PREFIX, liveLottery.getLotteryId(), liveLottery.getLiveId());
|
|
|
// 查询抽奖数量
|
|
|
List<LiveLotteryProductListVo> products = liveLottery.getProducts();
|
|
|
Integer totalLots = products.stream().mapToInt(liveLotteryProductListVo -> Math.toIntExact(liveLotteryProductListVo.getTotalLots())).sum();
|
|
|
- if(totalLots <= 0) continue;
|
|
|
+ if (totalLots <= 0) {
|
|
|
+ log.warn("{} processLotteryTask 奖品数量为0,跳过: lotteryId={}", LOG_PREFIX, liveLottery.getLotteryId());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
// 先将参与记录插入数据库
|
|
|
String hashKey = String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG_DRAW, liveLottery.getLiveId(), liveLottery.getLotteryId());
|
|
|
Map<Object, Object> hashEntries = redisCache.hashEntries(hashKey);
|
|
|
@@ -322,7 +351,11 @@ public class Task {
|
|
|
|
|
|
// 查询在线用户 并且参与了抽奖的用户
|
|
|
List<LiveWatchUser> liveWatchUsers = liveWatchUserService.selectLiveWatchAndRegisterUser(liveLottery.getLiveId(),liveLottery.getLotteryId());
|
|
|
- if(liveWatchUsers.isEmpty()) continue;
|
|
|
+ if (liveWatchUsers.isEmpty()) {
|
|
|
+ log.warn("{} processLotteryTask 无在线参与用户,跳过: lotteryId={}, liveId={}",
|
|
|
+ LOG_PREFIX, liveLottery.getLotteryId(), liveLottery.getLiveId());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
LiveLotteryRegistration liveLotteryRegistration;
|
|
|
// 收集中奖信息
|
|
|
List<LotteryVo> lotteryVos = new ArrayList<>();
|
|
|
@@ -370,63 +403,67 @@ public class Task {
|
|
|
sendMsgVo.setLiveId(liveLottery.getLiveId());
|
|
|
sendMsgVo.setCmd("LotteryDetail");
|
|
|
sendMsgVo.setData(JSON.toJSONString(lotteryVos));
|
|
|
- WebSocketServer.attachOpLog(sendMsgVo, liveConsoleOpLogService.saveLotterySettleLog(
|
|
|
- liveLottery.getLiveId(),
|
|
|
- LiveConsoleOpLog.HANDLE_AUTO,
|
|
|
- liveLottery.getLotteryId(),
|
|
|
- resolveLotteryBizName(liveLottery.getLotteryId(), liveLottery.getDesc())
|
|
|
- ));
|
|
|
webSocketServer.broadcastMessage(liveLottery.getLiveId(), JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
|
liveService.asyncToCacheLiveConfig(liveLottery.getLiveId());
|
|
|
// 删除缓存 同步抽奖记录
|
|
|
redisCache.deleteObject(hashKey);
|
|
|
+ log.info("{} processLotteryTask 开奖完成: lotteryId={}, liveId={}, 中奖人数={}",
|
|
|
+ LOG_PREFIX, liveLottery.getLotteryId(), liveLottery.getLiveId(), lotteryVos.size());
|
|
|
}
|
|
|
|
|
|
List<Long> collect = liveLotteries.stream().map(LiveLotteryConfVo::getLotteryId).collect(Collectors.toList());
|
|
|
liveLotteryConfService.finishStatusByLotteryIds(collect);
|
|
|
+ log.info("{} processLotteryTask 更新抽奖状态完成: lotteryIds={}", LOG_PREFIX, collect);
|
|
|
}
|
|
|
|
|
|
@Scheduled(cron = "0/1 * * * * ?")
|
|
|
@DistributeLock(key = "liveAutoTask", scene = "task")
|
|
|
public void liveAutoTask() {
|
|
|
- long currentTime = Instant.now().toEpochMilli(); // 当前时间戳(毫秒)
|
|
|
- log.info("定时任务执行 - 当前时间戳: {}, 当前时间: {}", currentTime, new Date(currentTime));
|
|
|
+ long startMs = System.currentTimeMillis();
|
|
|
+ int totalProcessed = 0;
|
|
|
+ try {
|
|
|
+ long currentTime = Instant.now().toEpochMilli();
|
|
|
|
|
|
Set<String> allLiveKeys = redisCache.redisTemplate.keys("live:auto_task:*");
|
|
|
if (allLiveKeys == null || allLiveKeys.isEmpty()) {
|
|
|
- return; // 没有数据,直接返回
|
|
|
+ return;
|
|
|
}
|
|
|
- // 2. 遍历每个直播间的ZSet键
|
|
|
for (String liveKey : allLiveKeys) {
|
|
|
- // 3. 获取当前直播间ZSet中所有元素(按score排序)
|
|
|
- // range方法:0表示第一个元素,-1表示最后一个元素,即获取全部
|
|
|
Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
if (range == null || range.isEmpty()) {
|
|
|
- log.info("当前直播间没有数据,跳过处理");
|
|
|
- continue; // 没有数据,直接返回
|
|
|
+ continue;
|
|
|
}
|
|
|
+ log.info("{} liveAutoTask 处理直播间任务: liveKey={}, 待执行数={}, 截止时间戳={}",
|
|
|
+ LOG_PREFIX, liveKey, range.size(), currentTime);
|
|
|
redisCache.redisTemplate.opsForZSet()
|
|
|
.removeRangeByScore(liveKey, 0, currentTime);
|
|
|
processAutoTask(range);
|
|
|
+ totalProcessed += range.size();
|
|
|
+ }
|
|
|
+ if (totalProcessed > 0) {
|
|
|
+ logTaskFinish("liveAutoTask", "执行自动任务数=" + totalProcessed);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logTaskError("liveAutoTask", e);
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- public static void main(String[] args) {
|
|
|
- long currentTime = Instant.now().toEpochMilli();
|
|
|
- System.out.println(currentTime);
|
|
|
- long startTime = 1776219541000L;
|
|
|
- System.out.println(new Date(startTime));
|
|
|
- System.out.println(new Date(currentTime));
|
|
|
-
|
|
|
}
|
|
|
|
|
|
private void processAutoTask(Set<String> range) {
|
|
|
- for (String liveAutoTask : range) {
|
|
|
- LiveAutoTask task = JSON.parseObject(liveAutoTask, LiveAutoTask.class);
|
|
|
- webSocketServer.handleAutoTask(task);
|
|
|
- task.setFinishStatus(1L);
|
|
|
- liveAutoTaskService.finishLiveAutoTask(task);
|
|
|
+ for (String liveAutoTaskJson : range) {
|
|
|
+ try {
|
|
|
+ LiveAutoTask task = JSON.parseObject(liveAutoTaskJson, LiveAutoTask.class);
|
|
|
+ log.info("{} processAutoTask 开始执行: taskId={}, liveId={}, taskType={}, absValue={}",
|
|
|
+ LOG_PREFIX, task.getId(), task.getLiveId(), task.getTaskType(), task.getAbsValue());
|
|
|
+ webSocketServer.handleAutoTask(task);
|
|
|
+ task.setFinishStatus(1L);
|
|
|
+ liveAutoTaskService.finishLiveAutoTask(task);
|
|
|
+ log.info("{} processAutoTask 执行成功: taskId={}, liveId={}, taskType={}",
|
|
|
+ LOG_PREFIX, task.getId(), task.getLiveId(), task.getTaskType());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("{} processAutoTask 执行失败: content={}, error={}",
|
|
|
+ LOG_PREFIX, liveAutoTaskJson, e.getMessage(), e);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -434,37 +471,56 @@ public class Task {
|
|
|
@DistributeLock(key = "autoUpdateWatchReward", scene = "task")
|
|
|
@Transactional
|
|
|
public void autoUpdateWatchReward() {
|
|
|
+ int rewardLiveCount = 0;
|
|
|
+ int rewardedUserCount = 0;
|
|
|
+ try {
|
|
|
|
|
|
// 1.查询所有直播中的直播间
|
|
|
List<Live> lives = liveService.liveList();
|
|
|
-
|
|
|
+ if (lives == null || lives.isEmpty()) {
|
|
|
+ logTaskFinish("autoUpdateWatchReward", "无直播中直播间,跳过");
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
// 2.检查是否开启观看奖励
|
|
|
List<Live> openRewardLives = lives.stream().filter(live -> StringUtils.isNotEmpty(live.getConfigJson())).collect(Collectors.toList());
|
|
|
+ if (openRewardLives.isEmpty()) {
|
|
|
+ logTaskFinish("autoUpdateWatchReward", "无观看奖励配置,跳过");
|
|
|
+ return;
|
|
|
+ }
|
|
|
Date now = new Date();
|
|
|
|
|
|
for (Live openRewardLive : openRewardLives) {
|
|
|
String configJson = openRewardLive.getConfigJson();
|
|
|
LiveWatchConfig config = JSON.parseObject(configJson, LiveWatchConfig.class);
|
|
|
if (!config.getEnabled() || config.getParticipateCondition() == null || config.getAction() == null) {
|
|
|
+ log.info("{} autoUpdateWatchReward 配置未启用或缺失: liveId={}", LOG_PREFIX, openRewardLive.getLiveId());
|
|
|
continue;
|
|
|
}
|
|
|
// 只处理 "达到指定观看时长" 的参与条件
|
|
|
if (1 != config.getParticipateCondition()) {
|
|
|
+ log.info("{} autoUpdateWatchReward 参与条件非观看时长: liveId={}, condition={}",
|
|
|
+ LOG_PREFIX, openRewardLive.getLiveId(), config.getParticipateCondition());
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
List<LiveWatchUser> liveWatchUsers = liveWatchUserService.checkOnlineNoRewardUser(openRewardLive.getLiveId(), now);
|
|
|
if (liveWatchUsers == null || liveWatchUsers.isEmpty()) {
|
|
|
+ log.info("{} autoUpdateWatchReward 无待发放用户: liveId={}", LOG_PREFIX, openRewardLive.getLiveId());
|
|
|
continue;
|
|
|
}
|
|
|
// 3.检查当前直播间的在线用户(可以传入一个时间,然后查出来当天没领取奖励的用户)
|
|
|
List<LiveWatchUser> onlineUser = liveWatchUsers
|
|
|
.stream().filter(user -> (now.getTime() - user.getUpdateTime().getTime() + (user.getOnlineSeconds() == null ? 0L : user.getOnlineSeconds())) > config.getWatchDuration() * 60 * 1000)
|
|
|
.collect(Collectors.toList());
|
|
|
- if (onlineUser.isEmpty()) continue;
|
|
|
+ if (onlineUser.isEmpty()) {
|
|
|
+ log.info("{} autoUpdateWatchReward 无达到观看时长用户: liveId={}", LOG_PREFIX, openRewardLive.getLiveId());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
List<Long> userIds = onlineUser.stream().map(LiveWatchUser::getUserId).collect(Collectors.toList());
|
|
|
+ log.info("{} autoUpdateWatchReward 准备发放: liveId={}, action={}, 用户数={}",
|
|
|
+ LOG_PREFIX, openRewardLive.getLiveId(), config.getAction(), userIds.size());
|
|
|
|
|
|
// 根据 action 类型处理不同的奖励
|
|
|
Long action = config.getAction();
|
|
|
@@ -488,6 +544,10 @@ public class Task {
|
|
|
);
|
|
|
userIds.forEach(userId -> webSocketServer.sendIntegralMessage(
|
|
|
openRewardLive.getLiveId(), userId, config.getScoreAmount(), watchPointsOpLog));
|
|
|
+ rewardLiveCount++;
|
|
|
+ rewardedUserCount += userIds.size();
|
|
|
+ log.info("{} autoUpdateWatchReward 积分发放完成: liveId={}, 用户数={}",
|
|
|
+ LOG_PREFIX, openRewardLive.getLiveId(), userIds.size());
|
|
|
break;
|
|
|
|
|
|
case 3: // 优惠券
|
|
|
@@ -512,15 +572,28 @@ public class Task {
|
|
|
watchCouponOpLog.getId(), openRewardLive.getLiveId(), couponRelations);
|
|
|
couponRelations.forEach(relation -> sendCouponRewardMessage(
|
|
|
openRewardLive.getLiveId(), relation.getUserId(), watchRewardCoupon, watchCouponOpLog));
|
|
|
+ rewardLiveCount++;
|
|
|
+ rewardedUserCount += couponRelations.size();
|
|
|
+ log.info("{} autoUpdateWatchReward 优惠券发放完成: liveId={}, 用户数={}",
|
|
|
+ LOG_PREFIX, openRewardLive.getLiveId(), couponRelations.size());
|
|
|
+ } else {
|
|
|
+ log.warn("{} autoUpdateWatchReward 优惠券发放无成功用户: liveId={}, couponId={}",
|
|
|
+ LOG_PREFIX, openRewardLive.getLiveId(), actionCouponId);
|
|
|
}
|
|
|
break;
|
|
|
|
|
|
case 1: // 现金红包 - 暂不处理(现有逻辑)
|
|
|
default:
|
|
|
- log.info("观看奖励类型 {} 暂不处理,liveId={}", action, openRewardLive.getLiveId());
|
|
|
+ log.info("{} autoUpdateWatchReward 奖励类型暂不处理: action={}, liveId={}",
|
|
|
+ LOG_PREFIX, action, openRewardLive.getLiveId());
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
+ logTaskFinish("autoUpdateWatchReward",
|
|
|
+ String.format("发放直播间数=%d, 发放用户数=%d", rewardLiveCount, rewardedUserCount));
|
|
|
+ } catch (Exception e) {
|
|
|
+ logTaskError("autoUpdateWatchReward", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -764,9 +837,14 @@ public class Task {
|
|
|
*/
|
|
|
@Scheduled(cron = "0 0/1 * * * ?")// 每分钟执行一次
|
|
|
public void syncLiveDataToDB() {
|
|
|
+ int syncCount = 0;
|
|
|
+ int couponSyncCount = 0;
|
|
|
+ try {
|
|
|
List<LiveData> liveDatas = liveDataService.getAllLiveDatas(); // 获取所有正在直播的直播间数据
|
|
|
- if(liveDatas == null)
|
|
|
+ if (liveDatas == null || liveDatas.isEmpty()) {
|
|
|
+ logTaskFinish("syncLiveDataToDB", "无直播数据,跳过");
|
|
|
return;
|
|
|
+ }
|
|
|
liveDatas.forEach(liveData ->{
|
|
|
|
|
|
Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveData.getLiveId());
|
|
|
@@ -862,9 +940,8 @@ public class Task {
|
|
|
if(!liveDatas.isEmpty())
|
|
|
for (LiveData liveData : liveDatas) {
|
|
|
liveDataService.updateLiveData(liveData);
|
|
|
+ syncCount++;
|
|
|
}
|
|
|
- /*// 更新数据库
|
|
|
- liveDataService.updateLiveData(liveData);*/
|
|
|
Set<String> keys = redisCache.redisTemplate.keys(String.format(LIVE_COUPON_NUM, "*"));
|
|
|
if (keys != null && !keys.isEmpty()) {
|
|
|
for (String key : keys) {
|
|
|
@@ -874,9 +951,15 @@ public class Task {
|
|
|
updateEntity.setId(Long.valueOf(key));
|
|
|
updateEntity.setRemainCount(Long.parseLong(o.toString()));
|
|
|
liveCouponIssueService.updateLiveCouponIssue(updateEntity);
|
|
|
+ couponSyncCount++;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ logTaskFinish("syncLiveDataToDB",
|
|
|
+ String.format("同步直播数据=%d, 同步优惠券余量=%d", syncCount, couponSyncCount));
|
|
|
+ } catch (Exception e) {
|
|
|
+ logTaskError("syncLiveDataToDB", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -885,7 +968,11 @@ public class Task {
|
|
|
@Scheduled(cron = "0/5 * * * * ?")
|
|
|
@DistributeLock(key = "updateRedQuantityNum", scene = "task")
|
|
|
public void updateRedQuantityNum() {
|
|
|
- liveRedConfService.updateRedQuantityNum();
|
|
|
+ try {
|
|
|
+ liveRedConfService.updateRedQuantityNum();
|
|
|
+ } catch (Exception e) {
|
|
|
+ logTaskError("updateRedQuantityNum", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -895,6 +982,7 @@ public class Task {
|
|
|
@Scheduled(cron = "0/10 * * * * ?")
|
|
|
@DistributeLock(key = "scanLiveTagMark", scene = "task")
|
|
|
public void scanLiveTagMark() {
|
|
|
+ int processedCount = 0;
|
|
|
try {
|
|
|
|
|
|
// 获取所有打标签缓存的key
|
|
|
@@ -1040,9 +1128,11 @@ public class Task {
|
|
|
processedLiveIds.add(liveId);
|
|
|
// 调用打标签方法
|
|
|
liveWatchUserService.qwTagMarkByLiveWatchLog(liveId);
|
|
|
+ processedCount++;
|
|
|
+ log.info("{} scanLiveTagMark 打标签完成: liveId={}", LOG_PREFIX, liveId);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("处理直播间打标签缓存异常: key={}, error={}", key, e.getMessage(), e);
|
|
|
+ log.error("{} scanLiveTagMark 处理缓存异常: key={}, error={}", LOG_PREFIX, key, e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1052,11 +1142,13 @@ public class Task {
|
|
|
String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, liveId);
|
|
|
redisCache.deleteObject(tagMarkKey);
|
|
|
} catch (Exception e) {
|
|
|
- log.error("删除直播间打标签缓存失败: liveId={}, error={}", liveId, e.getMessage(), e);
|
|
|
+ log.error("{} scanLiveTagMark 删除缓存失败: liveId={}, error={}", LOG_PREFIX, liveId, e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
+ logTaskFinish("scanLiveTagMark",
|
|
|
+ String.format("扫描缓存=%d, 打标签完成=%d", keys.size(), processedCount));
|
|
|
} catch (Exception e) {
|
|
|
- log.error("扫描直播间打标签任务异常: error={}", e.getMessage(), e);
|
|
|
+ logTaskError("scanLiveTagMark", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1067,11 +1159,14 @@ public class Task {
|
|
|
@Scheduled(cron = "0/30 * * * * ?")
|
|
|
@DistributeLock(key = "scanLiveWatchUserStatus", scene = "task")
|
|
|
public void scanLiveWatchUserStatus() {
|
|
|
+ int processedLiveCount = 0;
|
|
|
+ int updatedLogCount = 0;
|
|
|
try {
|
|
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
// 查询所有正在直播的直播间
|
|
|
List<Live> activeLives = liveService.selectNoEndLiveList();
|
|
|
if (activeLives == null || activeLives.isEmpty()) {
|
|
|
+ logTaskFinish("scanLiveWatchUserStatus", "无活跃直播间,跳过");
|
|
|
return;
|
|
|
}
|
|
|
for (Live live : activeLives) {
|
|
|
@@ -1096,15 +1191,7 @@ public class Task {
|
|
|
if (onlineUsers == null || onlineUsers.isEmpty()) {
|
|
|
continue;
|
|
|
}
|
|
|
- // 获取直播视频总时长
|
|
|
- List<LiveVideo> videos = liveVideoService.listByLiveIdWithCache(liveId, 1);
|
|
|
- long totalVideoDuration = 0L;
|
|
|
- if (videos != null && !videos.isEmpty()) {
|
|
|
- totalVideoDuration = videos.stream()
|
|
|
- .filter(v -> v.getDuration() != null)
|
|
|
- .mapToLong(LiveVideo::getDuration)
|
|
|
- .sum();
|
|
|
- }
|
|
|
+ long totalVideoDuration = sumLiveVideoDurationSeconds(liveId);
|
|
|
|
|
|
// 处理每个在线用户
|
|
|
List<LiveWatchLog> updateLog = new ArrayList<>();
|
|
|
@@ -1163,15 +1250,22 @@ public class Task {
|
|
|
for (LiveWatchLog liveWatchLog : updateLog) {
|
|
|
redisCache.setCacheObject("live:watch:log:cache:" + liveWatchLog.getLogId(), liveWatchLog, 1, TimeUnit.HOURS);
|
|
|
}
|
|
|
+ updatedLogCount += updateLog.size();
|
|
|
+ processedLiveCount++;
|
|
|
+ log.info("{} scanLiveWatchUserStatus 更新观看记录: liveId={}, 记录数={}",
|
|
|
+ LOG_PREFIX, liveId, updateLog.size());
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
- log.error("处理直播间观看记录状态异常: liveId={}, error={}",
|
|
|
- live.getLiveId(), e.getMessage(), e);
|
|
|
+ log.error("{} scanLiveWatchUserStatus 处理直播间异常: liveId={}, error={}",
|
|
|
+ LOG_PREFIX, live.getLiveId(), e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
+ logTaskFinish("scanLiveWatchUserStatus",
|
|
|
+ String.format("扫描直播间=%d, 更新记录直播间=%d, 更新记录数=%d",
|
|
|
+ activeLives.size(), processedLiveCount, updatedLogCount));
|
|
|
} catch (Exception e) {
|
|
|
- log.error("实时扫描用户直播数据任务异常: error={}", e.getMessage(), e);
|
|
|
+ logTaskError("scanLiveWatchUserStatus", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1231,8 +1325,8 @@ public class Task {
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("根据在线时长更新 LiveWatchLog logType 异常:liveId={}, userId={}, error={}",
|
|
|
- liveId, userId, e.getMessage(), e);
|
|
|
+ log.error("{} updateLiveWatchLogTypeByDuration 异常: liveId={}, userId={}, error={}",
|
|
|
+ LOG_PREFIX, liveId, userId, e.getMessage(), e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1242,12 +1336,16 @@ public class Task {
|
|
|
@Scheduled(cron = "0 0/1 * * * ?")
|
|
|
@DistributeLock(key = "updateLiveWatchUserStatus", scene = "task")
|
|
|
public void updateLiveWatchUserStatus() {
|
|
|
+ int updatedLogCount = 0;
|
|
|
try {
|
|
|
Set<String> keys = redisCache.redisTemplate.keys("live:user:watch:log:*");
|
|
|
LocalDateTime now = LocalDateTime.now();
|
|
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
List<LiveWatchLog> updateLog = new ArrayList<>();
|
|
|
- if (keys != null && !keys.isEmpty()) {
|
|
|
+ if (keys == null || keys.isEmpty()) {
|
|
|
+ logTaskFinish("updateLiveWatchUserStatus", "无用户活跃缓存,跳过");
|
|
|
+ return;
|
|
|
+ }
|
|
|
for (String key : keys) {
|
|
|
String[] split = key.split(":");
|
|
|
String cacheTime = redisCache.getCacheObject(key);
|
|
|
@@ -1277,7 +1375,8 @@ public class Task {
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("解析缓存时间失败: cacheTime={}, error={}", cacheTime, e.getMessage());
|
|
|
+ log.error("{} updateLiveWatchUserStatus 解析缓存时间失败: cacheTime={}, error={}",
|
|
|
+ LOG_PREFIX, cacheTime, e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -1292,10 +1391,12 @@ public class Task {
|
|
|
for (LiveWatchLog liveWatchLog : updateLog) {
|
|
|
redisCache.setCacheObject("live:watch:log:cache:" + liveWatchLog.getLogId(), liveWatchLog, 1, TimeUnit.HOURS);
|
|
|
}
|
|
|
+ updatedLogCount = updateLog.size();
|
|
|
}
|
|
|
- }
|
|
|
+ logTaskFinish("updateLiveWatchUserStatus",
|
|
|
+ String.format("扫描缓存=%d, 更新看课中断记录=%d", keys.size(), updatedLogCount));
|
|
|
} catch (Exception ex) {
|
|
|
- log.error("每分钟扫描一次用户在线状态用于更新用户观看记录值: error={}", ex.getMessage(), ex);
|
|
|
+ logTaskError("updateLiveWatchUserStatus", ex);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1313,7 +1414,7 @@ public class Task {
|
|
|
// List<Live> activeLives = liveService.selectNoEndLiveList();
|
|
|
//
|
|
|
// if (activeLives == null || activeLives.isEmpty()) {
|
|
|
-// log.debug("当前没有活跃的直播间");
|
|
|
+// log.info("当前没有活跃的直播间");
|
|
|
// return;
|
|
|
// }
|
|
|
//
|
|
|
@@ -1405,4 +1506,18 @@ public class Task {
|
|
|
}
|
|
|
return title + ",发放" + userCount + "人";
|
|
|
}
|
|
|
+
|
|
|
+ private long sumLiveVideoDurationSeconds(Long liveId) {
|
|
|
+ List<LiveVideo> videos = liveVideoService.listByLiveId(liveId, 1);
|
|
|
+ if (videos == null || videos.isEmpty()) {
|
|
|
+ return 0L;
|
|
|
+ }
|
|
|
+ long total = 0L;
|
|
|
+ for (LiveVideo video : videos) {
|
|
|
+ if (video != null && video.getDuration() != null) {
|
|
|
+ total += video.getDuration();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return total;
|
|
|
+ }
|
|
|
}
|