|
@@ -0,0 +1,1123 @@
|
|
|
|
|
+package com.fs.live.task;
|
|
|
|
|
+
|
|
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
+import com.fs.common.constant.LiveKeysConstant;
|
|
|
|
|
+import com.fs.common.core.domain.R;
|
|
|
|
|
+import com.fs.common.core.redis.RedisCache;
|
|
|
|
|
+import com.fs.common.utils.StringUtils;
|
|
|
|
|
+import com.fs.framework.aspectj.lock.DistributeLock;
|
|
|
|
|
+import com.fs.erp.service.FsJstAftersalePushService;
|
|
|
|
|
+import com.fs.his.service.IFsUserService;
|
|
|
|
|
+import com.fs.live.domain.*;
|
|
|
|
|
+import com.fs.live.mapper.LiveLotteryRegistrationMapper;
|
|
|
|
|
+import com.fs.live.param.LiveReplayParam;
|
|
|
|
|
+import com.fs.live.service.*;
|
|
|
|
|
+import com.fs.live.vo.LiveLotteryConfVo;
|
|
|
|
|
+import com.fs.live.vo.LiveLotteryProductListVo;
|
|
|
|
|
+import com.fs.live.vo.LotteryVo;
|
|
|
|
|
+import com.fs.live.ws.bean.WsSendMsgVo;
|
|
|
|
|
+import com.fs.live.ws.service.LiveWsRoomBroadcastFacade;
|
|
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
|
|
+import org.apache.commons.collections4.CollectionUtils;
|
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
+
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
|
|
+import java.math.BigDecimal;
|
|
|
|
|
+import java.time.Instant;
|
|
|
|
|
+import java.time.LocalDateTime;
|
|
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
|
|
+import java.util.*;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
+
|
|
|
|
|
+import static com.fs.common.constant.LiveKeysConstant.*;
|
|
|
|
|
+import static com.fs.common.constant.LiveKeysConstant.LIVE_COUPON_NUM;
|
|
|
|
|
+import static com.fs.live.ws.constant.WsRedisKeys.USER_ENTRY_TIME_KEY;
|
|
|
|
|
+
|
|
|
|
|
+@Component
|
|
|
|
|
+@AllArgsConstructor
|
|
|
|
|
+public class Task {
|
|
|
|
|
+
|
|
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(Task.class);
|
|
|
|
|
+ private final ILiveService liveService;
|
|
|
|
|
+
|
|
|
|
|
+ private final ILiveDataService liveDataService;
|
|
|
|
|
+
|
|
|
|
|
+ private final RedisCache redisCache;
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveWatchUserService liveWatchUserService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private IFsUserService fsUserService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveRewardRecordService liveRewardRecordService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private LiveWsRoomBroadcastFacade liveWsRoomBroadcastFacade;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveAutoTaskService liveAutoTaskService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveLotteryConfService liveLotteryConfService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveUserLotteryRecordService liveUserLotteryRecordService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private LiveLotteryRegistrationMapper liveLotteryRegistrationMapper;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveRedConfService liveRedConfService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveCouponIssueService liveCouponIssueService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveVideoService liveVideoService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveWatchLogService liveWatchLogService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveUserFirstEntryService liveUserFirstEntryService;
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ public FsJstAftersalePushService fsJstAftersalePushService;
|
|
|
|
|
+
|
|
|
|
|
+ @Scheduled(cron = "0 0/1 * * * ?")
|
|
|
|
|
+ @DistributeLock(key = "updateLiveStatusByTime", scene = "task")
|
|
|
|
|
+ public void updateLiveStatusByTime() {
|
|
|
|
|
+ List<Live> list = liveService.selectNoEndLiveList();
|
|
|
|
|
+ if (list.isEmpty())
|
|
|
|
|
+ return;
|
|
|
|
|
+ List<Long> liveIdLists = list.stream().map(Live::getLiveId).collect(Collectors.toList());
|
|
|
|
|
+ List<LiveAutoTask> liveAutoTasks = liveAutoTaskService.selectLiveAutoTaskByLiveIds(liveIdLists);
|
|
|
|
|
+ List<Live> liveList = new ArrayList<>();
|
|
|
|
|
+ LocalDateTime now = LocalDateTime.now().plusSeconds(2L);
|
|
|
|
|
+ List<Live> startLiveList = new ArrayList<>();
|
|
|
|
|
+ List<Live> endLiveList = new ArrayList<>();
|
|
|
|
|
+ list.forEach(live -> {
|
|
|
|
|
+ if (live.getLiveType() != 3) {
|
|
|
|
|
+ if (live.getFinishTime() == null) {
|
|
|
|
|
+ if (now.isAfter(live.getStartTime().minusSeconds(2L))){
|
|
|
|
|
+ if(live.getStatus() != 2){
|
|
|
|
|
+ redisCache.zSetRemove(LiveKeysConstant.LIVE_HOME_PAGE_LIST, JSON.toJSONString(live));
|
|
|
|
|
+ live.setStatus(2);
|
|
|
|
|
+ startLiveList.add(live);
|
|
|
|
|
+ liveList.add(live);
|
|
|
|
|
+ }
|
|
|
|
|
+ } else if (now.isBefore(live.getStartTime())) {
|
|
|
|
|
+ if (live.getStatus() != 1) {
|
|
|
|
|
+ live.setStatus(1);
|
|
|
|
|
+ liveList.add(live);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ if (now.isAfter(live.getStartTime().minusSeconds(2L)) && now.isBefore(live.getFinishTime())) {
|
|
|
|
|
+ if(live.getStatus() != 2){
|
|
|
|
|
+ redisCache.zSetRemove(LiveKeysConstant.LIVE_HOME_PAGE_LIST, JSON.toJSONString(live));
|
|
|
|
|
+ startLiveList.add(live);
|
|
|
|
|
+ live.setStatus(2);
|
|
|
|
|
+ liveList.add(live);
|
|
|
|
|
+ }
|
|
|
|
|
+ live.setStatus(2);
|
|
|
|
|
+ } else if (now.isBefore(live.getStartTime().minusSeconds(2L))) {
|
|
|
|
|
+ if (live.getStatus() != 1) {
|
|
|
|
|
+ live.setStatus(1);
|
|
|
|
|
+ liveList.add(live);
|
|
|
|
|
+ }
|
|
|
|
|
+ } else if (now.isAfter(live.getFinishTime().minusSeconds(2L))) {
|
|
|
|
|
+ if(live.getStatus() != 3){
|
|
|
|
|
+ endLiveList.add(live);
|
|
|
|
|
+ live.setStatus(3);
|
|
|
|
|
+ liveList.add(live);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 直播回放只需要检测结束时间就好了
|
|
|
|
|
+ LiveReplayParam liveReplayParam = JSON.parseObject(live.getLiveConfig(), LiveReplayParam.class);
|
|
|
|
|
+ if (liveReplayParam.getIsPlaybackOpen()) {
|
|
|
|
|
+ if (liveReplayParam.getFinishTime() != null) {
|
|
|
|
|
+ if (now.isAfter(liveReplayParam.getFinishTime().minusSeconds(2L))) {
|
|
|
|
|
+ if(live.getStatus() != 3){
|
|
|
|
|
+ endLiveList.add(live);
|
|
|
|
|
+ live.setStatus(3);
|
|
|
|
|
+ liveList.add(live);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ if(!liveList.isEmpty()){
|
|
|
|
|
+ for (Live live : liveList) {
|
|
|
|
|
+ liveService.updateLiveEntity(live);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ String key = "live:auto_task:";
|
|
|
|
|
+ if (!startLiveList.isEmpty()) {
|
|
|
|
|
+ for (Live live : startLiveList) {
|
|
|
|
|
+ WsSendMsgVo sendMsgVo = new WsSendMsgVo();
|
|
|
|
|
+ sendMsgVo.setMsg("开始直播");
|
|
|
|
|
+ sendMsgVo.setCmd("live_start");
|
|
|
|
|
+ liveWsRoomBroadcastFacade.broadcastMessage(live.getLiveId(), JSONObject.toJSONString(R.ok().put("data",sendMsgVo)));
|
|
|
|
|
+ List<LiveAutoTask> collect = liveAutoTasks.stream().filter(liveAutoTask -> liveAutoTask.getLiveId().equals(live.getLiveId())).collect(Collectors.toList());
|
|
|
|
|
+ if (!collect.isEmpty()) {
|
|
|
|
|
+ collect.forEach(liveAutoTask -> {
|
|
|
|
|
+ liveAutoTask.setCreateTime(null);
|
|
|
|
|
+ liveAutoTask.setUpdateTime(null);
|
|
|
|
|
+ redisCache.zSetAdd(key + live.getLiveId(), JSON.toJSONString(liveAutoTask),liveAutoTask.getAbsValue().getTime());
|
|
|
|
|
+ redisCache.expire(key+live.getLiveId(), 1, TimeUnit.DAYS);
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ // 清理小程序缓存 和 直播标签缓存
|
|
|
|
|
+ String cacheKey = String.format(LiveKeysConstant.LIVE_DATA_CACHE, live.getLiveId());
|
|
|
|
|
+ redisCache.deleteObject(cacheKey);
|
|
|
|
|
+ liveWatchUserService.clearLiveFlagCache(live.getLiveId());
|
|
|
|
|
+ // 将开启的直播间信息写入Redis缓存,用于打标签定时任务
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 获取视频时长
|
|
|
|
|
+ Long videoDuration = 0L;
|
|
|
|
|
+ List<LiveVideo> videos = liveVideoService.listByLiveId(live.getLiveId(), 1);
|
|
|
|
|
+ if (CollUtil.isNotEmpty(videos)) {
|
|
|
|
|
+ videoDuration = videos.stream()
|
|
|
|
|
+ .filter(v -> v.getDuration() != null)
|
|
|
|
|
+ .mapToLong(LiveVideo::getDuration)
|
|
|
|
|
+ .sum();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 如果视频时长大于0,将直播间信息存入Redis
|
|
|
|
|
+ if (videoDuration > 0 && live.getStartTime() != null) {
|
|
|
|
|
+ Map<String, Object> tagMarkInfo = new HashMap<>();
|
|
|
|
|
+ tagMarkInfo.put("liveId", live.getLiveId());
|
|
|
|
|
+ tagMarkInfo.put("startTime", live.getStartTime().atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli());
|
|
|
|
|
+ tagMarkInfo.put("videoDuration", videoDuration);
|
|
|
|
|
+
|
|
|
|
|
+ String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, live.getLiveId());
|
|
|
|
|
+ redisCache.setCacheObject(tagMarkKey, JSON.toJSONString(tagMarkInfo), 24, TimeUnit.HOURS);
|
|
|
|
|
+ log.info("直播间开启,已加入打标签缓存: liveId={}, startTime={}, videoDuration={}",
|
|
|
|
|
+ live.getLiveId(), live.getStartTime(), videoDuration);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("写入直播间打标签缓存失败: liveId={}, error={}", live.getLiveId(), e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // 重新更新所有在直播的缓存
|
|
|
|
|
+ liveService.asyncToCache();
|
|
|
|
|
+ }
|
|
|
|
|
+ if (!endLiveList.isEmpty()) {
|
|
|
|
|
+ for (Live live : endLiveList) {
|
|
|
|
|
+ WsSendMsgVo sendMsgVo = new WsSendMsgVo();
|
|
|
|
|
+ sendMsgVo.setMsg("结束直播");
|
|
|
|
|
+ sendMsgVo.setCmd("live_end");
|
|
|
|
|
+ liveWsRoomBroadcastFacade.broadcastMessage(live.getLiveId(), JSONObject.toJSONString(R.ok().put("data",sendMsgVo)));
|
|
|
|
|
+ List<LiveAutoTask> collect = liveAutoTasks.stream().filter(liveAutoTask -> liveAutoTask.getLiveId().equals(live.getLiveId())).collect(Collectors.toList());
|
|
|
|
|
+ if (!collect.isEmpty()) {
|
|
|
|
|
+ redisCache.deleteObject(key + live.getLiveId());
|
|
|
|
|
+ collect.forEach(liveAutoTask -> {
|
|
|
|
|
+ liveAutoTask.setCreateTime(null);
|
|
|
|
|
+ liveAutoTask.setUpdateTime(null);
|
|
|
|
|
+ redisCache.redisTemplate.opsForZSet().remove(key + live.getLiveId(), JSON.toJSONString(liveAutoTask),liveAutoTask.getAbsValue().getTime());
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ String cacheKey = String.format(LiveKeysConstant.LIVE_DATA_CACHE, live.getLiveId());
|
|
|
|
|
+ redisCache.deleteObject(cacheKey);
|
|
|
|
|
+ liveWsRoomBroadcastFacade.removeLikeCountCache(live.getLiveId());
|
|
|
|
|
+
|
|
|
|
|
+ // 删除打标签缓存
|
|
|
|
|
+ try {
|
|
|
|
|
+ String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, live.getLiveId());
|
|
|
|
|
+ redisCache.deleteObject(tagMarkKey);
|
|
|
|
|
+ log.info("直播间结束,已删除打标签缓存: liveId={}", live.getLiveId());
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("删除直播间打标签缓存失败: liveId={}, error={}", live.getLiveId(), e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // 重新更新所有在直播的缓存
|
|
|
|
|
+ liveService.asyncToCache();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+ @Scheduled(cron = "0/1 * * * * ?")
|
|
|
|
|
+ @DistributeLock(key = "liveLotteryTask", scene = "task")
|
|
|
|
|
+ public void liveLotteryTask() {
|
|
|
|
|
+ long currentTime = Instant.now().toEpochMilli(); // 当前时间戳(毫秒)
|
|
|
|
|
+ String lotteryKey = "live:lottery_task:*";
|
|
|
|
|
+ Set<String> allLiveKeys = redisCache.redisTemplate.keys(lotteryKey);
|
|
|
|
|
+ if (allLiveKeys != null && !allLiveKeys.isEmpty()) {
|
|
|
|
|
+ for (String liveKey : allLiveKeys) {
|
|
|
|
|
+ Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
|
|
+ if (range == null || range.isEmpty()) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ processLotteryTask(range);
|
|
|
|
|
+ redisCache.redisTemplate.opsForZSet()
|
|
|
|
|
+ .removeRangeByScore(liveKey, 0, currentTime);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String redKey = "live:red_task:*";
|
|
|
|
|
+ allLiveKeys = redisCache.redisTemplate.keys(redKey);
|
|
|
|
|
+ if (allLiveKeys == null || allLiveKeys.isEmpty()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ for (String liveKey : allLiveKeys) {
|
|
|
|
|
+ Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
|
|
+ if (range == null || range.isEmpty()) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ updateRedStatus(range);
|
|
|
|
|
+ redisCache.redisTemplate.opsForZSet()
|
|
|
|
|
+ .removeRangeByScore(liveKey, 0, currentTime);
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 广播红包关闭消息
|
|
|
|
|
+ WsSendMsgVo sendMsgVo = new WsSendMsgVo();
|
|
|
|
|
+ sendMsgVo.setLiveId(Long.valueOf(liveKey));
|
|
|
|
|
+ sendMsgVo.setCmd("red");
|
|
|
|
|
+ sendMsgVo.setStatus(-1);
|
|
|
|
|
+ liveService.asyncToCacheLiveConfig(Long.parseLong(liveKey));
|
|
|
|
|
+ liveWsRoomBroadcastFacade.broadcastMessage(Long.valueOf(liveKey), JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("更新红包状态异常", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private void updateRedStatus(Set<String> range) {
|
|
|
|
|
+
|
|
|
|
|
+ liveRedConfService.finishRedStatusBySetIds(range);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private void processLotteryTask(Set<String> range) {
|
|
|
|
|
+ List<LiveLotteryConfVo> liveLotteries = liveLotteryConfService.selectVoListByLotteryIds(range);
|
|
|
|
|
+ if(liveLotteries.isEmpty()) return;
|
|
|
|
|
+ Date now = new Date();
|
|
|
|
|
+ for (LiveLotteryConfVo liveLottery : liveLotteries) {
|
|
|
|
|
+ // 查询抽奖数量
|
|
|
|
|
+ List<LiveLotteryProductListVo> products = liveLottery.getProducts();
|
|
|
|
|
+ Integer totalLots = products.stream().mapToInt(liveLotteryProductListVo -> Math.toIntExact(liveLotteryProductListVo.getTotalLots())).sum();
|
|
|
|
|
+ if(totalLots <= 0) continue;
|
|
|
|
|
+ // 先将参与记录插入数据库
|
|
|
|
|
+ String hashKey = String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG_DRAW, liveLottery.getLiveId(), liveLottery.getLotteryId());
|
|
|
|
|
+ Map<Object, Object> hashEntries = redisCache.hashEntries(hashKey);
|
|
|
|
|
+ List<LiveLotteryRegistration> registrationList = new ArrayList<>();
|
|
|
|
|
+ if (CollUtil.isNotEmpty(hashEntries)) {
|
|
|
|
|
+ registrationList = hashEntries.values().stream()
|
|
|
|
|
+ .map(value -> JSONUtil.toBean(JSONUtil.parseObj(value), LiveLotteryRegistration.class))
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ liveLotteryRegistrationMapper.insertLiveLotteryRegistrationBatch(registrationList);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 查询在线用户 并且参与了抽奖的用户
|
|
|
|
|
+ List<LiveWatchUser> liveWatchUsers = liveWatchUserService.selectLiveWatchAndRegisterUser(liveLottery.getLiveId(),liveLottery.getLotteryId());
|
|
|
|
|
+ if(liveWatchUsers.isEmpty()) continue;
|
|
|
|
|
+ LiveLotteryRegistration liveLotteryRegistration;
|
|
|
|
|
+ // 收集中奖信息
|
|
|
|
|
+ List<LotteryVo> lotteryVos = new ArrayList<>();
|
|
|
|
|
+ for (LiveLotteryProductListVo liveLotteryProductListVo : products) {
|
|
|
|
|
+ // 随机抽奖一个用户获取奖品
|
|
|
|
|
+ Long totalLotsPerProduct = liveLotteryProductListVo.getTotalLots();
|
|
|
|
|
+ for (int i = 0; i < totalLotsPerProduct && !liveWatchUsers.isEmpty(); i++) {
|
|
|
|
|
+ // 随机选择一个用户
|
|
|
|
|
+ int randomIndex = new Random().nextInt(liveWatchUsers.size());
|
|
|
|
|
+ LiveWatchUser winningUser = liveWatchUsers.get(randomIndex);
|
|
|
|
|
+
|
|
|
|
|
+ // 创建中奖记录
|
|
|
|
|
+ LiveUserLotteryRecord record = new LiveUserLotteryRecord();
|
|
|
|
|
+ record.setLotteryId(liveLottery.getLotteryId());
|
|
|
|
|
+ record.setLiveId(liveLottery.getLiveId());
|
|
|
|
|
+ record.setUserId(winningUser.getUserId());
|
|
|
|
|
+ record.setProductId(liveLotteryProductListVo.getProductId());
|
|
|
|
|
+ record.setCreateTime(new Date());
|
|
|
|
|
+ record.setOrderStatus(-9);
|
|
|
|
|
+
|
|
|
|
|
+ // 保存中奖记录
|
|
|
|
|
+ liveUserLotteryRecordService.insertLiveUserLotteryRecord(record);
|
|
|
|
|
+ liveLotteryRegistration = new LiveLotteryRegistration();
|
|
|
|
|
+ liveLotteryRegistration.setLotteryId(liveLottery.getLotteryId());
|
|
|
|
|
+ liveLotteryRegistration.setLiveId(liveLottery.getLotteryId());
|
|
|
|
|
+ liveLotteryRegistration.setUserId(winningUser.getUserId());
|
|
|
|
|
+ liveLotteryRegistration.setIsWin(1L);
|
|
|
|
|
+ liveLotteryRegistration.setUpdateTime(now);
|
|
|
|
|
+ liveLotteryRegistration.setRizeLevel(liveLotteryProductListVo.getPrizeLevel());
|
|
|
|
|
+ liveLotteryRegistrationMapper.updateLiveLotteryRegistrationNoId(liveLotteryRegistration);
|
|
|
|
|
+ // 从候选列表中移除该用户,确保每人只能中奖一次
|
|
|
|
|
+ liveWatchUsers.remove(randomIndex);
|
|
|
|
|
+ LotteryVo lotteryVo = new LotteryVo();
|
|
|
|
|
+ lotteryVo.setUserId(winningUser.getUserId());
|
|
|
|
|
+ lotteryVo.setUserName(winningUser.getNickName());
|
|
|
|
|
+ lotteryVo.setPrizeLevel(liveLotteryProductListVo.getPrizeLevel());
|
|
|
|
|
+ lotteryVo.setProductName(liveLotteryProductListVo.getProductName());
|
|
|
|
|
+ lotteryVo.setProductId(liveLotteryProductListVo.getProductId());
|
|
|
|
|
+ //设置中奖记录id
|
|
|
|
|
+ lotteryVo.setRecordId(record.getId());
|
|
|
|
|
+ lotteryVos.add(lotteryVo);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ WsSendMsgVo sendMsgVo = new WsSendMsgVo();
|
|
|
|
|
+ sendMsgVo.setLiveId(liveLottery.getLiveId());
|
|
|
|
|
+ sendMsgVo.setCmd("LotteryDetail");
|
|
|
|
|
+ sendMsgVo.setData(JSON.toJSONString(lotteryVos));
|
|
|
|
|
+ liveWsRoomBroadcastFacade.broadcastMessage(liveLottery.getLiveId(), JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
|
|
+
|
|
|
|
|
+ liveService.asyncToCacheLiveConfig(liveLottery.getLiveId());
|
|
|
|
|
+ // 删除缓存 同步抽奖记录
|
|
|
|
|
+ redisCache.deleteObject(hashKey);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ List<Long> collect = liveLotteries.stream().map(LiveLotteryConfVo::getLotteryId).collect(Collectors.toList());
|
|
|
|
|
+ liveLotteryConfService.finishStatusByLotteryIds(collect);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Scheduled(cron = "0/1 * * * * ?")
|
|
|
|
|
+ @DistributeLock(key = "liveAutoTask", scene = "task")
|
|
|
|
|
+ public void liveAutoTask() {
|
|
|
|
|
+ long currentTime = Instant.now().toEpochMilli(); // 当前时间戳(毫秒)
|
|
|
|
|
+
|
|
|
|
|
+ Set<String> allLiveKeys = redisCache.redisTemplate.keys("live:auto_task:*");
|
|
|
|
|
+ if (allLiveKeys == null || allLiveKeys.isEmpty()) {
|
|
|
|
|
+ return; // 没有数据,直接返回
|
|
|
|
|
+ }
|
|
|
|
|
+ // 2. 遍历每个直播间的ZSet键
|
|
|
|
|
+ for (String liveKey : allLiveKeys) {
|
|
|
|
|
+ // 3. 获取当前直播间ZSet中所有元素(按score排序)
|
|
|
|
|
+ // range方法:0表示第一个元素,-1表示最后一个元素,即获取全部
|
|
|
|
|
+ Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
|
|
+ if (range == null || range.isEmpty()) {
|
|
|
|
|
+ continue; // 没有数据,直接返回
|
|
|
|
|
+ }
|
|
|
|
|
+ redisCache.redisTemplate.opsForZSet()
|
|
|
|
|
+ .removeRangeByScore(liveKey, 0, currentTime);
|
|
|
|
|
+ processAutoTask(range);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private void processAutoTask(Set<String> range) {
|
|
|
|
|
+ for (String liveAutoTask : range) {
|
|
|
|
|
+ LiveAutoTask task = JSON.parseObject(liveAutoTask, LiveAutoTask.class);
|
|
|
|
|
+ liveWsRoomBroadcastFacade.handleAutoTask(task);
|
|
|
|
|
+ task.setFinishStatus(1L);
|
|
|
|
|
+ liveAutoTaskService.finishLiveAutoTask(task);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Scheduled(cron = "0 0/1 * * * ?")
|
|
|
|
|
+ @DistributeLock(key = "autoUpdateWatchReward", scene = "task")
|
|
|
|
|
+ @Transactional
|
|
|
|
|
+ public void autoUpdateWatchReward() {
|
|
|
|
|
+
|
|
|
|
|
+ // 1.查询所有直播中的直播间
|
|
|
|
|
+ List<Live> lives = liveService.liveList();
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ // 2.检查是否开启观看奖励
|
|
|
|
|
+ List<Live> openRewardLives = lives.stream().filter(live -> StringUtils.isNotEmpty(live.getConfigJson())).collect(Collectors.toList());
|
|
|
|
|
+ Date now = new Date();
|
|
|
|
|
+
|
|
|
|
|
+ for (Live openRewardLive : openRewardLives) {
|
|
|
|
|
+ String configJson = openRewardLive.getConfigJson();
|
|
|
|
|
+ LiveWatchConfig config = JSON.parseObject(configJson, LiveWatchConfig.class);
|
|
|
|
|
+ if (config.getEnabled() && 1 == config.getParticipateCondition()) {
|
|
|
|
|
+ List<LiveWatchUser> liveWatchUsers = liveWatchUserService.checkOnlineNoRewardUser(openRewardLive.getLiveId(), now);
|
|
|
|
|
+ if (liveWatchUsers == null || liveWatchUsers.isEmpty()) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 3.检查当前直播间的在线用户(可以传入一个时间,然后查出来当天没领取奖励的用户)
|
|
|
|
|
+ List<LiveWatchUser> onlineUser = liveWatchUsers
|
|
|
|
|
+ .stream().filter(user -> (now.getTime() - user.getUpdateTime().getTime() + ( user.getOnlineSeconds() == null ? 0L : user.getOnlineSeconds())) > config.getWatchDuration() * 60 * 1000)
|
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
+ if(onlineUser.isEmpty()) continue;
|
|
|
|
|
+
|
|
|
|
|
+ List<Long> userIds = onlineUser.stream().map(LiveWatchUser::getUserId).collect(Collectors.toList());
|
|
|
|
|
+ // 4.保存用户领取记录
|
|
|
|
|
+ saveUserRewardRecord(openRewardLive, userIds,config.getScoreAmount());
|
|
|
|
|
+ // 5.更新用户积分(积分
|
|
|
|
|
+ fsUserService.increaseIntegral(userIds,config.getScoreAmount());
|
|
|
|
|
+ // 6.发送websocket事件消息 通知用户自动领取成功
|
|
|
|
|
+ userIds.forEach(userId -> liveWsRoomBroadcastFacade.sendIntegralMessage(openRewardLive.getLiveId(),userId,config.getScoreAmount()));
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ private void saveUserRewardRecord(Live live, List<Long> userIds,Long scoreAmount) {
|
|
|
|
|
+ for (Long userId : userIds) {
|
|
|
|
|
+ LiveRewardRecord record = new LiveRewardRecord();
|
|
|
|
|
+ record.setLiveId(live.getLiveId());
|
|
|
|
|
+ record.setUserId(userId);
|
|
|
|
|
+ record.setIncomeType(1L);
|
|
|
|
|
+ record.setSourceType(3L);
|
|
|
|
|
+ record.setSourceId(live.getCompanyId() == null ? 0L : live.getCompanyId());
|
|
|
|
|
+ record.setRewardType(2L);
|
|
|
|
|
+ record.setNum(BigDecimal.valueOf(scoreAmount));
|
|
|
|
|
+ record.setRewardType(2L);
|
|
|
|
|
+ record.setCreateTime(new Date());
|
|
|
|
|
+ record.setCreateBy(String.valueOf(userId));
|
|
|
|
|
+ liveRewardRecordService.insertLiveRewardRecord(record);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 从Redis获取对象并转换为Long类型
|
|
|
|
|
+ * @param redisCache Redis缓存操作对象
|
|
|
|
|
+ * @param key 缓存键
|
|
|
|
|
+ * @return 转换后的Long值(若为null或转换失败则返回0L)
|
|
|
|
|
+ */
|
|
|
|
|
+ public static Long getAsLong(RedisCache redisCache, String key) {
|
|
|
|
|
+ // 从Redis获取原始对象
|
|
|
|
|
+ Object value = redisCache.getCacheObject(key);
|
|
|
|
|
+ if (value == null) {
|
|
|
|
|
+ return 0L; // 若缓存不存在,返回默认值0
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 根据实际类型转换为Long
|
|
|
|
|
+ if (value instanceof Long) {
|
|
|
|
|
+ return (Long) value;
|
|
|
|
|
+ } else if (value instanceof Integer) {
|
|
|
|
|
+ return ((Integer) value).longValue();
|
|
|
|
|
+ } else if (value instanceof String) {
|
|
|
|
|
+ // 处理字符串类型(可能是数字字符串,如"123")
|
|
|
|
|
+ try {
|
|
|
|
|
+ return Long.parseLong((String) value);
|
|
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
|
|
+ // 若字符串无法转为数字,返回0(或根据业务抛异常)
|
|
|
|
|
+ return 0L;
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 其他类型(如Double等),根据业务需求处理,这里默认返回0
|
|
|
|
|
+ return 0L;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @PostConstruct
|
|
|
|
|
+ public void initLiveDatasOnStartup() {
|
|
|
|
|
+ log.info("项目启动,开始初始化直播点赞数据...");
|
|
|
|
|
+ try {
|
|
|
|
|
+ List<LiveData> liveDatas = liveDataService.getAllLiveDatas();
|
|
|
|
|
+ if (CollectionUtils.isNotEmpty(liveDatas)) {
|
|
|
|
|
+ for (LiveData liveData : liveDatas) {
|
|
|
|
|
+ redisCache.deleteObject("live:like:" + liveData.getLiveId());
|
|
|
|
|
+ redisCache.setCacheObject("live:like:" + liveData.getLiveId(), liveData.getLikes().intValue());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("项目启动时加载直播数据失败", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ *定期将缓存的数据写入数据库
|
|
|
|
|
+ */
|
|
|
|
|
+ @Scheduled(cron = "0 0/1 * * * ?")// 每分钟执行一次
|
|
|
|
|
+ public void syncLiveDataToDB() {
|
|
|
|
|
+ List<LiveData> liveDatas = liveDataService.getAllLiveDatas(); // 获取所有正在直播的直播间数据
|
|
|
|
|
+ if(liveDatas == null)
|
|
|
|
|
+ return;
|
|
|
|
|
+ liveDatas.forEach(liveData ->{
|
|
|
|
|
+
|
|
|
|
|
+ Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveData.getLiveId());
|
|
|
|
|
+ Integer liveFlag = flagMap.get("liveFlag");
|
|
|
|
|
+
|
|
|
|
|
+ // 判断是直播还是回放
|
|
|
|
|
+ if (liveFlag != null && liveFlag == 1) {
|
|
|
|
|
+ // 直播:更新 likes 和 totalViews
|
|
|
|
|
+ Long resultLikeCount = getAsLong(redisCache, "live:like:" + liveData.getLiveId());
|
|
|
|
|
+ resultLikeCount = resultLikeCount > 0L ? resultLikeCount : liveData.getLikes();
|
|
|
|
|
+ redisCache.setCacheObject("live:like:" + liveData.getLiveId(), resultLikeCount.intValue());
|
|
|
|
|
+ liveData.setLikes(resultLikeCount);
|
|
|
|
|
+
|
|
|
|
|
+ // 从 redis 获取数据,并提供默认值,避免 NPE
|
|
|
|
|
+ liveData.setPageViews(
|
|
|
|
|
+ Math.max( liveData.getPageViews(), Optional.ofNullable(redisCache.incr(PAGE_VIEWS_KEY + liveData.getLiveId(),0)).orElse(0L))
|
|
|
|
|
+ );
|
|
|
|
|
+ liveData.setTotalViews(
|
|
|
|
|
+ Math.max( liveData.getTotalViews(), Optional.ofNullable(redisCache.incr(TOTAL_VIEWS_KEY + liveData.getLiveId(),0)).orElse(0L))
|
|
|
|
|
+ );
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 回放:使用 Redis 中的数据减去直播的数据,得到回放的数据
|
|
|
|
|
+ String likeKey = "live:like:" + liveData.getLiveId();
|
|
|
|
|
+ String totalViewsKey = TOTAL_VIEWS_KEY + liveData.getLiveId();
|
|
|
|
|
+
|
|
|
|
|
+ // 从 Redis 获取总数据(直播+回放)
|
|
|
|
|
+ Long totalLikeCount = getAsLong(redisCache, likeKey);
|
|
|
|
|
+ Long totalViewCount = getAsLong(redisCache, totalViewsKey);
|
|
|
|
|
+
|
|
|
|
|
+ // 获取数据库中直播的数据
|
|
|
|
|
+ Long liveLikeCount = liveData.getLikes() != null ? liveData.getLikes() : 0L;
|
|
|
|
|
+ Long liveViewCount = liveData.getTotalViews() != null ? liveData.getTotalViews() : 0L;
|
|
|
|
|
+
|
|
|
|
|
+ // 回放数据 = Redis总数据 - 直播数据
|
|
|
|
|
+ Long replayLikeNum = totalLikeCount - liveLikeCount;
|
|
|
|
|
+ Long replayViewNum = totalViewCount - liveViewCount;
|
|
|
|
|
+
|
|
|
|
|
+ // 确保回放数据不为负数
|
|
|
|
|
+ if (replayLikeNum < 0L) {
|
|
|
|
|
+ replayLikeNum = 0L;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (replayViewNum < 0L) {
|
|
|
|
|
+ replayViewNum = 0L;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 更新回放数据
|
|
|
|
|
+ liveData.setReplayLikeNum(replayLikeNum);
|
|
|
|
|
+ liveData.setReplayViewNum(replayViewNum);
|
|
|
|
|
+
|
|
|
|
|
+ // 从 redis 获取数据,并提供默认值,避免 NPE
|
|
|
|
|
+ liveData.setPageViews(
|
|
|
|
|
+ Math.max( liveData.getPageViews(), Optional.ofNullable(redisCache.incr(PAGE_VIEWS_KEY + liveData.getLiveId(),0)).orElse(0L))
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ liveData.setUniqueVisitors(
|
|
|
|
|
+ /*Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VISITORS_KEY + liveId))
|
|
|
|
|
+ .map(Set::size) // 获取集合大小
|
|
|
|
|
+ .map(Long::valueOf) // 转换为 Long 类型
|
|
|
|
|
+ .orElse(0L)*/
|
|
|
|
|
+ Math.max( liveData.getUniqueVisitors(), Optional.ofNullable(redisCache.incr(UNIQUE_VISITORS_KEY + liveData.getLiveId(),0)).orElse(0L))
|
|
|
|
|
+ );
|
|
|
|
|
+ liveData.setUniqueViewers(
|
|
|
|
|
+ /*Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VIEWERS_KEY + liveId))
|
|
|
|
|
+ .map(Set::size) // 获取集合大小
|
|
|
|
|
+ .map(Long::valueOf) // 转换为 Long 类型
|
|
|
|
|
+ .orElse(0L)*/
|
|
|
|
|
+ Math.max( liveData.getUniqueViewers(), Optional.ofNullable(redisCache.incr(UNIQUE_VIEWERS_KEY + liveData.getLiveId(),0)).orElse(0L))
|
|
|
|
|
+ );
|
|
|
|
|
+ // 使用Set大小来获取最大同时在线人数
|
|
|
|
|
+ String onlineUsersSetKey = ONLINE_USERS_SET_KEY + liveData.getLiveId();
|
|
|
|
|
+ Long currentSetSize = redisCache.redisTemplate.opsForSet().size(onlineUsersSetKey);
|
|
|
|
|
+ Long maxOnlineFromRedis = Optional.ofNullable(redisCache.getCacheObject(MAX_ONLINE_USERS_KEY + liveData.getLiveId()))
|
|
|
|
|
+ .map(obj -> {
|
|
|
|
|
+ if (obj instanceof Number) {
|
|
|
|
|
+ return ((Number) obj).longValue();
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ return Long.parseLong(obj.toString());
|
|
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
|
|
+ return 0L;
|
|
|
|
|
+ }
|
|
|
|
|
+ })
|
|
|
|
|
+ .orElse(0L);
|
|
|
|
|
+ // 取Set大小和Redis中记录的最大在线人数的较大值
|
|
|
|
|
+ Long maxOnlineCount = Math.max(
|
|
|
|
|
+ currentSetSize != null ? currentSetSize : 0L,
|
|
|
|
|
+ maxOnlineFromRedis
|
|
|
|
|
+ );
|
|
|
|
|
+ liveData.setPeakConcurrentViewers(
|
|
|
|
|
+ Math.max(liveData.getPeakConcurrentViewers(), maxOnlineCount)
|
|
|
|
|
+ );
|
|
|
|
|
+ });
|
|
|
|
|
+ if(!liveDatas.isEmpty())
|
|
|
|
|
+ for (LiveData liveData : liveDatas) {
|
|
|
|
|
+ liveDataService.updateLiveData(liveData);
|
|
|
|
|
+ }
|
|
|
|
|
+ /*// 更新数据库
|
|
|
|
|
+ liveDataService.updateLiveData(liveData);*/
|
|
|
|
|
+ Set<String> keys = redisCache.redisTemplate.keys(String.format(LIVE_COUPON_NUM, "*"));
|
|
|
|
|
+ if (keys != null && !keys.isEmpty()) {
|
|
|
|
|
+ for (String key : keys) {
|
|
|
|
|
+ Object o = redisCache.redisTemplate.opsForValue().get(String.format(LIVE_COUPON_NUM, key));
|
|
|
|
|
+ if (o != null) {
|
|
|
|
|
+ LiveCouponIssue updateEntity = new LiveCouponIssue();
|
|
|
|
|
+ updateEntity.setId(Long.valueOf(key));
|
|
|
|
|
+ updateEntity.setRemainCount(Long.parseLong(o.toString()));
|
|
|
|
|
+ liveCouponIssueService.updateLiveCouponIssue(updateEntity);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 更新红包领取数量
|
|
|
|
|
+ */
|
|
|
|
|
+ @Scheduled(cron = "0/30 * * * * ?")
|
|
|
|
|
+ @DistributeLock(key = "updateRedQuantityNum", scene = "task")
|
|
|
|
|
+ public void updateRedQuantityNum() {
|
|
|
|
|
+ liveRedConfService.updateRedQuantityNum();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 定时扫描开启的直播间,检查是否到了打标签的时间,然后把正在看直播的用户拆分为 直播用户和回放用户
|
|
|
|
|
+ * 每10秒执行一次
|
|
|
|
|
+ */
|
|
|
|
|
+ @Scheduled(cron = "0/10 * * * * ?")
|
|
|
|
|
+ @DistributeLock(key = "scanLiveTagMark", scene = "task")
|
|
|
|
|
+ public void scanLiveTagMark() {
|
|
|
|
|
+ try {
|
|
|
|
|
+
|
|
|
|
|
+ // 获取所有打标签缓存的key
|
|
|
|
|
+ String pattern = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, "*");
|
|
|
|
|
+ Set<String> keys = redisCache.redisTemplate.keys(pattern);
|
|
|
|
|
+
|
|
|
|
|
+ if (keys == null || keys.isEmpty()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ long currentTimeMillis = System.currentTimeMillis();
|
|
|
|
|
+
|
|
|
|
|
+ List<Long> processedLiveIds = new ArrayList<>();
|
|
|
|
|
+ Date nowDate = new Date();
|
|
|
|
|
+ for (String key : keys) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 从Redis获取直播间信息
|
|
|
|
|
+ Object cacheValue = redisCache.getCacheObject(key);
|
|
|
|
|
+ if (cacheValue == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ String jsonStr = cacheValue.toString();
|
|
|
|
|
+ JSONObject tagMarkInfo = JSON.parseObject(jsonStr);
|
|
|
|
|
+ Long liveId = tagMarkInfo.getLong("liveId");
|
|
|
|
|
+ Long startTimeMillis = tagMarkInfo.getLong("startTime");
|
|
|
|
|
+ Long videoDuration = tagMarkInfo.getLong("videoDuration");
|
|
|
|
|
+
|
|
|
|
|
+ if (liveId == null || startTimeMillis == null || videoDuration == null || videoDuration <= 0) {
|
|
|
|
|
+ log.info("直播间打标签缓存信息不完整: key={}, liveId={}, startTime={}, videoDuration={}",
|
|
|
|
|
+ key, liveId, startTimeMillis, videoDuration);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 查询直播间信息
|
|
|
|
|
+ Live live = liveService.selectLiveDbByLiveId(liveId);
|
|
|
|
|
+ if (live == null || live.getStartTime() == null) {
|
|
|
|
|
+ log.info("没查到直播间-{}",liveId);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 计算结束时间:开始时间 + 视频时长(秒转毫秒)
|
|
|
|
|
+ long endTimeMillis = startTimeMillis + (videoDuration * 1000);
|
|
|
|
|
+
|
|
|
|
|
+ // 如果当前时间已经超过了结束时间,执行打标签操作(约等于直播完成)
|
|
|
|
|
+ if (currentTimeMillis >= endTimeMillis) {
|
|
|
|
|
+ // 查询当前直播间的在线用户(liveFlag = 1, replayFlag = 0)
|
|
|
|
|
+ LiveWatchUser queryUser = new LiveWatchUser();
|
|
|
|
|
+ queryUser.setLiveId(liveId);
|
|
|
|
|
+ queryUser.setLiveFlag(1);
|
|
|
|
|
+ queryUser.setReplayFlag(0);
|
|
|
|
|
+ queryUser.setOnline(0);
|
|
|
|
|
+ List<LiveWatchUser> liveUsers = liveWatchUserService.selectAllWatchUser(queryUser);
|
|
|
|
|
+
|
|
|
|
|
+ if (liveUsers != null && !liveUsers.isEmpty()) {
|
|
|
|
|
+
|
|
|
|
|
+ List<LiveWatchUser> updateLiveUsers = new ArrayList<>(); // 需要更新的直播用户
|
|
|
|
|
+ List<LiveWatchUser> replayUsers = new ArrayList<>(); // 回放用户数据
|
|
|
|
|
+
|
|
|
|
|
+ for (LiveWatchUser liveUser : liveUsers) {
|
|
|
|
|
+ Long userId = liveUser.getUserId();
|
|
|
|
|
+ if (userId == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 1. 计算并更新直播用户的在线时长
|
|
|
|
|
+ // 优先从 Redis 获取进入时间
|
|
|
|
|
+ String entryTimeKey = String.format("live:user:entry:time:%s:%s", liveId, userId);
|
|
|
|
|
+ Long entryTime = redisCache.getCacheObject(entryTimeKey);
|
|
|
|
|
+
|
|
|
|
|
+ // 如果没有 Redis 记录,使用数据库中的 updateTime
|
|
|
|
|
+ if (entryTime == null) {
|
|
|
|
|
+ if (liveUser.getUpdateTime() != null) {
|
|
|
|
|
+ entryTime = liveUser.getUpdateTime().getTime();
|
|
|
|
|
+ } else if (liveUser.getCreateTime() != null) {
|
|
|
|
|
+ entryTime = liveUser.getCreateTime().getTime();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 计算当前观看时长(秒)
|
|
|
|
|
+ long currentWatchDuration = 0L;
|
|
|
|
|
+ if (entryTime != null) {
|
|
|
|
|
+ currentWatchDuration = (currentTimeMillis - entryTime) / 1000;
|
|
|
|
|
+ if (currentWatchDuration < 0) {
|
|
|
|
|
+ currentWatchDuration = 0L;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 加上历史在线时长
|
|
|
|
|
+ Long historyOnlineSeconds = liveUser.getOnlineSeconds();
|
|
|
|
|
+ if (historyOnlineSeconds == null) {
|
|
|
|
|
+ historyOnlineSeconds = 0L;
|
|
|
|
|
+ }
|
|
|
|
|
+ long totalOnlineSeconds = historyOnlineSeconds + currentWatchDuration;
|
|
|
|
|
+
|
|
|
|
|
+ log.info("更新直播用户的在线时长 用户直播离线 录播在线");
|
|
|
|
|
+ // 更新直播用户的在线时长 用户直播离线 录播在线
|
|
|
|
|
+ liveUser.setOnlineSeconds(totalOnlineSeconds);
|
|
|
|
|
+ liveUser.setUpdateTime(nowDate);
|
|
|
|
|
+ liveUser.setOnline(1);
|
|
|
|
|
+ updateLiveUsers.add(liveUser);
|
|
|
|
|
+
|
|
|
|
|
+ log.info(" 生成回放用户数据(liveFlag = 0, replayFlag = 1),在线时长从0开始");
|
|
|
|
|
+ // 2. 生成回放用户数据(liveFlag = 0, replayFlag = 1),在线时长从0开始
|
|
|
|
|
+ LiveWatchUser replayUser = new LiveWatchUser();
|
|
|
|
|
+ replayUser.setLiveId(liveUser.getLiveId());
|
|
|
|
|
+ replayUser.setUserId(liveUser.getUserId());
|
|
|
|
|
+ replayUser.setMsgStatus(liveUser.getMsgStatus());
|
|
|
|
|
+ replayUser.setOnline(0);
|
|
|
|
|
+ replayUser.setOnlineSeconds(0L); // 回放观看时长从0开始,重新计时
|
|
|
|
|
+ replayUser.setGlobalVisible(liveUser.getGlobalVisible());
|
|
|
|
|
+ replayUser.setSingleVisible(liveUser.getSingleVisible());
|
|
|
|
|
+ replayUser.setLiveFlag(0); // 回放标记
|
|
|
|
|
+ replayUser.setReplayFlag(1); // 回放标记
|
|
|
|
|
+ replayUser.setLocation(liveUser.getLocation());
|
|
|
|
|
+ replayUser.setCreateTime(nowDate);
|
|
|
|
|
+ replayUser.setUpdateTime(nowDate);
|
|
|
|
|
+ replayUsers.add(replayUser);
|
|
|
|
|
+ redisCache.setCacheObject(entryTimeKey,currentTimeMillis);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 批量更新直播用户的在线时长
|
|
|
|
|
+ if (!updateLiveUsers.isEmpty()) {
|
|
|
|
|
+ int batchSize = 500;
|
|
|
|
|
+ for (int i = 0; i < updateLiveUsers.size(); i += batchSize) {
|
|
|
|
|
+ int end = Math.min(i + batchSize, updateLiveUsers.size());
|
|
|
|
|
+ List<LiveWatchUser> batch = updateLiveUsers.subList(i, end);
|
|
|
|
|
+ liveWatchUserService.batchUpdateLiveWatchUser(batch);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 批量插入回放用户数据
|
|
|
|
|
+ if (!replayUsers.isEmpty()) {
|
|
|
|
|
+ int batchSize = 500;
|
|
|
|
|
+ for (int i = 0; i < replayUsers.size(); i += batchSize) {
|
|
|
|
|
+ int end = Math.min(i + batchSize, replayUsers.size());
|
|
|
|
|
+ List<LiveWatchUser> batch = replayUsers.subList(i, end);
|
|
|
|
|
+ liveWatchUserService.batchInsertLiveWatchUser(batch);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 清理直播间状态缓存
|
|
|
|
|
+ liveWatchUserService.clearLiveFlagCache(liveId);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 标记为已处理,稍后删除缓存
|
|
|
|
|
+ processedLiveIds.add(liveId);
|
|
|
|
|
+ // 调用打标签方法
|
|
|
|
|
+ liveWatchUserService.qwTagMarkByLiveWatchLog(liveId);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理直播间打标签缓存异常: key={}, error={}", key, e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 删除已处理的直播间缓存
|
|
|
|
|
+ for (Long liveId : processedLiveIds) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, liveId);
|
|
|
|
|
+ redisCache.deleteObject(tagMarkKey);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("删除直播间打标签缓存失败: liveId={}, error={}", liveId, e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("扫描直播间打标签任务异常: error={}", e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 实时扫描用户直播数据,根据用户的直播在线时长更新观看记录状态
|
|
|
|
|
+ * 每30秒执行一次
|
|
|
|
|
+ */
|
|
|
|
|
+ @Scheduled(cron = "0/30 * * * * ?")
|
|
|
|
|
+ @DistributeLock(key = "scanLiveWatchUserStatus", scene = "task")
|
|
|
|
|
+ public void scanLiveWatchUserStatus() {
|
|
|
|
|
+ try {
|
|
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
|
|
+ // 查询所有正在直播的直播间
|
|
|
|
|
+ List<Live> activeLives = liveService.selectNoEndLiveList();
|
|
|
|
|
+ if (activeLives == null || activeLives.isEmpty()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ for (Live live : activeLives) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ Long liveId = live.getLiveId();
|
|
|
|
|
+ if (liveId == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 获取直播间的直播/回放状态
|
|
|
|
|
+ Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
|
|
|
|
|
+ Integer liveFlag = flagMap.get("liveFlag");
|
|
|
|
|
+ // 只处理直播状态的用户(liveFlag = 1)
|
|
|
|
|
+ if (liveFlag == null || liveFlag != 1) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 查询该直播间的在线用户(liveFlag = 1, replayFlag = 0)
|
|
|
|
|
+ LiveWatchUser queryUser = new LiveWatchUser();
|
|
|
|
|
+ queryUser.setLiveId(liveId);
|
|
|
|
|
+ queryUser.setLiveFlag(1);
|
|
|
|
|
+ queryUser.setReplayFlag(0);
|
|
|
|
|
+ List<LiveWatchUser> onlineUsers = liveWatchUserService.selectAllWatchUser(queryUser);
|
|
|
|
|
+ if (onlineUsers == null || onlineUsers.isEmpty()) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 获取直播视频总时长
|
|
|
|
|
+ List<LiveVideo> videos = liveVideoService.listByLiveIdWithCache(liveId, 1);
|
|
|
|
|
+ long totalVideoDuration = 0L;
|
|
|
|
|
+ if (videos != null && !videos.isEmpty()) {
|
|
|
|
|
+ totalVideoDuration = videos.stream()
|
|
|
|
|
+ .filter(v -> v.getDuration() != null)
|
|
|
|
|
+ .mapToLong(LiveVideo::getDuration)
|
|
|
|
|
+ .sum();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 处理每个在线用户
|
|
|
|
|
+ List<LiveWatchLog> updateLog = new ArrayList<>();
|
|
|
|
|
+ for (LiveWatchUser user : onlineUsers) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ Long userId = user.getUserId();
|
|
|
|
|
+ if (userId == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 获取用户的在线观看时长
|
|
|
|
|
+ String entryTimeKey = String.format(USER_ENTRY_TIME_KEY, liveId, userId);
|
|
|
|
|
+ Long existingEntryTime = redisCache.getCacheObject(entryTimeKey);
|
|
|
|
|
+ Long onlineSeconds = user.getOnlineSeconds() ==null ? 0L : user.getOnlineSeconds();
|
|
|
|
|
+ if(null != existingEntryTime){
|
|
|
|
|
+ onlineSeconds = onlineSeconds + ((System.currentTimeMillis() - existingEntryTime)/1000);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (onlineSeconds == null || onlineSeconds <= 0) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 获取用户的 companyId 和 companyUserId
|
|
|
|
|
+ LiveUserFirstEntry liveUserFirstEntry =
|
|
|
|
|
+ liveUserFirstEntryService.selectEntityByLiveIdUserIdWithCache(liveId, userId);
|
|
|
|
|
+ if (liveUserFirstEntry == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Long qwUserId = liveUserFirstEntry.getQwUserId();
|
|
|
|
|
+ Long externalContactId = liveUserFirstEntry.getExternalContactId();
|
|
|
|
|
+
|
|
|
|
|
+ if (qwUserId == null || qwUserId <= 0 || externalContactId == null || externalContactId <= 0) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ //更新最新用户活跃时间
|
|
|
|
|
+ String liveUserWatchLogKey = String.format(LIVE_USER_WATCH_LOG_CACHE, liveId, userId,externalContactId,qwUserId);
|
|
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
|
|
+ redisCache.setCacheObject(liveUserWatchLogKey,formatter.format(now),5,TimeUnit.MINUTES);
|
|
|
|
|
+ // 使用 updateLiveWatchLogTypeByDuration 的逻辑更新观看记录状态
|
|
|
|
|
+ updateLiveWatchLogTypeByDuration(liveId, userId, qwUserId, externalContactId,
|
|
|
|
|
+ onlineSeconds, totalVideoDuration, updateLog);
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理用户观看记录状态异常: liveId={}, userId={}, error={}",
|
|
|
|
|
+ liveId, user.getUserId(), e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // 批量插入回放用户数据
|
|
|
|
|
+ if (!updateLog.isEmpty()) {
|
|
|
|
|
+ int batchSize = 500;
|
|
|
|
|
+ for (int i = 0; i < updateLog.size(); i += batchSize) {
|
|
|
|
|
+ int end = Math.min(i + batchSize, updateLog.size());
|
|
|
|
|
+ List<LiveWatchLog> batch = updateLog.subList(i, end);
|
|
|
|
|
+ liveWatchLogService.batchUpdateLiveWatchLog(batch);
|
|
|
|
|
+ }
|
|
|
|
|
+ for (LiveWatchLog liveWatchLog : updateLog) {
|
|
|
|
|
+ redisCache.setCacheObject("live:watch:log:cache:" + liveWatchLog.getLogId(), liveWatchLog, 1, TimeUnit.HOURS);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理直播间观看记录状态异常: liveId={}, error={}",
|
|
|
|
|
+ live.getLiveId(), e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("实时扫描用户直播数据任务异常: error={}", e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 根据在线时长更新 LiveWatchLog 的 logType(复用 WebSocketServer 中的逻辑)
|
|
|
|
|
+ * @param liveId 直播间ID
|
|
|
|
|
+ * @param userId 用户ID
|
|
|
|
|
+ * @param qwUserId 邀请人id
|
|
|
|
|
+ * @param exId 外部人id
|
|
|
|
|
+ * @param onlineSeconds 在线时长(秒)
|
|
|
|
|
+ * @param totalVideoDuration 视频总时长(秒)
|
|
|
|
|
+ */
|
|
|
|
|
+ private void updateLiveWatchLogTypeByDuration(Long liveId, Long userId, Long qwUserId,
|
|
|
|
|
+ Long exId, Long onlineSeconds, long totalVideoDuration, List<LiveWatchLog> updateLog) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 查询 LiveWatchLog
|
|
|
|
|
+ LiveWatchLog queryLog = new LiveWatchLog();
|
|
|
|
|
+ queryLog.setLiveId(liveId);
|
|
|
|
|
+ queryLog.setQwUserId(String.valueOf(qwUserId));
|
|
|
|
|
+ queryLog.setExternalContactId(exId);
|
|
|
|
|
+
|
|
|
|
|
+ List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogByLogIdWithCache(queryLog);
|
|
|
|
|
+ if (logs == null || logs.isEmpty()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Date now = new Date();
|
|
|
|
|
+ for (LiveWatchLog log : logs) {
|
|
|
|
|
+ if (log.getLogType() != null && log.getLogType() == 2) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ boolean needUpdate = false;
|
|
|
|
|
+ Integer newLogType = log.getLogType();
|
|
|
|
|
+
|
|
|
|
|
+ // ① 如果在线时长 <= 3分钟,修改 logType 为 4(看课中断) lmx-这个逻辑不合理,不能这样判定看课中断
|
|
|
|
|
+// if (onlineSeconds <= 180) { // 3分钟 = 180秒
|
|
|
|
|
+// newLogType = 4;
|
|
|
|
|
+// needUpdate = true;
|
|
|
|
|
+// } else
|
|
|
|
|
+ // ③ 如果直播视频 >= 40分钟,在线时长 >= 30分钟,logType 设置为 2(完课)
|
|
|
|
|
+ if (totalVideoDuration >= 2400 && onlineSeconds >= 1800) { // 40分钟 = 2400秒,30分钟 = 1800秒
|
|
|
|
|
+ newLogType = 2;
|
|
|
|
|
+ log.setFinishTime(now);
|
|
|
|
|
+ needUpdate = true;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 如果直播视频 >= 20分钟且 < 40分钟,在线时长 >= 20分钟,logType 设置为 2(完课)
|
|
|
|
|
+ else if (totalVideoDuration >= 1200 && totalVideoDuration < 2400 && onlineSeconds >= 1200) { // 20分钟 = 1200秒
|
|
|
|
|
+ newLogType = 2;
|
|
|
|
|
+ log.setFinishTime(now);
|
|
|
|
|
+ needUpdate = true;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 如果 logType 已经是 2(完课),不再更新
|
|
|
|
|
+ if (needUpdate) {
|
|
|
|
|
+ log.setLogType(newLogType);
|
|
|
|
|
+ updateLog.add(log);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("根据在线时长更新 LiveWatchLog logType 异常:liveId={}, userId={}, error={}",
|
|
|
|
|
+ liveId, userId, e.getMessage(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 每分钟扫描一次用户在线状态用于更新用户观看记录值
|
|
|
|
|
+ */
|
|
|
|
|
+ @Scheduled(cron = "0 0/1 * * * ?")
|
|
|
|
|
+ @DistributeLock(key = "updateLiveWatchUserStatus", scene = "task")
|
|
|
|
|
+ public void updateLiveWatchUserStatus() {
|
|
|
|
|
+ try {
|
|
|
|
|
+ Set<String> keys = redisCache.redisTemplate.keys("live:user:watch:log:*");
|
|
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
|
|
+ List<LiveWatchLog> updateLog = new ArrayList<>();
|
|
|
|
|
+ if (keys != null && !keys.isEmpty()) {
|
|
|
|
|
+ for (String key : keys) {
|
|
|
|
|
+ String[] split = key.split(":");
|
|
|
|
|
+ String cacheTime = redisCache.getCacheObject(key);
|
|
|
|
|
+ //判断缓存的值是否已经距离现在超过一分钟
|
|
|
|
|
+ if (StringUtils.isNotBlank(cacheTime)) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ LocalDateTime cachedDateTime = LocalDateTime.parse(cacheTime, formatter);
|
|
|
|
|
+ // 比较时间,判断是否超过1分钟(60秒)
|
|
|
|
|
+ long secondsBetween = java.time.Duration.between(cachedDateTime, now).getSeconds();
|
|
|
|
|
+ if (secondsBetween >= 60) {
|
|
|
|
|
+ // 距离上次记录已超过1分钟,更新状态为看课中断
|
|
|
|
|
+ // 查询 LiveWatchLog
|
|
|
|
|
+ LiveWatchLog queryLog = new LiveWatchLog();
|
|
|
|
|
+ queryLog.setLiveId(Long.valueOf(split[4]));
|
|
|
|
|
+ queryLog.setQwUserId(String.valueOf(split[7]));
|
|
|
|
|
+ queryLog.setExternalContactId(Long.valueOf(split[6]));
|
|
|
|
|
+ queryLog.setLogType(1);
|
|
|
|
|
+ List<LiveWatchLog> logs = liveWatchLogService.selectLiveWatchLogList(queryLog);
|
|
|
|
|
+ if (logs != null && !logs.isEmpty()) {
|
|
|
|
|
+ for (LiveWatchLog log : logs) {
|
|
|
|
|
+ if (log.getLogType() != null && log.getLogType() == 2) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.setLogType(4);
|
|
|
|
|
+ updateLog.add(log);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("解析缓存时间失败: cacheTime={}, error={}", cacheTime, e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // 批量插入回放用户数据
|
|
|
|
|
+ if (!updateLog.isEmpty()) {
|
|
|
|
|
+ int batchSize = 500;
|
|
|
|
|
+ for (int i = 0; i < updateLog.size(); i += batchSize) {
|
|
|
|
|
+ int end = Math.min(i + batchSize, updateLog.size());
|
|
|
|
|
+ List<LiveWatchLog> batch = updateLog.subList(i, end);
|
|
|
|
|
+ liveWatchLogService.batchUpdateLiveWatchLog(batch);
|
|
|
|
|
+ }
|
|
|
|
|
+ for (LiveWatchLog liveWatchLog : updateLog) {
|
|
|
|
|
+ redisCache.setCacheObject("live:watch:log:cache:" + liveWatchLog.getLogId(), liveWatchLog, 1, TimeUnit.HOURS);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
|
+ log.error("每分钟扫描一次用户在线状态用于更新用户观看记录值: error={}", ex.getMessage(), ex);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 批量同步Redis中的观看时长到数据库
|
|
|
|
|
+ * 每2分钟执行一次,减少数据库压力
|
|
|
|
|
+ */
|
|
|
|
|
+// @Scheduled(cron = "0 0/2 * * * ?")
|
|
|
|
|
+// @DistributeLock(key = "batchSyncWatchDuration", scene = "task")
|
|
|
|
|
+// public void batchSyncWatchDuration() {
|
|
|
|
|
+// try {
|
|
|
|
|
+// log.info("开始批量同步观看时长到数据库");
|
|
|
|
|
+//
|
|
|
|
|
+// // 优化:从所有直播间的Hash中批量获取数据
|
|
|
|
|
+// List<Live> activeLives = liveService.selectNoEndLiveList();
|
|
|
|
|
+//
|
|
|
|
|
+// if (activeLives == null || activeLives.isEmpty()) {
|
|
|
|
|
+// log.debug("当前没有活跃的直播间");
|
|
|
|
|
+// return;
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// int totalCount = 0;
|
|
|
|
|
+// int successCount = 0;
|
|
|
|
|
+// int failCount = 0;
|
|
|
|
|
+//
|
|
|
|
|
+// // 逐个直播间处理
|
|
|
|
|
+// for (Live live : activeLives) {
|
|
|
|
|
+// try {
|
|
|
|
|
+// Long liveId = live.getLiveId();
|
|
|
|
|
+//
|
|
|
|
|
+// // 使用Hash结构存储每个直播间的观看时长
|
|
|
|
|
+// String hashKey = "live:watch:duration:hash:" + liveId;
|
|
|
|
|
+// Map<Object, Object> userDurations = redisCache.redisTemplate.opsForHash().entries(hashKey);
|
|
|
|
|
+//
|
|
|
|
|
+// if (userDurations == null || userDurations.isEmpty()) {
|
|
|
|
|
+// continue;
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// // 获取直播/回放标记(一次查询,所有用户复用)
|
|
|
|
|
+// Map<String, Integer> flagMap = liveWatchUserService.getLiveFlagWithCache(liveId);
|
|
|
|
|
+// Integer liveFlag = flagMap.get("liveFlag");
|
|
|
|
|
+// Integer replayFlag = flagMap.get("replayFlag");
|
|
|
|
|
+//
|
|
|
|
|
+// // 批量处理该直播间的所有用户
|
|
|
|
|
+// for (Map.Entry<Object, Object> entry : userDurations.entrySet()) {
|
|
|
|
|
+// try {
|
|
|
|
|
+// Long userId = Long.parseLong(entry.getKey().toString());
|
|
|
|
|
+// Long duration = Long.parseLong(entry.getValue().toString());
|
|
|
|
|
+//
|
|
|
|
|
+// totalCount++;
|
|
|
|
|
+//
|
|
|
|
|
+// // 异步更新数据库
|
|
|
|
|
+// liveWatchUserService.updateWatchDuration(liveId, userId, liveFlag, replayFlag, duration);
|
|
|
|
|
+// successCount++;
|
|
|
|
|
+//
|
|
|
|
|
+// } catch (Exception e) {
|
|
|
|
|
+// failCount++;
|
|
|
|
|
+// log.error("同步用户观看时长失败: liveId={}, userId={}, error={}",
|
|
|
|
|
+// liveId, entry.getKey(), e.getMessage());
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// } catch (Exception e) {
|
|
|
|
|
+// log.error("处理直播间观看时长失败: liveId={}, error={}", live.getLiveId(), e.getMessage());
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+//
|
|
|
|
|
+// log.info("批量同步观看时长完成: 总数={}, 成功={}, 失败={}", totalCount, successCount, failCount);
|
|
|
|
|
+//
|
|
|
|
|
+// } catch (Exception e) {
|
|
|
|
|
+// log.error("批量同步观看时长任务异常", e);
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+}
|