Prechádzať zdrojové kódy

企微聊天-回调消息推送调整

Long 3 týždňov pred
rodič
commit
beae2322d4

+ 4 - 6
fs-company/src/main/java/com/fs/company/controller/qw/QwMsgController.java

@@ -163,14 +163,12 @@ public class QwMsgController extends BaseController
     @GetMapping("/conversationList/{userId}")
     @ApiOperation("获取会话")
     public R conversations(@PathVariable("userId") Long qwUserId,
-                           @RequestParam(required = false) Boolean isBlack,
-                           @RequestParam(required = false) Boolean isRepeat,
-                           @RequestParam(required = false) Boolean isPend){
+                           @RequestParam(required = false) Boolean removeBlack,
+                           @RequestParam(required = false) Boolean removeRepeat){
         Map<String, Object> params = new HashMap<>();
         params.put("qwUserId", qwUserId.toString());
-        params.put("isBlack", isBlack);
-        params.put("isRepeat", isRepeat);
-        params.put("isPend", isPend);
+        params.put("removeBlack", removeBlack);
+        params.put("removeRepeat", removeRepeat);
 
         startPage();
         List<QwContactListVO> list = qwMsgService.selectQwConversationByMap(params);

+ 6 - 5
fs-qw-api-msg/src/main/java/com/fs/app/controller/QwMsgController.java

@@ -3,6 +3,7 @@ package com.fs.app.controller;
 import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSON;
 import com.fs.app.socket.QwImSocket;
+import com.fs.app.socket.QwImSocketBroadcaster;
 import com.fs.app.util.AudioUtils;
 import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCache;
@@ -619,7 +620,7 @@ public class QwMsgController {
     private void processTextMessage(Long qwUserId, Long senderVid, Long receiverVid, Long serverId, String content, WxWorkMsgResp wxWorkMsgResp, boolean isRoom, String chatId, String chatAvatar) {
         // 保存聊天消息
         QwMessageListVO message = aiHookService.saveQwMsg(qwUserId, senderVid, receiverVid, serverId, content, wxWorkMsgResp.getUuid(), wxWorkMsgResp.getJson(), 1, isRoom, chatId, chatAvatar);
-        QwImSocket.broadcast(message);
+        QwImSocketBroadcaster.broadcast(message);
     }
 
     /**
@@ -668,7 +669,7 @@ public class QwMsgController {
 
         // 保存聊天消息
         QwMessageListVO message = aiHookService.saveQwMsg(qwUserId, senderVid, receiverVid, serverId, json.toString(), wxWorkMsgResp.getUuid(), wxWorkMsgResp.getJson(), 4, isRoom, chatId, chatAvatar);
-        QwImSocket.broadcast(message);
+        QwImSocketBroadcaster.broadcast(message);
     }
 
     /**
@@ -695,7 +696,7 @@ public class QwMsgController {
         String content = fileUrlResp.getData();
         // 保存聊天消息
         QwMessageListVO message = aiHookService.saveQwMsg(qwUserId, senderVid, receiverVid, serverId, content, wxWorkMsgResp.getUuid(), wxWorkMsgResp.getJson(), 2, isRoom, chatId, chatAvatar);
-        QwImSocket.broadcast(message);
+        QwImSocketBroadcaster.broadcast(message);
     }
 
     /**
@@ -714,7 +715,7 @@ public class QwMsgController {
         String content = wxWorkMessageDTO.getUrl();
         // 保存聊天消息
         QwMessageListVO message = aiHookService.saveQwMsg(qwUserId, senderVid, receiverVid, serverId, content, wxWorkMsgResp.getUuid(), wxWorkMsgResp.getJson(), 3, isRoom, chatId, chatAvatar);
-        QwImSocket.broadcast(message);
+        QwImSocketBroadcaster.broadcast(message);
     }
 
     /**
@@ -749,7 +750,7 @@ public class QwMsgController {
 
         // 保存聊天消息
         QwMessageListVO message = aiHookService.saveQwMsg(qwUserId, senderVid, receiverVid, serverId, json.toString(), wxWorkMsgResp.getUuid(), wxWorkMsgResp.getJson(), 5, isRoom, chatId, chatAvatar);
-        QwImSocket.broadcast(message);
+        QwImSocketBroadcaster.broadcast(message);
     }
 
     /**

+ 6 - 54
fs-qw-api-msg/src/main/java/com/fs/app/socket/QwImSocket.java

@@ -1,8 +1,7 @@
 package com.fs.app.socket;
 
-import com.alibaba.fastjson.JSON;
 import com.fs.app.socket.configurator.QwImConfigurator;
-import com.fs.qw.vo.QwMessageListVO;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
 import javax.websocket.OnClose;
@@ -11,17 +10,12 @@ import javax.websocket.OnOpen;
 import javax.websocket.Session;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
-import java.io.IOException;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 
+@Slf4j
 @ServerEndpoint(value = "/qwImSocket/{companyId}", configurator = QwImConfigurator.class)
 @Component
 public class QwImSocket {
 
-    private static final ConcurrentHashMap<Long, CopyOnWriteArraySet<Session>> companySessions = new ConcurrentHashMap<>();
-
     /**
      * 连接建立成功调用的方法
      * @param session   连接会话
@@ -30,7 +24,7 @@ public class QwImSocket {
     @OnOpen
     public void onOpen(Session session, @PathParam("companyId") Long companyId) {
         // 将当前会话加入到会话池中
-        companySessions.computeIfAbsent(companyId, k -> new CopyOnWriteArraySet<>()).add(session);
+        QwImSocketBroadcaster.registerSession(companyId, session);
     }
 
     /**
@@ -41,14 +35,7 @@ public class QwImSocket {
     @OnClose
     public void onClose(Session session, @PathParam("companyId") Long companyId) {
         // 从会话池中移除当前会话
-        CopyOnWriteArraySet<Session> sessions = companySessions.get(companyId);
-        if (sessions != null) {
-            sessions.remove(session);
-            // 如果直播间没人了,可以移除该直播间
-            if (sessions.isEmpty()) {
-                companySessions.remove(companyId);
-            }
-        }
+        QwImSocketBroadcaster.removeSession(companyId, session);
     }
 
     /**
@@ -59,43 +46,8 @@ public class QwImSocket {
      */
     @OnError
     public void onError(Session session, @PathParam("companyId") Long companyId, Throwable error) {
-        System.err.println("发生错误!会话ID: " + session.getId());
-        CopyOnWriteArraySet<Session> sessions = companySessions.get(companyId);
-        if (sessions != null) {
-            sessions.remove(session);
-            // 如果直播间没人了,可以移除该直播间
-            if (sessions.isEmpty()) {
-                companySessions.remove(companyId);
-            }
-        }
-    }
-
-    /**
-     * 群发消息
-     * @param message   要发送的消息
-     */
-    public static void broadcast(QwMessageListVO message) {
-        if (Objects.isNull(message)) {
-            return;
-        }
-
-        String msg = JSON.toJSONString(message);
-        CopyOnWriteArraySet<Session> sessions = companySessions.get(message.getCompanyId());
-        if (sessions != null) {
-            for (Session session : sessions) {
-                if (session.isOpen()) {
-                    try {
-                        session.getBasicRemote().sendText(msg);
-                    } catch (IOException e) {
-                        System.err.println("发送消息给会话[" + session.getId() + "]失败: " + e.getMessage());
-                        // 移除无效会话
-                        sessions.remove(session);
-                    }
-                } else {
-                    sessions.remove(session); // 移除已关闭的会话
-                }
-            }
-        }
+        log.error("发生错误!会话ID: {}", session.getId());
+        QwImSocketBroadcaster.removeSession(companyId, session);
     }
 
 }

+ 136 - 0
fs-qw-api-msg/src/main/java/com/fs/app/socket/QwImSocketBroadcaster.java

@@ -0,0 +1,136 @@
+package com.fs.app.socket;
+
+import com.alibaba.fastjson.JSON;
+import com.fs.qw.vo.QwMessageListVO;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.websocket.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.*;
+
+@Slf4j
+public class QwImSocketBroadcaster {
+
+    /** 租户 -> Session 集合 */
+    private static final ConcurrentMap<Long, CopyOnWriteArraySet<Session>> companySessions = new ConcurrentHashMap<>();
+
+    /** 租户 -> 消息队列 */
+    private static final ConcurrentMap<Long, BlockingQueue<QwMessageListVO>> messageQueues = new ConcurrentHashMap<>();
+
+    /** 每个租户一个发送线程 */
+    private static final ConcurrentMap<Long, Future<?>> companyWorkers = new ConcurrentHashMap<>();
+
+    /** 全局线程池(异步发送) */
+    private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(
+            5,
+            100,
+            60,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(100),
+            r -> {
+                Thread t = new Thread(r, "qwIm-websocket-broadcast-worker");
+                t.setDaemon(true);
+                return t;
+            },
+            // 使用 AbortPolicy:拋出 RejectedExecutionException
+            new ThreadPoolExecutor.AbortPolicy()
+    );
+
+    // 注册会话
+    public static void registerSession(Long companyId, Session session) {
+        companySessions.computeIfAbsent(companyId, k -> new CopyOnWriteArraySet<>()).add(session);
+        messageQueues.computeIfAbsent(companyId, k -> new LinkedBlockingQueue<>(10000));
+        startWorkerIfAbsent(companyId);
+        log.info("✅ 新连接注册:companyId={}, sessionId={}", companyId, session.getId());
+    }
+
+    // 移除会话
+    public static void removeSession(Long companyId, Session session) {
+        Optional.ofNullable(companySessions.get(companyId)).ifPresent(sessions -> {
+            sessions.remove(session);
+            log.info("🗑️ 移除Session:companyId={}, sessionId={}", companyId, session.getId());
+            if (sessions.isEmpty()) stopWorker(companyId);
+        });
+    }
+
+    // 广播消息(异步入队)
+    public static void broadcast(QwMessageListVO message) {
+        if (message == null) {
+            log.warn("广播消息不存在");
+            return;
+        }
+
+        BlockingQueue<QwMessageListVO> queue = messageQueues.get(message.getCompanyId());
+        if (queue == null) {
+            log.warn("消息队列不存在 msg: {}", message);
+            return;
+        }
+
+        boolean success = queue.offer(message);
+        if (!success) {
+            log.warn("消息队列已满 companyId: {}", message.getCompanyId());
+        }
+    }
+
+    // 启动对应租户的发送线程
+    private static void startWorkerIfAbsent(Long companyId) {
+        companyWorkers.computeIfAbsent(companyId, id -> EXECUTOR.submit(() -> {
+            BlockingQueue<QwMessageListVO> queue = messageQueues.get(companyId);
+            CopyOnWriteArraySet<Session> sessions = companySessions.get(companyId);
+            log.info("🚀 启动租户消息分发线程:companyId={}", companyId);
+
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    QwMessageListVO msgObj = queue.take();
+                    String msg = JSON.toJSONString(msgObj);
+
+                    if (sessions == null || sessions.isEmpty()) {
+                        continue;
+                    }
+
+                    List<Session> closed = new ArrayList<>();
+                    for (Session s : sessions) {
+                        if (!s.isOpen()) {
+                            closed.add(s);
+                            continue;
+                        }
+                        try {
+                            s.getAsyncRemote().sendText(msg, result -> {
+                                if (!result.isOK()) {
+                                    Throwable e = result.getException();
+                                    log.error("❌ 异步发送失败:companyId={}, sessionId={}, error={}", companyId, s.getId(), e != null ? e.getMessage() : "未知错误", e);
+                                    closed.add(s);
+                                }
+                            });
+                        } catch (IllegalStateException e) {
+                            log.warn("⚠️ Session状态异常:sessionId={}, msg={}", s.getId(), e.getMessage());
+                            closed.add(s);
+                        } catch (Exception e) {
+                            log.error("广播异常:sessionId={}, error={}", s.getId(), e.getMessage(), e);
+                        }
+                    }
+                    closed.forEach(sessions::remove);
+
+                    TimeUnit.MILLISECONDS.sleep(1); // 1 毫秒延迟
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    log.error("广播线程异常:companyId={}, error={}", companyId, e.getMessage(), e);
+                }
+            }
+        }));
+    }
+
+    // 停止租户线程
+    private static void stopWorker(Long companyId) {
+        Future<?> worker = companyWorkers.remove(companyId);
+        if (worker != null) {
+            worker.cancel(true);
+            log.info("🛑 停止租户广播线程:companyId={}", companyId);
+        }
+        messageQueues.remove(companyId);
+    }
+
+}

