Parcourir la source

缓存添加修改

yuhongqi il y a 1 jour
Parent
commit
770a0b3209

+ 6 - 0
fs-common/src/main/java/com/fs/common/constant/LiveKeysConstant.java

@@ -55,4 +55,10 @@ public class LiveKeysConstant {
     public static final int TTL_LIVE_MSG = 30;
     public static final int TTL_LIVE_DATA = 30;
 
+    /** 商品足迹异步写入队列 */
+    public static final String FOOTPRINT_QUEUE_KEY = "live:footprint:queue";
+    /** 足迹任务处理锁前缀,后缀为 taskId */
+    public static final String FOOTPRINT_PROCESSING_KEY = "live:footprint:processing:";
+    public static final int FOOTPRINT_PROCESSING_LOCK_EXPIRE = 300;
+
 }

+ 5 - 3
fs-live-ws/src/main/java/com/fs/live/task/Task.java

@@ -258,6 +258,7 @@ public class Task {
         }
 
         String redKey = "live:red_task:*";
+        final String redTaskPrefix = "live:red_task:";
         allLiveKeys = redisCache.redisTemplate.keys(redKey);
         if (allLiveKeys == null || allLiveKeys.isEmpty()) {
             return;
@@ -273,12 +274,13 @@ public class Task {
                     .removeRangeByScore(liveKey, 0, currentTime);
             try {
                 // 广播红包关闭消息
+                Long liveId = Long.valueOf(liveKey.substring(redTaskPrefix.length()));
                 WsSendMsgVo sendMsgVo = new WsSendMsgVo();
-                sendMsgVo.setLiveId(Long.valueOf(liveKey));
+                sendMsgVo.setLiveId(liveId);
                 sendMsgVo.setCmd("red");
                 sendMsgVo.setStatus(-1);
-                liveService.asyncToCacheLiveConfig(Long.parseLong(liveKey));
-                liveWsRoomBroadcastFacade.broadcastMessage(Long.valueOf(liveKey), JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+                liveService.asyncToCacheLiveConfig(liveId);
+                liveWsRoomBroadcastFacade.broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
             } catch (Exception e) {
                 log.error("更新红包状态异常", e);
             }

+ 2 - 0
fs-live-ws/src/main/java/com/fs/live/ws/constant/WsAttrConstant.java

@@ -11,6 +11,8 @@ public final class WsAttrConstant {
     public static final String USER_ID = "userId";
     public static final String USER_TYPE = "userType";
     public static final String TOKEN = "APPToken";
+    public static final String TIMESTAMP = "timestamp";
+    public static final String SIGNATURE = "signature";
     public static final String LOCATION = "location";
     public static final String COMPANY_ID = "companyId";
     public static final String COMPANY_USER_ID = "companyUserId";

+ 36 - 14
fs-live-ws/src/main/java/com/fs/live/ws/handler/WsAuthHandler.java

@@ -2,6 +2,7 @@ package com.fs.live.ws.handler;
 
 import com.fs.live.ws.constant.WsAttrConstant;
 import com.fs.live.ws.util.WsJwtUtils;
+import com.fs.live.ws.util.WsVerifyUtils;
 import io.jsonwebtoken.Claims;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFutureListener;
@@ -48,24 +49,45 @@ public class WsAuthHandler extends ChannelInboundHandlerAdapter {
             return;
         }
 
-        String token = resolveToken(params);
-        if (token == null) {
-            reject(ctx, req, HttpResponseStatus.UNAUTHORIZED, "缺少 token");
-            return;
-        }
-
         Long liveId = Long.valueOf(params.get(WsAttrConstant.LIVE_ID).get(0));
         Long userId = Long.valueOf(params.get(WsAttrConstant.USER_ID).get(0));
-        Claims claims = wsJwtUtils.getClaimByToken(token);
-        if (claims == null || wsJwtUtils.isTokenExpired(claims.getExpiration())) {
-            log.warn("WS 鉴权失败 liveId={}, userId={}", liveId, userId);
-            reject(ctx, req, HttpResponseStatus.UNAUTHORIZED, "token 无效");
-            return;
-        }
+
+        String token = resolveToken(params);
+        boolean hasSignature = params.containsKey(WsAttrConstant.SIGNATURE)
+                && params.containsKey(WsAttrConstant.TIMESTAMP)
+                && params.containsKey(WsAttrConstant.USER_TYPE);
 
         long userType = 0L;
-        if (params.containsKey(WsAttrConstant.USER_TYPE)) {
-            userType = Long.parseLong(params.get(WsAttrConstant.USER_TYPE).get(0));
+        if (token != null) {
+            Claims claims = wsJwtUtils.getClaimByToken(token);
+            if (claims == null || wsJwtUtils.isTokenExpired(claims.getExpiration())) {
+                log.warn("WS 鉴权失败 liveId={}, userId={}", liveId, userId);
+                reject(ctx, req, HttpResponseStatus.UNAUTHORIZED, "token 无效");
+                return;
+            }
+            if (params.containsKey(WsAttrConstant.USER_TYPE)) {
+                userType = Long.parseLong(params.get(WsAttrConstant.USER_TYPE).get(0));
+            }
+        } else if (hasSignature) {
+            try {
+                String userTypeStr = params.get(WsAttrConstant.USER_TYPE).get(0);
+                String timestampStr = params.get(WsAttrConstant.TIMESTAMP).get(0);
+                String signatureStr = params.get(WsAttrConstant.SIGNATURE).get(0);
+                if (!WsVerifyUtils.verifySignature(liveId.toString(), userId.toString(),
+                        userTypeStr, timestampStr, signatureStr)) {
+                    log.warn("WS 签名鉴权失败 liveId={}, userId={}", liveId, userId);
+                    reject(ctx, req, HttpResponseStatus.UNAUTHORIZED, "signature 无效");
+                    return;
+                }
+                userType = Long.parseLong(userTypeStr);
+            } catch (Exception ex) {
+                log.warn("WS 签名鉴权异常 liveId={}, userId={}", liveId, userId, ex);
+                reject(ctx, req, HttpResponseStatus.UNAUTHORIZED, "signature 无效");
+                return;
+            }
+        } else {
+            reject(ctx, req, HttpResponseStatus.UNAUTHORIZED, "缺少 token 或 signature");
+            return;
         }
 
         ctx.channel().attr(WsAttrConstant.ATTR_LIVE_ID).set(liveId);

+ 7 - 2
fs-live-ws/src/main/java/com/fs/live/ws/service/impl/LiveWsBroadcastServiceImpl.java

@@ -21,13 +21,18 @@ public class LiveWsBroadcastServiceImpl implements ILiveWsBroadcastService {
     @Autowired
     private StringRedisTemplate stringRedisTemplate;
 
+    /**
+     * 向直播间所有连接广播(含本节点与其它节点)。
+     * 仅走 Redis Pub/Sub,由 {@link com.fs.live.ws.listener.LiveWsRoomMessageListener} 在各节点 broadcastLocal,
+     * 避免本节点「先 local 再 Redis 回调 local」导致客户端收到重复推送。
+     */
     @Override
     public void broadcastToRoom(Long liveId, String message) {
-        broadcastLocal(liveId, message);
         try {
             stringRedisTemplate.convertAndSend(String.format(WsRedisKeys.WS_ROOM_CHANNEL, liveId), message);
         } catch (Exception ex) {
-            log.warn("Redis 广播失败 liveId={}", liveId, ex);
+            log.warn("Redis 广播失败 liveId={},降级为本节点广播", liveId, ex);
+            broadcastLocal(liveId, message);
         }
     }
 

+ 38 - 0
fs-live-ws/src/main/java/com/fs/live/ws/util/WsVerifyUtils.java

@@ -0,0 +1,38 @@
+package com.fs.live.ws.util;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * 管理端 WebSocket 握手签名(与 adminUI liveWS.js HMAC 规则一致)
+ */
+public final class WsVerifyUtils {
+
+    private WsVerifyUtils() {
+    }
+
+    public static boolean verifySignature(String liveId, String userId, String userType,
+                                          String timestamp, String signature) throws Exception {
+        String expected = generateSignature(liveId, userId, userType, timestamp);
+        return expected.equals(signature);
+    }
+
+    private static String generateSignature(String liveId, String userId, String userType, String secret)
+            throws Exception {
+        String data = liveId + userId + userType + secret;
+        Mac mac = Mac.getInstance("HmacSHA256");
+        SecretKeySpec keySpec = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256");
+        mac.init(keySpec);
+        byte[] rawHmac = mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
+        return bytesToHex(rawHmac);
+    }
+
+    private static String bytesToHex(byte[] bytes) {
+        StringBuilder sb = new StringBuilder();
+        for (byte b : bytes) {
+            sb.append(String.format("%02x", b));
+        }
+        return sb.toString();
+    }
+}

+ 3 - 28
fs-user-app/src/main/java/com/fs/app/controller/cacheLive/CacheLiveGoodsController.java

@@ -1,18 +1,17 @@
 package com.fs.app.controller.cacheLive;
 
+import com.fs.app.queue.footprint.FootprintQueueService;
 import com.fs.common.constant.LiveKeysConstant;
 import com.fs.common.core.domain.AjaxResult;
 import com.fs.common.core.domain.R;
 import com.fs.common.utils.ServletUtils;
 import com.fs.hisStore.domain.FsStoreProductAttrScrm;
 import com.fs.hisStore.domain.FsStoreProductAttrValueScrm;
-import com.fs.hisStore.domain.FsStoreProductRelationScrm;
 import com.fs.hisStore.domain.FsStoreProductScrm;
 import com.fs.hisStore.domain.FsStoreProductPurchaseLimitScrm;
 import com.fs.hisStore.service.IFsStoreProductAttrScrmService;
 import com.fs.hisStore.service.IFsStoreProductAttrValueScrmService;
 import com.fs.hisStore.service.IFsStoreProductPurchaseLimitScrmService;
-import com.fs.hisStore.service.IFsStoreProductRelationScrmService;
 import com.fs.hisStore.service.IFsStoreProductScrmService;
 import com.fs.live.domain.LiveGoods;
 import com.fs.live.service.ILiveGoodsService;
@@ -25,7 +24,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +42,7 @@ public class CacheLiveGoodsController extends CacheLiveBaseController {
     @Autowired
     private IFsStoreProductAttrValueScrmService attrValueService;
     @Autowired
-    private IFsStoreProductRelationScrmService productRelationService;
+    private FootprintQueueService footprintQueueService;
     @Autowired
     private IFsStoreProductPurchaseLimitScrmService purchaseLimitService;
 
@@ -101,7 +99,7 @@ public class CacheLiveGoodsController extends CacheLiveBaseController {
 
         String userId = getUserId();
         if (userId != null) {
-            saveFootprint(userId, product.getProductId());
+            footprintQueueService.enqueue(Long.parseLong(userId), product.getProductId());
         }
 
         Integer remainingPurchaseLimit = null;
@@ -144,27 +142,4 @@ public class CacheLiveGoodsController extends CacheLiveBaseController {
                 () -> liveGoodsService.selectLiveGoodsByGoodsId(goodsId));
         return AjaxResult.success(goods);
     }
-
-    private void saveFootprint(String userId, Long productId) {
-        FsStoreProductRelationScrm query = new FsStoreProductRelationScrm();
-        query.setIsDel(0);
-        query.setUserId(Long.parseLong(userId));
-        query.setProductId(productId);
-        query.setType("foot");
-        List<FsStoreProductRelationScrm> relations = productRelationService.selectFsStoreProductRelationList(query);
-        if (relations != null && !relations.isEmpty()) {
-            FsStoreProductRelationScrm relation = relations.get(0);
-            relation.setUpdateTime(new Date());
-            productRelationService.updateFsStoreProductRelation(relation);
-        } else {
-            FsStoreProductRelationScrm relation = new FsStoreProductRelationScrm();
-            relation.setUserId(Long.parseLong(userId));
-            relation.setIsDel(0);
-            relation.setProductId(productId);
-            relation.setType("foot");
-            relation.setCreateTime(new Date());
-            relation.setUpdateTime(new Date());
-            productRelationService.insertFsStoreProductRelation(relation);
-        }
-    }
 }

+ 5 - 25
fs-user-app/src/main/java/com/fs/app/controller/live/LiveGoodsController.java

@@ -1,5 +1,6 @@
 package com.fs.app.controller.live;
 
+import com.fs.app.queue.footprint.FootprintQueueService;
 import com.fs.app.controller.AppBaseController;
 import com.fs.common.annotation.Log;
 import com.fs.common.constant.LiveKeysConstant;
@@ -19,7 +20,6 @@ import com.github.pagehelper.PageInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,8 +45,9 @@ public class LiveGoodsController extends AppBaseController
     private RedisCache redisCache;
     @Autowired
     private IFsStoreProductAttrScrmService attrService;
+
     @Autowired
-    private IFsStoreProductRelationScrmService productRelationService;
+    private FootprintQueueService footprintQueueService;
 
     @Autowired
     private IFsStoreProductAttrValueScrmService attrValueService;
@@ -165,31 +166,10 @@ public class LiveGoodsController extends AppBaseController
         }
 
         
-        // 获取用户的TOKEN写入足迹
+        // 足迹异步写入 Redis 队列
         String userId=getUserId();
         if(userId!=null){
-            FsStoreProductRelationScrm productRelation=new FsStoreProductRelationScrm();
-            productRelation.setIsDel(0);
-            productRelation.setUserId(Long.parseLong(userId));
-            productRelation.setProductId(product.getProductId());
-            productRelation.setType("foot");
-            List<FsStoreProductRelationScrm> productRelations=productRelationService.selectFsStoreProductRelationList(productRelation);
-            if(productRelations!=null&&productRelations.size()>0){
-                FsStoreProductRelationScrm relation=productRelations.get(0);
-                relation.setUpdateTime(new Date());
-                productRelationService.updateFsStoreProductRelation(relation);
-            }
-            else{
-                FsStoreProductRelationScrm relation=new FsStoreProductRelationScrm();
-                relation.setUserId(Long.parseLong(userId));
-                relation.setIsDel(0);
-                relation.setProductId(product.getProductId());
-                relation.setUpdateTime(new Date());
-                relation.setType("foot");
-                relation.setCreateTime(new Date());
-                relation.setUpdateTime(new Date());
-                productRelationService.insertFsStoreProductRelation(relation);
-            }
+            footprintQueueService.enqueue(Long.parseLong(userId), product.getProductId());
         }
 
         // 查询限购信息

+ 48 - 0
fs-user-app/src/main/java/com/fs/app/queue/footprint/FootprintQueueItem.java

@@ -0,0 +1,48 @@
+package com.fs.app.queue.footprint;
+
+import java.io.Serializable;
+
+/**
+ * 商品足迹队列消息
+ */
+public class FootprintQueueItem implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Long userId;
+    private Long productId;
+    private String taskId;
+
+    public FootprintQueueItem() {
+    }
+
+    public FootprintQueueItem(Long userId, Long productId, String taskId) {
+        this.userId = userId;
+        this.productId = productId;
+        this.taskId = taskId;
+    }
+
+    public Long getUserId() {
+        return userId;
+    }
+
+    public void setUserId(Long userId) {
+        this.userId = userId;
+    }
+
+    public Long getProductId() {
+        return productId;
+    }
+
+    public void setProductId(Long productId) {
+        this.productId = productId;
+    }
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+}

+ 122 - 0
fs-user-app/src/main/java/com/fs/app/queue/footprint/FootprintQueueService.java

@@ -0,0 +1,122 @@
+package com.fs.app.queue.footprint;
+
+import com.fs.common.constant.LiveKeysConstant;
+import com.fs.common.core.redis.RedisCache;
+import com.fs.hisStore.domain.FsStoreProductRelationScrm;
+import com.fs.hisStore.service.IFsStoreProductRelationScrmService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 商品足迹 Redis 队列:接口入队,单线程慢消费,原子锁保障处理后删除。
+ */
+@Service
+public class FootprintQueueService {
+
+    private static final Logger log = LoggerFactory.getLogger(FootprintQueueService.class);
+
+    private static final long EMPTY_QUEUE_SLEEP_MS = 1000L;
+    private static final long CONSUME_INTERVAL_MS = 200L;
+
+    @Autowired
+    private RedisCache redisCache;
+
+    @Autowired
+    private IFsStoreProductRelationScrmService productRelationService;
+
+    private volatile boolean running = true;
+    private Thread consumerThread;
+
+    /**
+     * 将足迹写入任务放入 Redis 队列
+     */
+    public void enqueue(Long userId, Long productId) {
+        if (userId == null || productId == null) {
+            return;
+        }
+        FootprintQueueItem item = new FootprintQueueItem(userId, productId, UUID.randomUUID().toString());
+        redisCache.setVoice(LiveKeysConstant.FOOTPRINT_QUEUE_KEY, item);
+    }
+
+    @PostConstruct
+    public void startConsumer() {
+        consumerThread = new Thread(this::consumeLoop, "footprint-queue-consumer");
+        consumerThread.setDaemon(true);
+        consumerThread.start();
+        log.info("商品足迹队列消费者线程已启动");
+    }
+
+    @PreDestroy
+    public void stopConsumer() {
+        running = false;
+        if (consumerThread != null) {
+            consumerThread.interrupt();
+        }
+        log.info("商品足迹队列消费者线程已停止");
+    }
+
+    private void consumeLoop() {
+        while (running) {
+            try {
+                FootprintQueueItem item = redisCache.popVoiceKey(LiveKeysConstant.FOOTPRINT_QUEUE_KEY);
+                if (item == null) {
+                    Thread.sleep(EMPTY_QUEUE_SLEEP_MS);
+                    continue;
+                }
+
+                String lockKey = LiveKeysConstant.FOOTPRINT_PROCESSING_KEY + item.getTaskId();
+                redisCache.setCacheObject(lockKey, item,
+                        LiveKeysConstant.FOOTPRINT_PROCESSING_LOCK_EXPIRE, TimeUnit.SECONDS);
+
+                try {
+                    saveFootprint(item.getUserId(), item.getProductId());
+                    redisCache.deleteObject(lockKey);
+                } catch (Exception e) {
+                    log.error("消费商品足迹失败, userId={}, productId={}, taskId={}",
+                            item.getUserId(), item.getProductId(), item.getTaskId(), e);
+                    redisCache.deleteObject(lockKey);
+                    redisCache.setVoice(LiveKeysConstant.FOOTPRINT_QUEUE_KEY, item);
+                }
+
+                Thread.sleep(CONSUME_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception e) {
+                log.error("商品足迹队列消费异常", e);
+            }
+        }
+    }
+
+    private void saveFootprint(Long userId, Long productId) {
+        FsStoreProductRelationScrm query = new FsStoreProductRelationScrm();
+        query.setIsDel(0);
+        query.setUserId(userId);
+        query.setProductId(productId);
+        query.setType("foot");
+        List<FsStoreProductRelationScrm> relations = productRelationService.selectFsStoreProductRelationList(query);
+        if (relations != null && !relations.isEmpty()) {
+            FsStoreProductRelationScrm relation = relations.get(0);
+            relation.setUpdateTime(new Date());
+            productRelationService.updateFsStoreProductRelation(relation);
+        } else {
+            FsStoreProductRelationScrm relation = new FsStoreProductRelationScrm();
+            relation.setUserId(userId);
+            relation.setIsDel(0);
+            relation.setProductId(productId);
+            relation.setType("foot");
+            relation.setCreateTime(new Date());
+            relation.setUpdateTime(new Date());
+            productRelationService.insertFsStoreProductRelation(relation);
+        }
+    }
+}