Browse Source

socket防止奔溃 添加队列进行限流

yuhongqi 3 days ago
parent
commit
bade2ea702

+ 11 - 0
fs-admin/src/main/java/com/fs/hisStore/controller/FsStoreProductScrmController.java

@@ -177,6 +177,17 @@ public class FsStoreProductScrmController extends BaseController
     }
 
 
+    @PostMapping(value = "/updateCache")
+    public R updateCache(@RequestParam Long productId)
+    {
+        if (productId == null) {
+            return R.error();
+        }
+
+        return fsStoreProductService.updateCache(productId);
+    }
+
+
 
 
     /**

+ 351 - 24
fs-live-app/src/main/java/com/fs/live/websocket/service/WebSocketServer.java

@@ -36,12 +36,15 @@ import javax.websocket.*;
 import javax.websocket.server.ServerEndpoint;
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -68,6 +71,22 @@ public class WebSocketServer {
     private final static long HEARTBEAT_TIMEOUT = 2 * 60 * 1000;
     // admin房间消息发送线程池(单线程,保证串行化)
     private final static ConcurrentHashMap<Long, ExecutorService> adminExecutors = new ConcurrentHashMap<>();
+    
+    // 消息队列系统
+    // 每个直播间的消息队列,使用优先级队列支持管理员消息插队
+    private final static ConcurrentHashMap<Long, PriorityBlockingQueue<QueueMessage>> messageQueues = new ConcurrentHashMap<>();
+    // 每个直播间的消费者线程
+    private final static ConcurrentHashMap<Long, Thread> consumerThreads = new ConcurrentHashMap<>();
+    // 每个直播间的消费者线程控制标志
+    private final static ConcurrentHashMap<Long, AtomicBoolean> consumerRunningFlags = new ConcurrentHashMap<>();
+    // 每个直播间队列的总大小(字节数)
+    private final static ConcurrentHashMap<Long, AtomicLong> queueSizes = new ConcurrentHashMap<>();
+    // 消息队列最大容量:10000
+    private final static int MAX_QUEUE_SIZE = 10000;
+    // 消息队列最大大小:200MB
+    private final static long MAX_QUEUE_SIZE_BYTES = 200L * 1024L * 1024L; // 200MB
+    // 上下线消息采样率:10%
+    private final static double ENTRY_EXIT_SAMPLE_RATE = 0.1;
 
     private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
     private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
@@ -189,7 +208,8 @@ public class WebSocketServer {
                 redisCache.incr(UNIQUE_VIEWERS_KEY + liveId, 1);
             }
             liveWatchUserVO.setMsgStatus(liveWatchUserVO.getMsgStatus());
-            if (1 == random.nextInt(10)) {
+            // 上线消息采样10%进入队列
+            if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
                 SendMsgVo sendMsgVo = new SendMsgVo();
                 sendMsgVo.setLiveId(liveId);
                 sendMsgVo.setUserId(userId);
@@ -199,8 +219,8 @@ public class WebSocketServer {
                 sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
                 sendMsgVo.setNickName(fsUser.getNickname());
                 sendMsgVo.setAvatar(fsUser.getAvatar());
-                // 广播连接消息
-                broadcastWebMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+                // 将上线消息加入队列
+                enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
             }
 
             // 缓存用户首次进入记录,过期时间4小时
@@ -289,6 +309,15 @@ public class WebSocketServer {
         sessionLocks.putIfAbsent(session.getId(), new ReentrantLock());
         // 初始化心跳时间
         heartbeatCache.put(session.getId(), System.currentTimeMillis());
+        
+        // 如果有session,启动消费者线程
+        ConcurrentHashMap<Long, Session> tempRoom = getRoom(liveId);
+        List<Session> tempAdminRoom = getAdminRoom(liveId);
+        boolean hasSession = (tempRoom != null && !tempRoom.isEmpty()) ||
+                            (tempAdminRoom != null && !tempAdminRoom.isEmpty());
+        if (hasSession) {
+            startConsumerThread(liveId);
+        }
 
     }
 
@@ -341,8 +370,8 @@ public class WebSocketServer {
             LiveWatchUser liveWatchUserVO = liveWatchUserService.close(fsUser,liveId, userId);
 
 
-            // 广播离开消息 添加一个概率问题 摇塞子,1-4 当为1的时候广播消息
-            if (1 == new Random().nextInt(10)) {
+            // 下线消息采样10%进入队列
+            if (random.nextDouble() < ENTRY_EXIT_SAMPLE_RATE) {
                 SendMsgVo sendMsgVo = new SendMsgVo();
                 sendMsgVo.setLiveId(liveId);
                 sendMsgVo.setUserId(userId);
@@ -352,7 +381,8 @@ public class WebSocketServer {
                 sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
                 sendMsgVo.setNickName(fsUser.getNickname());
                 sendMsgVo.setAvatar(fsUser.getAvatar());
-                broadcastWebMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+                // 将下线消息加入队列
+                enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), false);
             }
 
         } else {
@@ -370,6 +400,9 @@ public class WebSocketServer {
         // 清理Session相关资源
         heartbeatCache.remove(session.getId());
         sessionLocks.remove(session.getId());
+        
+        // 检查并清理空的直播间资源
+        cleanupEmptyRoom(liveId);
     }
 
     //收到客户端信息
@@ -379,6 +412,7 @@ public class WebSocketServer {
 
         long liveId = (long) userProperties.get("liveId");
         long userType = (long) userProperties.get("userType");
+        boolean isAdmin = false;
 
         SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
         if(msg.isOn()) return;
@@ -476,8 +510,9 @@ public class WebSocketServer {
                     msg.setOn(true);
                     msg.setData(JSONObject.toJSONString(liveMsg));
 
-                    // 广播消息
-                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+                    // 将消息加入队列(普通用户消息)
+                    isAdmin = (userType == 1);
+                    enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), isAdmin);
                     break;
                 case "sendNormalMsg":
                     msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
@@ -510,8 +545,7 @@ public class WebSocketServer {
                     msg.setOn(true);
                     msg.setData(JSONObject.toJSONString(liveMsg));
                     msg.setCmd("sendMsg");
-                    // 广播消息
-                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+                    enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
                     break;
                 case "sendPopMsg":
                     msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
@@ -524,8 +558,7 @@ public class WebSocketServer {
                     liveMsg.setMsg(msg.getMsg());
                     msg.setOn(true);
                     msg.setData(JSONObject.toJSONString(liveMsg));
-                    // 广播消息
-                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+                    enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
                     break;
                 case "sendTopMsg":
                     msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
@@ -539,8 +572,7 @@ public class WebSocketServer {
                     liveMsg.setEndTime(DateUtils.addMinutes(new Date(),msg.getDuration()).toString());
                     msg.setOn(true);
                     msg.setData(JSONObject.toJSONString(liveMsg));
-                    // 广播消息
-                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+                    enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
                     // 放在当前活动里面
                     redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG));
                     redisCache.setCacheObject(String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG, liveId, TOP_MSG), JSONObject.toJSONString(liveMsg));
@@ -550,13 +582,13 @@ public class WebSocketServer {
                     msg.setOn(true);
                     liveWatchUserService.updateGlobalVisible(liveId, msg.getStatus());
                     liveService.updateGlobalVisible(liveId, msg.getStatus());
-                    // 广播消息
-                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+                    // 管理员消息插队
+                    enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
                     break;
                 case "singleVisible":
                     liveWatchUserService.updateSingleVisible(liveId, msg.getStatus(),msg.getUserId());
-                    // 广播消息
-                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+                    // 管理员消息插队
+                    enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
                     break;
                 case "sendGift":
                     break;
@@ -595,7 +627,8 @@ public class WebSocketServer {
         sendMsgVo.setUserType(0L);
         sendMsgVo.setCmd("deleteMsg");
         sendMsgVo.setMsg(msg.getMsg());
-        broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+        // 管理员消息插队
+        enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)), true);
     }
 
     private void processCoupon(long liveId, SendMsgVo msg) {
@@ -615,7 +648,8 @@ public class WebSocketServer {
         } else {
             redisCache.deleteObject(String.format(LiveKeysConstant.LIVE_COUPON_NUM , couponIssueId));
         }
-        broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+        // 管理员消息插队
+        enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
     }
 
 
@@ -630,7 +664,8 @@ public class WebSocketServer {
         liveService.asyncToCacheLiveConfig(liveId);
         msg.setLiveId(liveId);
         msg.setData(JSONObject.toJSONString(liveGoods));
-        broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+        // 管理员消息插队
+        enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
     }
 
     /**
@@ -644,7 +679,8 @@ public class WebSocketServer {
         if (Objects.nonNull(liveRedConf)) {
             liveService.asyncToCacheLiveConfig(liveId);
             msg.setData(JSONObject.toJSONString(liveRedConf));
-            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+            // 管理员消息插队
+            enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
         }
     }
 
@@ -659,7 +695,8 @@ public class WebSocketServer {
         if (Objects.nonNull(liveLotteryConf)) {
             liveService.asyncToCacheLiveConfig(liveId);
             msg.setData(JSONObject.toJSONString(liveLotteryConf));
-            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+            // 管理员消息插队
+            enqueueMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)), true);
         }
     }
 
@@ -1139,7 +1176,8 @@ public class WebSocketServer {
 //                }
             }
             msg.setStatus(1);
-            broadcastMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)));
+            // 定时任务消息作为管理员消息插队
+            enqueueMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)), true);
         } catch (Exception e) {
             log.error("定时任务执行异常:{}", e.getMessage());
         }
@@ -1525,5 +1563,294 @@ public class WebSocketServer {
         }
     }
 
+    /**
+     * 消息队列包装类,支持优先级(管理员消息优先级更高)
+     */
+    private static class QueueMessage implements Comparable<QueueMessage> {
+        private final String message;
+        private final long timestamp;
+        private final int priority; // 0=普通消息, 1=管理员消息(优先级更高)
+        private final long sequence; // 序列号,用于相同优先级消息的FIFO排序
+        private final long sizeBytes; // 消息大小(字节数)
+
+        private static final AtomicLong sequenceGenerator = new AtomicLong(0);
+
+        public QueueMessage(String message, boolean isAdmin) {
+            this.message = message;
+            this.timestamp = System.currentTimeMillis();
+            this.priority = isAdmin ? 1 : 0;
+            this.sequence = sequenceGenerator.getAndIncrement();
+            // 计算消息大小(UTF-8编码)
+            this.sizeBytes = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public long getSizeBytes() {
+            return sizeBytes;
+        }
+
+        @Override
+        public int compareTo(QueueMessage other) {
+            // 优先级高的先处理(管理员消息)
+            int priorityCompare = Integer.compare(other.priority, this.priority);
+            if (priorityCompare != 0) {
+                return priorityCompare;
+            }
+            // 相同优先级按序列号排序(FIFO)
+            return Long.compare(this.sequence, other.sequence);
+        }
+    }
+
+    /**
+     * 获取或创建消息队列
+     */
+    private PriorityBlockingQueue<QueueMessage> getMessageQueue(Long liveId) {
+        return messageQueues.computeIfAbsent(liveId, k -> new PriorityBlockingQueue<>());
+    }
+
+    /**
+     * 启动消费者线程(如果还没有启动)
+     */
+    private void startConsumerThread(Long liveId) {
+        consumerRunningFlags.computeIfAbsent(liveId, k -> new AtomicBoolean(false));
+        AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
+        
+        // 如果线程已经在运行,直接返回
+        if (runningFlag.get()) {
+            return;
+        }
+
+        // 尝试启动消费者线程
+        synchronized (consumerRunningFlags) {
+            if (runningFlag.compareAndSet(false, true)) {
+                Thread consumerThread = new Thread(() -> {
+                    PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
+                    log.info("[消息队列] 启动消费者线程, liveId={}", liveId);
+                    
+                    while (runningFlag.get()) {
+                        try {
+                            // 检查是否还有session,如果没有则退出
+                            ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
+                            List<Session> adminRoom = adminRooms.get(liveId);
+                            
+                            boolean hasSession = (room != null && !room.isEmpty()) || 
+                                                (adminRoom != null && !adminRoom.isEmpty());
+                            
+                            if (!hasSession) {
+                                log.info("[消息队列] 直播间无session,停止消费者线程, liveId={}", liveId);
+                                break;
+                            }
+
+                            // 从队列中取消息,最多等待1秒
+                            QueueMessage queueMessage = queue.poll(1, TimeUnit.SECONDS);
+                            if (queueMessage != null) {
+                                // 更新队列大小(减少)
+                                AtomicLong currentSize = queueSizes.get(liveId);
+                                if (currentSize != null) {
+                                    currentSize.addAndGet(-queueMessage.getSizeBytes());
+                                }
+                                // 广播消息
+                                broadcastMessageFromQueue(liveId, queueMessage.getMessage());
+                            }
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            log.info("[消息队列] 消费者线程被中断, liveId={}", liveId);
+                            break;
+                        } catch (Exception e) {
+                            log.error("[消息队列] 消费消息异常, liveId={}", liveId, e);
+                        }
+                    }
+                    
+                    // 清理资源
+                    runningFlag.set(false);
+                    consumerThreads.remove(liveId);
+                    log.info("[消息队列] 消费者线程已停止, liveId={}", liveId);
+                }, "MessageConsumer-" + liveId);
+                
+                consumerThread.setDaemon(true);
+                consumerThread.start();
+                consumerThreads.put(liveId, consumerThread);
+            }
+        }
+    }
+
+    /**
+     * 停止消费者线程
+     */
+    private void stopConsumerThread(Long liveId) {
+        AtomicBoolean runningFlag = consumerRunningFlags.get(liveId);
+        if (runningFlag != null) {
+            runningFlag.set(false);
+        }
+        Thread consumerThread = consumerThreads.remove(liveId);
+        if (consumerThread != null) {
+            consumerThread.interrupt();
+        }
+    }
+
+    /**
+     * 将消息加入队列
+     * @param liveId 直播间ID
+     * @param message 消息内容
+     * @param isAdmin 是否是管理员消息(管理员消息会插队)
+     * @return 是否成功加入队列
+     */
+    private boolean enqueueMessage(Long liveId, String message, boolean isAdmin) {
+        PriorityBlockingQueue<QueueMessage> queue = getMessageQueue(liveId);
+        AtomicLong currentSize = queueSizes.computeIfAbsent(liveId, k -> new AtomicLong(0));
+        
+        // 计算新消息的大小
+        long messageSize = message != null ? message.getBytes(StandardCharsets.UTF_8).length : 0;
+        
+        // 检查队列条数限制
+        if (!isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
+            log.warn("[消息队列] 队列条数已满,丢弃消息, liveId={}, queueSize={}", liveId, queue.size());
+            return false;
+        }
+        
+        // 检查队列大小限制(200MB)
+        long newTotalSize = currentSize.get() + messageSize;
+        if (newTotalSize > MAX_QUEUE_SIZE_BYTES) {
+            if (!isAdmin) {
+                // 普通消息超过大小限制,直接丢弃
+                log.warn("[消息队列] 队列大小超过限制,丢弃普通消息, liveId={}, currentSize={}MB, messageSize={}KB", 
+                        liveId, currentSize.get() / (1024.0 * 1024.0), messageSize / 1024.0);
+                return false;
+            } else {
+                // 管理员消息:需要移除一些普通消息以腾出空间
+                long needToFree = newTotalSize - MAX_QUEUE_SIZE_BYTES;
+                long freedSize = removeMessagesToFreeSpace(queue, currentSize, needToFree, true);
+                if (freedSize < needToFree) {
+                    log.warn("[消息队列] 无法释放足够空间,管理员消息可能无法入队, liveId={}, needToFree={}KB, freed={}KB", 
+                            liveId, needToFree / 1024.0, freedSize / 1024.0);
+                    // 即使空间不足,也尝试入队(可能会超过限制,但管理员消息优先级高)
+                }
+            }
+        }
+        
+        // 如果是管理员消息且队列条数已满,移除一个普通消息
+        if (isAdmin && queue.size() >= MAX_QUEUE_SIZE) {
+            // 由于是优先级队列,普通消息(priority=0)会在队列末尾
+            // 尝试移除一个普通消息,为管理员消息腾出空间
+            QueueMessage removed = null;
+            Iterator<QueueMessage> iterator = queue.iterator();
+            while (iterator.hasNext()) {
+                QueueMessage msg = iterator.next();
+                if (msg.priority == 0) {
+                    removed = msg;
+                    break;
+                }
+            }
+            if (removed != null) {
+                queue.remove(removed);
+                currentSize.addAndGet(-removed.getSizeBytes());
+                log.debug("[消息队列] 管理员消息插队,移除普通消息, liveId={}", liveId);
+            } else {
+                // 如果没有普通消息,移除队列末尾的消息(可能是最早的管理员消息)
+                // 这种情况很少发生,因为管理员消息通常较少
+                log.warn("[消息队列] 队列条数已满且无普通消息可移除, liveId={}", liveId);
+            }
+        }
+        
+        QueueMessage queueMessage = new QueueMessage(message, isAdmin);
+        queue.offer(queueMessage);
+        currentSize.addAndGet(messageSize);
+        
+        // 如果有session,确保消费者线程在运行
+        ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
+        List<Session> adminRoom = adminRooms.get(liveId);
+        boolean hasSession = (room != null && !room.isEmpty()) || 
+                            (adminRoom != null && !adminRoom.isEmpty());
+        
+        if (hasSession) {
+            startConsumerThread(liveId);
+        }
+        
+        return true;
+    }
+
+    /**
+     * 移除消息以释放空间
+     * @param queue 消息队列
+     * @param currentSize 当前队列大小(原子变量)
+     * @param needToFree 需要释放的空间(字节数)
+     * @param onlyRemoveNormal 是否只移除普通消息(true=只移除普通消息,false=可以移除任何消息)
+     * @return 实际释放的空间(字节数)
+     */
+    private long removeMessagesToFreeSpace(PriorityBlockingQueue<QueueMessage> queue, 
+                                          AtomicLong currentSize, 
+                                          long needToFree, 
+                                          boolean onlyRemoveNormal) {
+        long freedSize = 0;
+        List<QueueMessage> toRemove = new ArrayList<>();
+        
+        // 收集需要移除的消息(优先移除普通消息)
+        Iterator<QueueMessage> iterator = queue.iterator();
+        while (iterator.hasNext() && freedSize < needToFree) {
+            QueueMessage msg = iterator.next();
+            if (!onlyRemoveNormal || msg.priority == 0) {
+                toRemove.add(msg);
+                freedSize += msg.getSizeBytes();
+            }
+        }
+        
+        // 如果只移除普通消息但空间还不够,可以移除管理员消息
+        if (onlyRemoveNormal && freedSize < needToFree) {
+            iterator = queue.iterator();
+            while (iterator.hasNext() && freedSize < needToFree) {
+                QueueMessage msg = iterator.next();
+                if (msg.priority == 1 && !toRemove.contains(msg)) {
+                    toRemove.add(msg);
+                    freedSize += msg.getSizeBytes();
+                }
+            }
+        }
+        
+        // 移除消息并更新大小
+        for (QueueMessage msg : toRemove) {
+            if (queue.remove(msg)) {
+                currentSize.addAndGet(-msg.getSizeBytes());
+            }
+        }
+        
+        if (freedSize > 0) {
+            log.info("[消息队列] 释放队列空间, removedCount={}, freedSize={}KB", 
+                    toRemove.size(), freedSize / 1024.0);
+        }
+        
+        return freedSize;
+    }
+
+    /**
+     * 从队列中消费消息并广播
+     */
+    private void broadcastMessageFromQueue(Long liveId, String message) {
+        broadcastMessage(liveId, message);
+    }
+
+    /**
+     * 检查并清理空的直播间资源
+     */
+    private void cleanupEmptyRoom(Long liveId) {
+        ConcurrentHashMap<Long, Session> room = rooms.get(liveId);
+        List<Session> adminRoom = adminRooms.get(liveId);
+        
+        boolean hasSession = (room != null && !room.isEmpty()) || 
+                            (adminRoom != null && !adminRoom.isEmpty());
+        
+        if (!hasSession) {
+            // 停止消费者线程
+            stopConsumerThread(liveId);
+            // 清理消息队列
+            messageQueues.remove(liveId);
+            consumerRunningFlags.remove(liveId);
+            queueSizes.remove(liveId);
+            log.info("[消息队列] 清理空直播间资源, liveId={}", liveId);
+        }
+    }
+
 }
 

