|
@@ -0,0 +1,174 @@
|
|
|
|
|
+package com.fs.common.utils.redis;
|
|
|
|
|
+
|
|
|
|
|
+import com.fs.common.constant.CourseWatchKeysConstant;
|
|
|
|
|
+import com.fs.common.constant.LiveKeysConstant;
|
|
|
|
|
+import com.fs.common.core.redis.RedisCache;
|
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
+import org.springframework.data.redis.connection.RedisConnection;
|
|
|
|
|
+import org.springframework.data.redis.core.Cursor;
|
|
|
|
|
+import org.springframework.data.redis.core.RedisCallback;
|
|
|
|
|
+import org.springframework.data.redis.core.ScanOptions;
|
|
|
|
|
+import org.springframework.data.redis.serializer.RedisSerializer;
|
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
|
+
|
|
|
|
|
+import java.io.IOException;
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
|
+import java.util.HashSet;
|
|
|
|
|
+import java.util.List;
|
|
|
|
|
+import java.util.Set;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * 使用 SCAN 将 Redis 中真实存在的活跃 key 与 Set 索引对账(兜底,避免漏 track)。
|
|
|
|
|
+ */
|
|
|
|
|
+@Service
|
|
|
|
|
+public class RedisActiveKeyIndexRepairService {
|
|
|
|
|
+
|
|
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(RedisActiveKeyIndexRepairService.class);
|
|
|
|
|
+
|
|
|
|
|
+ private static final String REPAIR_LOCK_KEY = "redis:active_key_index:repair:lock";
|
|
|
|
|
+ private static final int SCAN_COUNT = 500;
|
|
|
|
|
+ private static final int LOCK_MINUTES = 45;
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private RedisCache redisCache;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 修补全部已注册的索引(多节点仅一个实例执行)。
|
|
|
|
|
+ */
|
|
|
|
|
+ public void repairAllIndexes() {
|
|
|
|
|
+ if (!tryAcquireRepairLock()) {
|
|
|
|
|
+ log.info("Redis 活跃 key 索引修补跳过:其他节点正在执行");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ log.info("Redis 活跃 key 索引修补开始");
|
|
|
|
|
+ List<RepairResult> results = new ArrayList<>();
|
|
|
|
|
+ for (IndexRepairTarget target : repairTargets()) {
|
|
|
|
|
+ results.add(repairIndex(target));
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("Redis 活跃 key 索引修补完成: {}", results);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("Redis 活跃 key 索引修补失败", e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ redisCache.deleteObject(REPAIR_LOCK_KEY);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public RepairResult repairIndex(IndexRepairTarget target) {
|
|
|
|
|
+ Set<String> scannedKeys = scanKeys(target.scanPattern);
|
|
|
|
|
+ int added = 0;
|
|
|
|
|
+ for (String cacheKey : scannedKeys) {
|
|
|
|
|
+ if (Boolean.TRUE.equals(redisCache.redisTemplate.hasKey(cacheKey))) {
|
|
|
|
|
+ Long addedCount = redisCache.redisTemplate.opsForSet().add(target.indexKey, cacheKey);
|
|
|
|
|
+ if (addedCount != null && addedCount > 0) {
|
|
|
|
|
+ added++;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ int removed = 0;
|
|
|
|
|
+ Set<Object> members = redisCache.redisTemplate.opsForSet().members(target.indexKey);
|
|
|
|
|
+ if (members != null) {
|
|
|
|
|
+ for (Object member : members) {
|
|
|
|
|
+ String cacheKey = member.toString();
|
|
|
|
|
+ if (!Boolean.TRUE.equals(redisCache.redisTemplate.hasKey(cacheKey))) {
|
|
|
|
|
+ redisCache.redisTemplate.opsForSet().remove(target.indexKey, cacheKey);
|
|
|
|
|
+ removed++;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return new RepairResult(target.name, target.scanPattern, target.indexKey, scannedKeys.size(), added, removed);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private List<IndexRepairTarget> repairTargets() {
|
|
|
|
|
+ List<IndexRepairTarget> targets = new ArrayList<>();
|
|
|
|
|
+ targets.add(new IndexRepairTarget("liveAutoTask",
|
|
|
|
|
+ LiveKeysConstant.LIVE_AUTO_TASK_PREFIX + "*",
|
|
|
|
|
+ LiveKeysConstant.LIVE_AUTO_TASK_INDEX));
|
|
|
|
|
+ targets.add(new IndexRepairTarget("liveLotteryTask",
|
|
|
|
|
+ LiveKeysConstant.LIVE_LOTTERY_TASK_PREFIX + "*",
|
|
|
|
|
+ LiveKeysConstant.LIVE_LOTTERY_TASK_INDEX));
|
|
|
|
|
+ targets.add(new IndexRepairTarget("liveRedTask",
|
|
|
|
|
+ LiveKeysConstant.LIVE_RED_TASK_PREFIX + "*",
|
|
|
|
|
+ LiveKeysConstant.LIVE_RED_TASK_INDEX));
|
|
|
|
|
+ targets.add(new IndexRepairTarget("liveTagMark",
|
|
|
|
|
+ LiveKeysConstant.LIVE_TAG_MARK_CACHE.replace("%s", "*"),
|
|
|
|
|
+ LiveKeysConstant.LIVE_TAG_MARK_INDEX));
|
|
|
|
|
+ targets.add(new IndexRepairTarget("liveCouponNum",
|
|
|
|
|
+ LiveKeysConstant.LIVE_COUPON_NUM_PREFIX + "*",
|
|
|
|
|
+ LiveKeysConstant.LIVE_COUPON_NUM_INDEX));
|
|
|
|
|
+ targets.add(new IndexRepairTarget("liveUserWatchLog",
|
|
|
|
|
+ "live:user:watch:log:*",
|
|
|
|
|
+ LiveKeysConstant.LIVE_USER_WATCH_LOG_INDEX));
|
|
|
|
|
+ targets.add(new IndexRepairTarget("h5wxHeartbeat",
|
|
|
|
|
+ CourseWatchKeysConstant.H5_WX_HEARTBEAT_PREFIX + "*",
|
|
|
|
|
+ CourseWatchKeysConstant.H5_WX_HEARTBEAT_INDEX));
|
|
|
|
|
+ targets.add(new IndexRepairTarget("h5wxDuration",
|
|
|
|
|
+ CourseWatchKeysConstant.H5_WX_DURATION_PREFIX + "*",
|
|
|
|
|
+ CourseWatchKeysConstant.H5_WX_DURATION_INDEX));
|
|
|
|
|
+ return targets;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private Set<String> scanKeys(String pattern) {
|
|
|
|
|
+ Set<String> keys = new HashSet<>();
|
|
|
|
|
+ ScanOptions options = ScanOptions.scanOptions().match(pattern).count(SCAN_COUNT).build();
|
|
|
|
|
+ redisCache.redisTemplate.execute((RedisCallback<Void>) connection -> {
|
|
|
|
|
+ RedisSerializer<String> keySerializer = redisCache.redisTemplate.getStringSerializer();
|
|
|
|
|
+ try (Cursor<byte[]> cursor = connection.scan(options)) {
|
|
|
|
|
+ while (cursor.hasNext()) {
|
|
|
|
|
+ String key = keySerializer.deserialize(cursor.next());
|
|
|
|
|
+ if (key != null) {
|
|
|
|
|
+ keys.add(key);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
|
+ throw new RuntimeException("Redis SCAN failed, pattern=" + pattern, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ return null;
|
|
|
|
|
+ });
|
|
|
|
|
+ return keys;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private boolean tryAcquireRepairLock() {
|
|
|
|
|
+ return redisCache.setIfAbsent(REPAIR_LOCK_KEY, "1", LOCK_MINUTES, TimeUnit.MINUTES);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static final class IndexRepairTarget {
|
|
|
|
|
+ private final String name;
|
|
|
|
|
+ private final String scanPattern;
|
|
|
|
|
+ private final String indexKey;
|
|
|
|
|
+
|
|
|
|
|
+ public IndexRepairTarget(String name, String scanPattern, String indexKey) {
|
|
|
|
|
+ this.name = name;
|
|
|
|
|
+ this.scanPattern = scanPattern;
|
|
|
|
|
+ this.indexKey = indexKey;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static final class RepairResult {
|
|
|
|
|
+ private final String name;
|
|
|
|
|
+ private final String scanPattern;
|
|
|
|
|
+ private final String indexKey;
|
|
|
|
|
+ private final int scannedCount;
|
|
|
|
|
+ private final int addedToIndex;
|
|
|
|
|
+ private final int removedFromIndex;
|
|
|
|
|
+
|
|
|
|
|
+ public RepairResult(String name, String scanPattern, String indexKey,
|
|
|
|
|
+ int scannedCount, int addedToIndex, int removedFromIndex) {
|
|
|
|
|
+ this.name = name;
|
|
|
|
|
+ this.scanPattern = scanPattern;
|
|
|
|
|
+ this.indexKey = indexKey;
|
|
|
|
|
+ this.scannedCount = scannedCount;
|
|
|
|
|
+ this.addedToIndex = addedToIndex;
|
|
|
|
|
+ this.removedFromIndex = removedFromIndex;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public String toString() {
|
|
|
|
|
+ return name + "{scanned=" + scannedCount + ", added=" + addedToIndex + ", removed=" + removedFromIndex + "}";
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|