|
|
@@ -1,136 +0,0 @@
|
|
|
-package com.fs.app.socket;
|
|
|
-
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
-import com.fs.qw.vo.QwMessageListVO;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-
|
|
|
-import javax.websocket.Session;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Optional;
|
|
|
-import java.util.concurrent.*;
|
|
|
-
|
|
|
-@Slf4j
|
|
|
-public class QwImSocketBroadcaster {
|
|
|
-
|
|
|
- /** 租户 -> Session 集合 */
|
|
|
- private static final ConcurrentMap<Long, CopyOnWriteArraySet<Session>> companySessions = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
- /** 租户 -> 消息队列 */
|
|
|
- private static final ConcurrentMap<Long, BlockingQueue<QwMessageListVO>> messageQueues = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
- /** 每个租户一个发送线程 */
|
|
|
- private static final ConcurrentMap<Long, Future<?>> companyWorkers = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
- /** 全局线程池(异步发送) */
|
|
|
- private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(
|
|
|
- 5,
|
|
|
- 100,
|
|
|
- 60,
|
|
|
- TimeUnit.SECONDS,
|
|
|
- new LinkedBlockingQueue<>(100),
|
|
|
- r -> {
|
|
|
- Thread t = new Thread(r, "qwIm-websocket-broadcast-worker");
|
|
|
- t.setDaemon(true);
|
|
|
- return t;
|
|
|
- },
|
|
|
- // 使用 AbortPolicy:拋出 RejectedExecutionException
|
|
|
- new ThreadPoolExecutor.AbortPolicy()
|
|
|
- );
|
|
|
-
|
|
|
- // 注册会话
|
|
|
- public static void registerSession(Long companyId, Session session) {
|
|
|
- companySessions.computeIfAbsent(companyId, k -> new CopyOnWriteArraySet<>()).add(session);
|
|
|
- messageQueues.computeIfAbsent(companyId, k -> new LinkedBlockingQueue<>(10000));
|
|
|
- startWorkerIfAbsent(companyId);
|
|
|
- log.info("✅ 新连接注册:companyId={}, sessionId={}", companyId, session.getId());
|
|
|
- }
|
|
|
-
|
|
|
- // 移除会话
|
|
|
- public static void removeSession(Long companyId, Session session) {
|
|
|
- Optional.ofNullable(companySessions.get(companyId)).ifPresent(sessions -> {
|
|
|
- sessions.remove(session);
|
|
|
- log.info("🗑️ 移除Session:companyId={}, sessionId={}", companyId, session.getId());
|
|
|
- if (sessions.isEmpty()) stopWorker(companyId);
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- // 广播消息(异步入队)
|
|
|
- public static void broadcast(QwMessageListVO message) {
|
|
|
- if (message == null) {
|
|
|
- log.warn("广播消息不存在");
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- BlockingQueue<QwMessageListVO> queue = messageQueues.get(message.getCompanyId());
|
|
|
- if (queue == null) {
|
|
|
- log.warn("消息队列不存在 msg: {}", message);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- boolean success = queue.offer(message);
|
|
|
- if (!success) {
|
|
|
- log.warn("消息队列已满 companyId: {}", message.getCompanyId());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // 启动对应租户的发送线程
|
|
|
- private static void startWorkerIfAbsent(Long companyId) {
|
|
|
- companyWorkers.computeIfAbsent(companyId, id -> EXECUTOR.submit(() -> {
|
|
|
- BlockingQueue<QwMessageListVO> queue = messageQueues.get(companyId);
|
|
|
- CopyOnWriteArraySet<Session> sessions = companySessions.get(companyId);
|
|
|
- log.info("🚀 启动租户消息分发线程:companyId={}", companyId);
|
|
|
-
|
|
|
- while (!Thread.currentThread().isInterrupted()) {
|
|
|
- try {
|
|
|
- QwMessageListVO msgObj = queue.take();
|
|
|
- String msg = JSON.toJSONString(msgObj);
|
|
|
-
|
|
|
- if (sessions == null || sessions.isEmpty()) {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- List<Session> closed = new ArrayList<>();
|
|
|
- for (Session s : sessions) {
|
|
|
- if (!s.isOpen()) {
|
|
|
- closed.add(s);
|
|
|
- continue;
|
|
|
- }
|
|
|
- try {
|
|
|
- s.getAsyncRemote().sendText(msg, result -> {
|
|
|
- if (!result.isOK()) {
|
|
|
- Throwable e = result.getException();
|
|
|
- log.error("❌ 异步发送失败:companyId={}, sessionId={}, error={}", companyId, s.getId(), e != null ? e.getMessage() : "未知错误", e);
|
|
|
- closed.add(s);
|
|
|
- }
|
|
|
- });
|
|
|
- } catch (IllegalStateException e) {
|
|
|
- log.warn("⚠️ Session状态异常:sessionId={}, msg={}", s.getId(), e.getMessage());
|
|
|
- closed.add(s);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("广播异常:sessionId={}, error={}", s.getId(), e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
- closed.forEach(sessions::remove);
|
|
|
-
|
|
|
- TimeUnit.MILLISECONDS.sleep(1); // 1 毫秒延迟
|
|
|
- } catch (InterruptedException e) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("广播线程异常:companyId={}, error={}", companyId, e.getMessage(), e);
|
|
|
- }
|
|
|
- }
|
|
|
- }));
|
|
|
- }
|
|
|
-
|
|
|
- // 停止租户线程
|
|
|
- private static void stopWorker(Long companyId) {
|
|
|
- Future<?> worker = companyWorkers.remove(companyId);
|
|
|
- if (worker != null) {
|
|
|
- worker.cancel(true);
|
|
|
- log.info("🛑 停止租户广播线程:companyId={}", companyId);
|
|
|
- }
|
|
|
- messageQueues.remove(companyId);
|
|
|
- }
|
|
|
-
|
|
|
-}
|