+ 2 - 0
fs-service/src/main/java/com/fs/hisStore/service/IFsStoreProductScrmService.java

@@ -146,4 +146,6 @@ public interface IFsStoreProductScrmService
     List<FsStoreProductListVO> liveList(LiveGoods liveId);
 
     R copyStoreProduct(Long productId);
+
+    R updateCache(Long productId);
 }

+ 25 - 0
fs-service/src/main/java/com/fs/hisStore/service/impl/FsStoreProductScrmServiceImpl.java

@@ -759,6 +759,13 @@ public class FsStoreProductScrmServiceImpl implements IFsStoreProductScrmService
         } else {
             addProductAttr(product.getProductId(),param.getItems(),param.getValues());
         }
+        // 数据修改缓存
+        if (product.getProductId() != null) {
+            FsStoreProductScrm cacheProduct = fsStoreProductMapper.selectFsStoreProductById(product.getProductId());
+            if (cacheProduct != null) {
+                redisCacheT.setCacheObject("fs:product:id:" + product.getProductId(), cacheProduct);
+            }
+        }
         return R.ok();
     }
 
@@ -1450,6 +1457,12 @@ public class FsStoreProductScrmServiceImpl implements IFsStoreProductScrmService
     @Transactional
     public void batchAudit(ProductAuditDTO auditDTO) {
         fsStoreProductMapper.batchAudit(auditDTO);
+        List<FsStoreProductScrm> fsStoreProductScrms = fsStoreProductMapper.selectFsStoreProductByProductIds(auditDTO.getProductIds());
+        if(!fsStoreProductScrms.isEmpty()){
+            for (FsStoreProductScrm fsStoreProductScrm : fsStoreProductScrms) {
+                redisCacheT.setCacheObject("fs:product:id:" + fsStoreProductScrm.getProductId(), fsStoreProductScrm);
+            }
+        }
         storeAuditLogUtil.addBatchAuditList(auditDTO.getProductIds(),auditDTO.getReason(),auditDTO.getAttachImage());
     }
 
