|
|
@@ -8,6 +8,8 @@ import com.fs.common.constant.LiveKeysConstant;
|
|
|
import com.fs.common.core.domain.R;
|
|
|
import com.fs.common.core.redis.RedisCache;
|
|
|
import com.fs.common.utils.StringUtils;
|
|
|
+import com.fs.common.utils.redis.LiveDelayedTaskRedisUtil;
|
|
|
+import com.fs.common.utils.redis.RedisActiveKeyIndexRepairService;
|
|
|
import com.fs.framework.aspectj.lock.DistributeLock;
|
|
|
import com.fs.erp.service.FsJstAftersalePushService;
|
|
|
import com.fs.his.service.IFsUserService;
|
|
|
@@ -39,7 +41,6 @@ import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import static com.fs.common.constant.LiveKeysConstant.*;
|
|
|
-import static com.fs.common.constant.LiveKeysConstant.LIVE_COUPON_NUM;
|
|
|
import static com.fs.live.websocket.service.WebSocketServer.USER_ENTRY_TIME_KEY;
|
|
|
|
|
|
@Component
|
|
|
@@ -82,6 +83,10 @@ public class Task {
|
|
|
|
|
|
@Autowired
|
|
|
public FsJstAftersalePushService fsJstAftersalePushService;
|
|
|
+ @Autowired
|
|
|
+ private LiveDelayedTaskRedisUtil liveDelayedTaskRedisUtil;
|
|
|
+ @Autowired
|
|
|
+ private RedisActiveKeyIndexRepairService redisActiveKeyIndexRepairService;
|
|
|
|
|
|
@Scheduled(cron = "0 0/1 * * * ?")
|
|
|
@DistributeLock(key = "updateLiveStatusByTime", scene = "task")
|
|
|
@@ -155,7 +160,6 @@ public class Task {
|
|
|
liveService.updateLiveEntity(live);
|
|
|
}
|
|
|
}
|
|
|
- String key = "live:auto_task:";
|
|
|
if (!startLiveList.isEmpty()) {
|
|
|
for (Live live : startLiveList) {
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
@@ -164,12 +168,14 @@ public class Task {
|
|
|
webSocketServer.broadcastMessage(live.getLiveId(), JSONObject.toJSONString(R.ok().put("data",sendMsgVo)));
|
|
|
List<LiveAutoTask> collect = liveAutoTasks.stream().filter(liveAutoTask -> liveAutoTask.getLiveId().equals(live.getLiveId())).collect(Collectors.toList());
|
|
|
if (!collect.isEmpty()) {
|
|
|
+ String autoTaskKey = liveDelayedTaskRedisUtil.autoTaskKey(live.getLiveId());
|
|
|
collect.forEach(liveAutoTask -> {
|
|
|
liveAutoTask.setCreateTime(null);
|
|
|
liveAutoTask.setUpdateTime(null);
|
|
|
- redisCache.zSetAdd(key + live.getLiveId(), JSON.toJSONString(liveAutoTask),liveAutoTask.getAbsValue().getTime());
|
|
|
- redisCache.expire(key+live.getLiveId(), 1, TimeUnit.DAYS);
|
|
|
+ redisCache.zSetAdd(autoTaskKey, JSON.toJSONString(liveAutoTask), liveAutoTask.getAbsValue().getTime());
|
|
|
+ redisCache.expire(autoTaskKey, 1, TimeUnit.DAYS);
|
|
|
});
|
|
|
+ liveDelayedTaskRedisUtil.trackAutoTask(live.getLiveId());
|
|
|
}
|
|
|
// 清理小程序缓存 和 直播标签缓存
|
|
|
String cacheKey = String.format(LiveKeysConstant.LIVE_DATA_CACHE, live.getLiveId());
|
|
|
@@ -194,8 +200,9 @@ public class Task {
|
|
|
tagMarkInfo.put("startTime", live.getStartTime().atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli());
|
|
|
tagMarkInfo.put("videoDuration", videoDuration);
|
|
|
|
|
|
- String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, live.getLiveId());
|
|
|
+ String tagMarkKey = liveDelayedTaskRedisUtil.tagMarkKey(live.getLiveId());
|
|
|
redisCache.setCacheObject(tagMarkKey, JSON.toJSONString(tagMarkInfo), 24, TimeUnit.HOURS);
|
|
|
+ liveDelayedTaskRedisUtil.trackTagMark(live.getLiveId());
|
|
|
log.info("直播间开启,已加入打标签缓存: liveId={}, startTime={}, videoDuration={}",
|
|
|
live.getLiveId(), live.getStartTime(), videoDuration);
|
|
|
}
|
|
|
@@ -214,12 +221,9 @@ public class Task {
|
|
|
webSocketServer.broadcastMessage(live.getLiveId(), JSONObject.toJSONString(R.ok().put("data",sendMsgVo)));
|
|
|
List<LiveAutoTask> collect = liveAutoTasks.stream().filter(liveAutoTask -> liveAutoTask.getLiveId().equals(live.getLiveId())).collect(Collectors.toList());
|
|
|
if (!collect.isEmpty()) {
|
|
|
- redisCache.deleteObject(key + live.getLiveId());
|
|
|
- collect.forEach(liveAutoTask -> {
|
|
|
- liveAutoTask.setCreateTime(null);
|
|
|
- liveAutoTask.setUpdateTime(null);
|
|
|
- redisCache.redisTemplate.opsForZSet().remove(key + live.getLiveId(), JSON.toJSONString(liveAutoTask),liveAutoTask.getAbsValue().getTime());
|
|
|
- });
|
|
|
+ String autoTaskKey = liveDelayedTaskRedisUtil.autoTaskKey(live.getLiveId());
|
|
|
+ redisCache.deleteObject(autoTaskKey);
|
|
|
+ liveDelayedTaskRedisUtil.untrackAutoTask(live.getLiveId());
|
|
|
}
|
|
|
String cacheKey = String.format(LiveKeysConstant.LIVE_DATA_CACHE, live.getLiveId());
|
|
|
redisCache.deleteObject(cacheKey);
|
|
|
@@ -227,8 +231,9 @@ public class Task {
|
|
|
|
|
|
// 删除打标签缓存
|
|
|
try {
|
|
|
- String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, live.getLiveId());
|
|
|
+ String tagMarkKey = liveDelayedTaskRedisUtil.tagMarkKey(live.getLiveId());
|
|
|
redisCache.deleteObject(tagMarkKey);
|
|
|
+ liveDelayedTaskRedisUtil.untrackTagMark(live.getLiveId());
|
|
|
log.info("直播间结束,已删除打标签缓存: liveId={}", live.getLiveId());
|
|
|
} catch (Exception e) {
|
|
|
log.error("删除直播间打标签缓存失败: liveId={}, error={}", live.getLiveId(), e.getMessage(), e);
|
|
|
@@ -243,42 +248,41 @@ public class Task {
|
|
|
@DistributeLock(key = "liveLotteryTask", scene = "task")
|
|
|
public void liveLotteryTask() {
|
|
|
long currentTime = Instant.now().toEpochMilli(); // 当前时间戳(毫秒)
|
|
|
- String lotteryKey = "live:lottery_task:*";
|
|
|
- Set<String> allLiveKeys = redisCache.redisTemplate.keys(lotteryKey);
|
|
|
- if (allLiveKeys != null && !allLiveKeys.isEmpty()) {
|
|
|
- for (String liveKey : allLiveKeys) {
|
|
|
- Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
- if (range == null || range.isEmpty()) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- processLotteryTask(range);
|
|
|
- redisCache.redisTemplate.opsForZSet()
|
|
|
- .removeRangeByScore(liveKey, 0, currentTime);
|
|
|
+ Set<String> lotteryZSetKeys = liveDelayedTaskRedisUtil.listLotteryTaskZSetKeys();
|
|
|
+ for (String liveKey : lotteryZSetKeys) {
|
|
|
+ Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
+ if (range == null || range.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ processLotteryTask(range);
|
|
|
+ redisCache.redisTemplate.opsForZSet().removeRangeByScore(liveKey, 0, currentTime);
|
|
|
+ Long liveId = LiveKeysConstant.parseLiveIdFromTaskZSetKey(liveKey);
|
|
|
+ if (liveId != null) {
|
|
|
+ liveDelayedTaskRedisUtil.refreshLotteryTaskIndex(liveId);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- String redKey = "live:red_task:*";
|
|
|
- allLiveKeys = redisCache.redisTemplate.keys(redKey);
|
|
|
- if (allLiveKeys == null || allLiveKeys.isEmpty()) {
|
|
|
- return;
|
|
|
- }
|
|
|
- for (String liveKey : allLiveKeys) {
|
|
|
+ Set<String> redZSetKeys = liveDelayedTaskRedisUtil.listRedTaskZSetKeys();
|
|
|
+ for (String liveKey : redZSetKeys) {
|
|
|
Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
if (range == null || range.isEmpty()) {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
updateRedStatus(range);
|
|
|
- redisCache.redisTemplate.opsForZSet()
|
|
|
- .removeRangeByScore(liveKey, 0, currentTime);
|
|
|
+ redisCache.redisTemplate.opsForZSet().removeRangeByScore(liveKey, 0, currentTime);
|
|
|
+ Long liveId = LiveKeysConstant.parseLiveIdFromTaskZSetKey(liveKey);
|
|
|
+ if (liveId != null) {
|
|
|
+ liveDelayedTaskRedisUtil.refreshRedTaskIndex(liveId);
|
|
|
+ }
|
|
|
try {
|
|
|
// 广播红包关闭消息
|
|
|
SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
- sendMsgVo.setLiveId(Long.valueOf(liveKey));
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
sendMsgVo.setCmd("red");
|
|
|
sendMsgVo.setStatus(-1);
|
|
|
- liveService.asyncToCacheLiveConfig(Long.parseLong(liveKey));
|
|
|
- webSocketServer.broadcastMessage(Long.valueOf(liveKey), JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ liveService.asyncToCacheLiveConfig(liveId);
|
|
|
+ webSocketServer.broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
} catch (Exception e) {
|
|
|
log.error("更新红包状态异常", e);
|
|
|
}
|
|
|
@@ -374,21 +378,18 @@ public class Task {
|
|
|
public void liveAutoTask() {
|
|
|
long currentTime = Instant.now().toEpochMilli(); // 当前时间戳(毫秒)
|
|
|
|
|
|
- Set<String> allLiveKeys = redisCache.redisTemplate.keys("live:auto_task:*");
|
|
|
- if (allLiveKeys == null || allLiveKeys.isEmpty()) {
|
|
|
- return; // 没有数据,直接返回
|
|
|
- }
|
|
|
- // 2. 遍历每个直播间的ZSet键
|
|
|
- for (String liveKey : allLiveKeys) {
|
|
|
- // 3. 获取当前直播间ZSet中所有元素(按score排序)
|
|
|
- // range方法:0表示第一个元素,-1表示最后一个元素,即获取全部
|
|
|
+ Set<String> autoZSetKeys = liveDelayedTaskRedisUtil.listAutoTaskZSetKeys();
|
|
|
+ for (String liveKey : autoZSetKeys) {
|
|
|
Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
|
|
|
if (range == null || range.isEmpty()) {
|
|
|
- continue; // 没有数据,直接返回
|
|
|
+ continue;
|
|
|
}
|
|
|
- redisCache.redisTemplate.opsForZSet()
|
|
|
- .removeRangeByScore(liveKey, 0, currentTime);
|
|
|
+ redisCache.redisTemplate.opsForZSet().removeRangeByScore(liveKey, 0, currentTime);
|
|
|
processAutoTask(range);
|
|
|
+ Long liveId = LiveKeysConstant.parseLiveIdFromTaskZSetKey(liveKey);
|
|
|
+ if (liveId != null) {
|
|
|
+ liveDelayedTaskRedisUtil.refreshAutoTaskIndex(liveId);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -611,17 +612,23 @@ public class Task {
|
|
|
}
|
|
|
/*// 更新数据库
|
|
|
liveDataService.updateLiveData(liveData);*/
|
|
|
- Set<String> keys = redisCache.redisTemplate.keys(String.format(LIVE_COUPON_NUM, "*"));
|
|
|
- if (keys != null && !keys.isEmpty()) {
|
|
|
- for (String key : keys) {
|
|
|
- Object o = redisCache.redisTemplate.opsForValue().get(String.format(LIVE_COUPON_NUM, key));
|
|
|
- if (o != null) {
|
|
|
- LiveCouponIssue updateEntity = new LiveCouponIssue();
|
|
|
- updateEntity.setId(Long.valueOf(key));
|
|
|
- updateEntity.setRemainCount(Long.parseLong(o.toString()));
|
|
|
- liveCouponIssueService.updateLiveCouponIssue(updateEntity);
|
|
|
+ for (String key : liveDelayedTaskRedisUtil.listCouponNumKeys()) {
|
|
|
+ Object o = redisCache.getCacheObject(key);
|
|
|
+ if (o == null) {
|
|
|
+ Long issueId = LiveKeysConstant.parseCouponIssueIdFromKey(key);
|
|
|
+ if (issueId != null) {
|
|
|
+ liveDelayedTaskRedisUtil.untrackCouponNum(issueId);
|
|
|
}
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ Long issueId = LiveKeysConstant.parseCouponIssueIdFromKey(key);
|
|
|
+ if (issueId == null) {
|
|
|
+ continue;
|
|
|
}
|
|
|
+ LiveCouponIssue updateEntity = new LiveCouponIssue();
|
|
|
+ updateEntity.setId(issueId);
|
|
|
+ updateEntity.setRemainCount(Long.parseLong(o.toString()));
|
|
|
+ liveCouponIssueService.updateLiveCouponIssue(updateEntity);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -643,11 +650,8 @@ public class Task {
|
|
|
public void scanLiveTagMark() {
|
|
|
try {
|
|
|
|
|
|
- // 获取所有打标签缓存的key
|
|
|
- String pattern = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, "*");
|
|
|
- Set<String> keys = redisCache.redisTemplate.keys(pattern);
|
|
|
-
|
|
|
- if (keys == null || keys.isEmpty()) {
|
|
|
+ Set<String> keys = liveDelayedTaskRedisUtil.listTagMarkKeys();
|
|
|
+ if (keys.isEmpty()) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -660,6 +664,10 @@ public class Task {
|
|
|
// 从Redis获取直播间信息
|
|
|
Object cacheValue = redisCache.getCacheObject(key);
|
|
|
if (cacheValue == null) {
|
|
|
+ Long staleLiveId = LiveKeysConstant.parseLiveIdFromTaskZSetKey(key);
|
|
|
+ if (staleLiveId != null) {
|
|
|
+ liveDelayedTaskRedisUtil.untrackTagMark(staleLiveId);
|
|
|
+ }
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
@@ -795,8 +803,9 @@ public class Task {
|
|
|
// 删除已处理的直播间缓存
|
|
|
for (Long liveId : processedLiveIds) {
|
|
|
try {
|
|
|
- String tagMarkKey = String.format(LiveKeysConstant.LIVE_TAG_MARK_CACHE, liveId);
|
|
|
+ String tagMarkKey = liveDelayedTaskRedisUtil.tagMarkKey(liveId);
|
|
|
redisCache.deleteObject(tagMarkKey);
|
|
|
+ liveDelayedTaskRedisUtil.untrackTagMark(liveId);
|
|
|
} catch (Exception e) {
|
|
|
log.error("删除直播间打标签缓存失败: liveId={}, error={}", liveId, e.getMessage(), e);
|
|
|
}
|
|
|
@@ -886,9 +895,10 @@ public class Task {
|
|
|
continue;
|
|
|
}
|
|
|
//更新最新用户活跃时间
|
|
|
- String liveUserWatchLogKey = String.format(LIVE_USER_WATCH_LOG_CACHE, liveId, userId,externalContactId,qwUserId);
|
|
|
+ String liveUserWatchLogKey = LiveKeysConstant.liveUserWatchLogKey(liveId, userId, externalContactId, qwUserId);
|
|
|
LocalDateTime now = LocalDateTime.now();
|
|
|
- redisCache.setCacheObject(liveUserWatchLogKey,formatter.format(now),5,TimeUnit.MINUTES);
|
|
|
+ redisCache.setCacheObject(liveUserWatchLogKey, formatter.format(now), 5, TimeUnit.MINUTES);
|
|
|
+ liveDelayedTaskRedisUtil.trackUserWatchLog(liveUserWatchLogKey);
|
|
|
// 使用 updateLiveWatchLogTypeByDuration 的逻辑更新观看记录状态
|
|
|
updateLiveWatchLogTypeByDuration(liveId, userId, qwUserId, externalContactId,
|
|
|
onlineSeconds, totalVideoDuration, updateLog);
|
|
|
@@ -989,14 +999,18 @@ public class Task {
|
|
|
@DistributeLock(key = "updateLiveWatchUserStatus", scene = "task")
|
|
|
public void updateLiveWatchUserStatus() {
|
|
|
try {
|
|
|
- Set<String> keys = redisCache.redisTemplate.keys("live:user:watch:log:*");
|
|
|
+ Set<String> keys = liveDelayedTaskRedisUtil.listUserWatchLogKeys();
|
|
|
LocalDateTime now = LocalDateTime.now();
|
|
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
List<LiveWatchLog> updateLog = new ArrayList<>();
|
|
|
- if (keys != null && !keys.isEmpty()) {
|
|
|
+ if (!keys.isEmpty()) {
|
|
|
for (String key : keys) {
|
|
|
String[] split = key.split(":");
|
|
|
String cacheTime = redisCache.getCacheObject(key);
|
|
|
+ if (StringUtils.isBlank(cacheTime)) {
|
|
|
+ liveDelayedTaskRedisUtil.untrackUserWatchLog(key);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
//判断缓存的值是否已经距离现在超过一分钟
|
|
|
if (StringUtils.isNotBlank(cacheTime)) {
|
|
|
try {
|
|
|
@@ -1115,4 +1129,13 @@ public class Task {
|
|
|
// log.error("批量同步观看时长任务异常", e);
|
|
|
// }
|
|
|
// }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 每天凌晨 1 点:SCAN 修补活跃 key 索引(多节点由 Redis 锁保证只执行一次)
|
|
|
+ */
|
|
|
+ @Scheduled(cron = "0 0 1 * * ?")
|
|
|
+ @DistributeLock(key = "repairRedisActiveKeyIndexes", scene = "task")
|
|
|
+ public void repairRedisActiveKeyIndexes() {
|
|
|
+ redisActiveKeyIndexRepairService.repairAllIndexes();
|
|
|
+ }
|
|
|
}
|