+ 5 - 5
fs-service/src/main/java/com/fs/fastGpt/service/impl/AiHookServiceImpl.java

@@ -71,6 +71,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.lang.reflect.Field;
@@ -2094,7 +2095,6 @@ public class AiHookServiceImpl implements AiHookService {
      * @param chatId        会话ID(群聊才有)
      * @param chatAvatar    群头像(群聊才有)
      */
-    @Transactional(rollbackFor = Exception.class)
     @Override
     public QwMessageListVO saveQwMsg(Long qwUserId, Long senderVid, Long receiverVid, Long serverId, String content, String uuid, String json, int msgType, boolean isRoom, String chatId, String chatAvatar) {
         // 查询企微用户
@@ -2132,7 +2132,7 @@ public class AiHookServiceImpl implements AiHookService {
                 extWxId = receiverVid;
             }
             if (extUser != null) {
-                qwSession = getQwSession(qwUserId, extUser, qwUser, extWxId);
+                qwSession = getQwSession(extUser, qwUser, extWxId);
             }
         }
 
@@ -2190,14 +2190,14 @@ public class AiHookServiceImpl implements AiHookService {
     /**
      * 获取单聊session
      */
-    private QwSession getQwSession(Long qwUserId, QwImUserDTO extUser, QwUser qwUser, Long extWxId) {
+    private QwSession getQwSession(QwImUserDTO extUser, QwUser qwUser, Long extWxId) {
         QwSession qwSession;
         String chatId;
         qwSession = qwSessionMapper.selectQwSessionByExtIdAndQwUserId(extUser.getUserId(), qwUser.getId());
         String firstLetter = PinYinUtil.getFirstLetter(extUser.getUserName());
         if (qwSession == null) {
 
-            RLock lock = redissonClient.getLock("addSession" + extUser.getUserId() + qwUserId);
+            RLock lock = redissonClient.getLock("addSession:" + extUser.getUserId() + ":" + qwUser.getId());
             try {
                 boolean tryLock = lock.tryLock(2, 3, TimeUnit.SECONDS);
                 if (!tryLock) {
@@ -2254,7 +2254,7 @@ public class AiHookServiceImpl implements AiHookService {
         qwSession = qwSessionMapper.selectQwSessionByChatIdAndQwUserId(chatId, qwUser.getId());
         String firstLetter = PinYinUtil.getFirstLetter(qwGroupChat.getName());
         if (qwSession == null) {
-            RLock lock = redissonClient.getLock("addSession" + qwGroupChat.getChatId() + qwUser.getId());
+            RLock lock = redissonClient.getLock("addSession:" + qwGroupChat.getChatId() + ":" + qwUser.getId());
             try {
                 boolean tryLock = lock.tryLock(2, 3, TimeUnit.SECONDS);
                 if (!tryLock) {

+ 2 - 2
fs-service/src/main/java/com/fs/qw/service/impl/QwMsgServiceImpl.java

@@ -627,7 +627,7 @@ public class QwMsgServiceImpl extends ServiceImpl<QwMsgMapper, QwMsg> implements
                     throw new ServiceException("群聊不存在");
                 }
 
-                RLock lock = redissonClient.getLock("addSession" + id + qwUserId);
+                RLock lock = redissonClient.getLock("addSession:" + id + ":" + qwUserId);
                 try {
                     boolean tryLock = lock.tryLock(2, 3, TimeUnit.SECONDS);
                     if (!tryLock) {
@@ -665,7 +665,7 @@ public class QwMsgServiceImpl extends ServiceImpl<QwMsgMapper, QwMsg> implements
                     throw new ServiceException("外部联系人不存在");
                 }
 
-                RLock lock = redissonClient.getLock("addSession" + id + qwUserId);
+                RLock lock = redissonClient.getLock("addSession:" + id + ":" + qwUserId);
                 try {
                     boolean tryLock = lock.tryLock(2, 3, TimeUnit.SECONDS);
                     if (!tryLock) {

+ 6 - 10
fs-service/src/main/resources/mapper/qw/QwSessionMapper.xml

@@ -60,7 +60,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             ec.comment_status = 1                                   as isBlack,
             s.is_room                                               as isGroup,
             (u.qw_repeat = 1 OR u.user_repeat = 1)                  as isRepeat,
-            qwk.status = 0                                          as isPend,
+            false                                                   as isPend,
             qm.msg_id                                               as msgId,
             case qm.msg_type
                 when 1 then 'text'
@@ -76,22 +76,18 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         from qw_session s
         left join qw_external_contact ec on s.qw_ext_id = ec.id
         left join fs_user u on ec.fs_user_id = u.user_id
-        left join qw_work_task qwk on qwk.ext_id = ec.id
         left join (
             select session_id, max(msg_id) as max_msg_id from qw_msg group by session_id
         ) latest_msg on latest_msg.session_id = s.session_id
         left join qw_msg qm on qm.msg_id = latest_msg.max_msg_id
         where s.qw_user_id = #{params.qwUserId}
-        <if test="params.isBlack != null and params.isBlack">
-            and ec.comment_status = 1
+        <if test="params.removeBlack != null and params.removeBlack">
+            and ec.comment_status = 0
         </if>
-        <if test="params.isRepeat != null and params.isRepeat">
-            and (u.qw_repeat = 1 OR u.user_repeat = 1)
+        <if test="params.removeRepeat != null and params.removeRepeat">
+            and u.qw_repeat != 1 and u.user_repeat != 1
         </if>
-        <if test="params.isPend != null and params.isPend">
-            and qwk.status = 0
-        </if>
-        order by qwk.status is null, qwk.status, s.update_time desc
+        order by s.update_time desc
     </select>
 
     <insert id="insertQwSession" parameterType="QwSession" useGeneratedKeys="true" keyProperty="sessionId">