@@ -1611,4 +1624,16 @@ public class FsStoreProductScrmServiceImpl implements IFsStoreProductScrmService
         return R.ok();
 
     }
+
+    @Override
+    public R updateCache(Long productId) {
+        // 数据修改缓存
+        if (productId != null) {
+            FsStoreProductScrm cacheProduct = fsStoreProductMapper.selectFsStoreProductById(productId);
+            if (cacheProduct != null) {
+                redisCacheT.setCacheObject("fs:product:id:" + productId, cacheProduct);
+            }
+        }
+        return R.ok();
+    }
 }

+ 2 - 2
fs-service/src/main/resources/application-config-druid-bjzm-test.yml

@@ -15,8 +15,8 @@ logging:
 wx:
   miniapp:
     configs:
-      - appid: wx9bb2cd1e3afe714e   #百域佳选
-        secret: 56a2a3314d4b73eea2eac7e8b861b321 #北京卓美
+      - appid: wxd70f99287830cb51   #云联融智
+        secret: 35fca481b59f5924bfb62253b5d0aa18 #北京卓美
         token: cbnd7lJvkripVOpyTFAna6NAWCxCrvC
         aesKey: HlEiBB55eaWUaeBVAQO3cWKWPYv1vOVQSq7nFNICw4E
         msgDataFormat: JSON

+ 2 - 2
fs-service/src/main/resources/application-config-druid-bjzm.yml

@@ -10,8 +10,8 @@ logging:
 wx:
   miniapp:
     configs:
-      - appid: wx9bb2cd1e3afe714e   #百域佳选
-        secret: 56a2a3314d4b73eea2eac7e8b861b321 #北京卓美
+      - appid: wxd70f99287830cb51   #云联融智
+        secret: 35fca481b59f5924bfb62253b5d0aa18 #北京卓美
         token: cbnd7lJvkripVOpyTFAna6NAWCxCrvC
         aesKey: HlEiBB55eaWUaeBVAQO3cWKWPYv1vOVQSq7nFNICw4E
         msgDataFormat: JSON