Procházet zdrojové kódy

叮当国医会话搜索

cgp před 4 dny
rodič
revize
9fcdec83ad

+ 1 - 1
fs-admin/src/main/java/com/fs/his/task/Task.java

@@ -2057,7 +2057,7 @@ public class Task {
         List<QwCompany> companies = qwCompanyService.selectQwCompanyList(new QwCompany()); // 获取所有企业
         for (QwCompany company : companies) {
             try {
-                syncService.syncConversationsForCorp(company.getCorpId());
+                syncService.syncConversationsForCorp(company.getCorpId(),null);
             } catch (Exception e) {
                 log.error("同步企业 {} 会话失败", company.getCorpName());
                 log.error(e.getMessage());

+ 68 - 1
fs-admin/src/main/java/com/fs/qw/controller/CorporateWeChatSpaceController.java

@@ -1,17 +1,30 @@
 package com.fs.qw.controller;
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.common.core.controller.BaseController;
 import com.fs.common.core.domain.AjaxResult;
 import com.fs.common.exception.CustomException;
 import com.fs.qw.dto.SearchMsgRequest;
 import com.fs.qw.service.ICorporateWeChatSpaceService;
+import com.fs.qw.utils.WeChatSpaceUtil;
 import com.fs.qw.vo.QwSessionConfigVo;
 import com.fs.qw.vo.SearchResultVO;
+import com.fs.qwApi.util.WXBizMsgCrypt;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
+import org.w3c.dom.Document;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 
 /**
  * 企业微信专区-统一前端 API 接口
@@ -24,6 +37,9 @@ public class CorporateWeChatSpaceController extends BaseController {
 
     private final ICorporateWeChatSpaceService weChatSpaceService;
 
+    @Autowired
+    private WeChatSpaceUtil weChatSpaceUtil;
+
     // 会话信息列表
     @GetMapping("/conversations")
     public JSONObject getConversations(
@@ -93,7 +109,7 @@ public class CorporateWeChatSpaceController extends BaseController {
     //获取企业微信专区会话配置
     @GetMapping("/getQwSessionConfig/{corpid}")
     public AjaxResult getQwSessionConfig(@PathVariable String corpid) {
-        QwSessionConfigVo qwSessionConfig = weChatSpaceService.getQwSessionConfigByCorpid(corpid);
+        QwSessionConfigVo qwSessionConfig = weChatSpaceUtil.getQwSessionConfigByCorpid(corpid);
         //敏感信息设置为null
         qwSessionConfig.setPrivateKey(null);
         qwSessionConfig.setAgentSecret(null);
@@ -101,4 +117,55 @@ public class CorporateWeChatSpaceController extends BaseController {
         qwSessionConfig.setAbilityIds(null);
         return AjaxResult.success(qwSessionConfig);
     }
+
+    @GetMapping("/api/wecom/callback/{corpid}")
+    public void verifyUrl(@PathVariable String corpid,
+                          @RequestParam("msg_signature") String msgSignature,
+                          @RequestParam("timestamp") String timestamp,
+                          @RequestParam("nonce") String nonce,
+                          @RequestParam("echostr") String echostr,
+                          HttpServletResponse response) throws Exception {
+        WXBizMsgCrypt wxcpt = new WXBizMsgCrypt("aJlu2JEY7KdL9", "qfz5rCYomxISMojyOgpnbnMWKUyL2JNbGfHCAs8qRJN", corpid);
+        String echoStr = wxcpt.VerifyURL(msgSignature, timestamp, nonce, echostr);
+        response.getWriter().write(echoStr);
+        log.info("收到企业微信会话get回调");
+    }
+
+    /**
+     * 企业会话回调
+     * */
+    @PostMapping("/api/wecom/callback/{corpid}")
+    public void receiveNotify(HttpServletRequest request, HttpServletResponse response,@PathVariable String corpid) {
+        try {
+            // 1. 获取请求参数
+            String msgSignature = request.getParameter("msg_signature");
+            String timestamp = request.getParameter("timestamp");
+            String nonce = request.getParameter("nonce");
+
+            // 2. 获取请求体中的加密字符串
+            InputStream inputStream = request.getInputStream();
+            String postData = IOUtils.toString(inputStream, "UTF-8");
+            JSONObject json = JSON.parseObject(postData);
+            String encrypt = json.getString("encrypt");
+
+            // 3. 使用企业微信提供的WXBizMsgCrypt工具类解密
+            WXBizMsgCrypt wxcpt = new WXBizMsgCrypt("aJlu2JEY7KdL9", "qfz5rCYomxISMojyOgpnbnMWKUyL2JNbGfHCAs8qRJN", corpid);
+            String plainText = wxcpt.DecryptMsg(msgSignature, timestamp, nonce, encrypt);
+
+            // 4. 解析明文XML,获取 NotifyId
+            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder builder = factory.newDocumentBuilder();
+            Document document = builder.parse(new ByteArrayInputStream(plainText.getBytes("UTF-8")));
+            String notifyId = document.getElementsByTagName("NotifyId").item(0).getTextContent();
+
+            // 5. 拿到 notifyId 后,立即触发消息拉取任务
+            weChatSpaceService.syncConversationsByNotifyId(notifyId, corpid);
+
+            // 6. 返回成功响应
+            response.getWriter().write("success");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        log.info("收到企业微信会话post回调");
+    }
 }

+ 3 - 13
fs-service/src/main/java/com/fs/qw/service/ICorporateWeChatSpaceService.java

@@ -3,14 +3,12 @@ package com.fs.qw.service;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.common.exception.CustomException;
 import com.fs.qw.dto.SearchMsgRequest;
-import com.fs.qw.vo.QwSessionConfigVo;
 import com.fs.qw.vo.SearchResultVO;
 
-import java.util.List;
 
 public interface ICorporateWeChatSpaceService {
     /**
-     * 通过专区中转获取会话记录
+     * 获取会话记录
      */
     JSONObject fetchConversations(long limit,long timeout,String cursor, String customerId,String staffUserId,String corpid);
 
@@ -24,16 +22,6 @@ public interface ICorporateWeChatSpaceService {
      */
     JSONObject login(String code,String corpid);
 
-    /**
-     * 获取所有企业微信专区会话配置列表
-     */
-    List<QwSessionConfigVo> getQwSessionConfigList();
-
-    /**
-     * 根据企业ID获取单个配置
-     */
-    QwSessionConfigVo getQwSessionConfigByCorpid(String corpid);
-
     /**
      * 关键词搜索会话消息
      * @param request 搜索参数(已由Controller完成基本校验)
@@ -41,4 +29,6 @@ public interface ICorporateWeChatSpaceService {
      * @throws CustomException 业务异常(如配置缺失)
      */
     SearchResultVO searchMsg(SearchMsgRequest request);
+
+    void syncConversationsByNotifyId(String notifyId, String corpId);
 }

+ 9 - 2
fs-service/src/main/java/com/fs/qw/service/impl/ConversationSyncService.java

@@ -12,6 +12,7 @@ import com.fs.qw.mapper.QwConversationParticipantMapper;
 import com.fs.qw.mapper.QwConversationSyncStateMapper;
 import com.fs.qw.service.ICorporateWeChatSpaceService;
 import com.fs.qw.utils.WeChatSpaceDecryptUtil;
+import com.fs.qw.utils.WeChatSpaceUtil;
 import com.fs.qw.vo.QwSessionConfigVo;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -40,6 +41,9 @@ public class ConversationSyncService {
     @Autowired
     private QwProgramInvoker programInvoker;
 
+    @Autowired
+    private WeChatSpaceUtil weChatSpaceUtil;
+
     /**
      * 全量同步指定企业的会话记录(一直拉取直到 has_more=0)
      * 兜底策略:
@@ -49,9 +53,9 @@ public class ConversationSyncService {
      * - 全局超时 60 分钟
      */
     @Transactional(rollbackFor = Exception.class)
-    public void syncConversationsForCorp(String corpId) {
+    public void syncConversationsForCorp(String corpId,String token) {
         // 1. 获取配置并校验必要字段
-        QwSessionConfigVo config = weChatSpaceService.getQwSessionConfigByCorpid(corpId);
+        QwSessionConfigVo config = weChatSpaceUtil.getQwSessionConfigByCorpid(corpId);
         if (config == null) {
             log.error("企业 {} 未配置企微专区信息,跳过同步", corpId);
             return;
@@ -105,6 +109,9 @@ public class ConversationSyncService {
             if (StringUtils.isNotBlank(nextCursor)) {
                 requestData.put("cursor", nextCursor);
             }
+            if (StringUtils.isNotBlank(token)){
+                requestData.put("token", token);
+            }
             requestData.put("limit", 1000);
 
             JSONObject response = programInvoker.callProgram(

+ 55 - 120
fs-service/src/main/java/com/fs/qw/service/impl/ICorporateWeChatSpaceServiceImpl.java

@@ -14,7 +14,6 @@ import com.fs.qw.service.ICorporateWeChatSpaceService;
 import com.fs.qw.utils.WeChatSpaceUtil;
 import com.fs.qw.vo.QwSessionConfigVo;
 import com.fs.qw.vo.SearchResultVO;
-import com.fs.system.service.ISysConfigService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -28,7 +27,6 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -38,11 +36,9 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceService {
 
-    @Autowired
-    private ISysConfigService sysConfigService;
-
     @Autowired
     private QwConversationMessageMapper messageMapper;
+
     @Autowired
     private QwConversationParticipantMapper participantMapper;
 
@@ -52,13 +48,13 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
     @Autowired
     private QwProgramInvoker qwProgramInvoker;
 
+    @Autowired
+    private ConversationSyncService conversationSyncService;
+
     private final RestTemplate restTemplate = new RestTemplate();
 
     private final ConcurrentHashMap<String, String> consumedCodes = new ConcurrentHashMap<>();
 
-    // 系统配置缓存前缀
-    private final static String CONFIG_KEY = "qw.sessionConfig";
-
     private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
     // =============== 查询数据库保存的会话 ===============
@@ -124,7 +120,7 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
 
     @Override
     public JSONObject getAgentConfigSignature(String url,String corpid) {
-        QwSessionConfigVo qwSessionConfig = getQwSessionConfigByCorpid(corpid);
+        QwSessionConfigVo qwSessionConfig = weChatSpaceUtil.getQwSessionConfigByCorpid(corpid);
         return weChatSpaceUtil.generateAgentConfigSignature(qwSessionConfig.getCorpid(), qwSessionConfig.getAgentid(), url);
     }
 
@@ -141,7 +137,7 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
             result.put("errmsg", "code already used");
             return result;
         }
-        QwSessionConfigVo qwSessionConfig = getQwSessionConfigByCorpid(corpid);
+        QwSessionConfigVo qwSessionConfig = weChatSpaceUtil.getQwSessionConfigByCorpid(corpid);
         try {
             String accessToken = weChatSpaceUtil.getAccessToken(qwSessionConfig.getCorpid());
             String url = "https://qyapi.weixin.qq.com/cgi-bin/user/getuserinfo?access_token="
@@ -168,42 +164,11 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
         return result;
     }
 
-    /**
-     * 获取所有企业微信专区会话配置列表
-     */
-    @Override
-    public List<QwSessionConfigVo> getQwSessionConfigList() {
-        String json = sysConfigService.selectConfigByKey(CONFIG_KEY);
-        if (StringUtils.isBlank(json)) {
-            log.error("未找到企微专区配置, key:{}", CONFIG_KEY);
-            throw new CustomException("未找到企微专区配置");
-        }
-        try {
-            // 直接解析为 List
-            return JSON.parseArray(json, QwSessionConfigVo.class);
-        } catch (Exception e) {
-            log.error("解析企微专区配置失败", e);
-            throw new CustomException("企微专区配置格式错误");
-        }
-    }
-
-    /**
-     * 根据企业ID获取单个配置
-     */
-    @Override
-    public QwSessionConfigVo getQwSessionConfigByCorpid(String corpid) {
-        List<QwSessionConfigVo> all = getQwSessionConfigList();
-        return all.stream()
-                .filter(config -> config.getCorpid().equals(corpid))
-                .findFirst()
-                .orElseThrow(() -> new CustomException("未找到corpid为 " + corpid + " 的配置"));
-    }
-
     // ========== 关键词搜索 ==========
     @Override
     public SearchResultVO searchMsg(SearchMsgRequest request) {
         // 1. 获取企业配置,并找到 search_msg 能力ID
-        QwSessionConfigVo config = getQwSessionConfigByCorpid(request.getCorpId());
+        QwSessionConfigVo config = weChatSpaceUtil.getQwSessionConfigByCorpid(request.getCorpId());
         String abilityId = config.getAbilityIds().stream()
                 .filter(item -> "invokeSearchMsg".equals(item.getKey()))
                 .map(QwSessionConfigVo.AbilityItem::getValue)
@@ -301,6 +266,54 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
         return new SearchResultVO(resultList, hasMore, nextCursor);
     }
 
+
+    @Override
+    public void syncConversationsByNotifyId(String notifyId, String corpId) {
+        try {
+            QwSessionConfigVo config = weChatSpaceUtil.getQwSessionConfigByCorpid(corpId);
+            String abilityId = config.getAbilityIds().stream()
+                    .filter(item -> "notifyId".equals(item.getKey()))
+                    .map(QwSessionConfigVo.AbilityItem::getValue)
+                    .findFirst()
+                    .orElseThrow(() -> new CustomException("未配置 notifyId 能力,请在企微后台添加"));
+
+            // 构造请求数据,将notify_id传给专区程序
+            JSONObject requestData = new JSONObject();
+            requestData.put("notify_id", notifyId);
+
+            // 调用 sync_call_program
+            JSONObject response = qwProgramInvoker.callProgram(corpId, config.getProgramId(), abilityId, requestData);
+
+            if (response != null && response.getIntValue("errcode") == 0) {
+                String responseDataStr = response.getString("response_data");
+                JSONObject responseData = JSON.parseObject(responseDataStr);
+                String token = responseData.getString("token");
+                String cursor = responseData.getString("cursor");
+
+                // 立刻使用获取到的 token 进行消息同步,token 有效期只有10分钟
+                if (StringUtils.isNotBlank(token)) {
+                    syncConversationsWithToken(corpId, token, cursor);
+                }
+            }
+        } catch (Exception e) {
+            log.error("Failed to sync by notify_id: {}", notifyId, e);
+        }
+    }
+
+    public void syncConversationsWithToken(String corpId, String token, String cursor) {
+        // 获取配置等逻辑...
+        JSONObject requestData = new JSONObject();
+        if (StringUtils.isNotBlank(cursor)) {
+            requestData.put("cursor", cursor);
+        }
+        requestData.put("token", token);
+        requestData.put("limit", 1000);
+
+        // 调用 qwProgramInvoker.callProgram,传入 "invoke_sync_msg" 能力ID
+        // ... 处理返回的消息列表并入库
+        conversationSyncService.syncConversationsForCorp(corpId, token);
+    }
+
     /**
      * 构建前端需要的消息JSON
      */
@@ -339,82 +352,4 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
         }
         return item;
     }
-
-
-//    private JSONArray processMessages(JSONArray msgList, String customerId, String staffUserId, QwSessionConfigVo qwConfig) {
-//        return msgList.parallelStream()
-//                .map(obj -> (JSONObject) obj)
-//                .filter(msg -> isMessageRelatedToUsers(msg, customerId, staffUserId))
-//                .map(msg -> decryptAndFormatMessage(msg, qwConfig))
-//                .filter(Objects::nonNull)
-//                .collect(Collectors.toCollection(JSONArray::new));
-//    }
-//
-//    private boolean isMessageRelatedToUsers(JSONObject msg, String customerId, String staffUserId) {
-//        // 如果都为空,则不过滤,返回全部
-//        if ((customerId == null || customerId.isEmpty()) && (staffUserId == null || staffUserId.isEmpty())) {
-//            return true;
-//        }
-//        boolean hasCustomer = (customerId == null || customerId.isEmpty());
-//        boolean hasStaff = (staffUserId == null || staffUserId.isEmpty());
-//
-//        JSONObject sender = msg.getJSONObject("sender");
-//        JSONArray receivers = msg.getJSONArray("receiver_list");
-//
-//        // 检查发送者
-//        if (sender != null) {
-//            String senderId = sender.getString("id");
-//            int senderType = sender.getIntValue("type");
-//            if (!hasCustomer && senderType == 2 && customerId.equals(senderId)) hasCustomer = true;
-//            if (!hasStaff && senderType == 1 && staffUserId.equals(senderId)) hasStaff = true;
-//        }
-//        if (hasCustomer && hasStaff) return true;
-//
-//        // 检查接收者
-//        if (receivers != null) {
-//            for (int i = 0; i < receivers.size(); i++) {
-//                JSONObject recv = receivers.getJSONObject(i);
-//                String recvId = recv.getString("id");
-//                int recvType = recv.getIntValue("type");
-//                if (!hasCustomer && recvType == 2 && customerId.equals(recvId)) hasCustomer = true;
-//                if (!hasStaff && recvType == 1 && staffUserId.equals(recvId)) hasStaff = true;
-//                if (hasCustomer && hasStaff) return true;
-//            }
-//        }
-//        return hasCustomer && hasStaff;
-//    }
-//
-//    private JSONObject decryptAndFormatMessage(JSONObject msg, QwSessionConfigVo qwConfig) {
-//        JSONObject result = new JSONObject();
-//        try {
-//            JSONObject encryptInfo = msg.getJSONObject("service_encrypt_info");
-//            if (encryptInfo == null) return null;
-//            String encryptedKey = encryptInfo.getString("encrypted_secret_key");
-//            if (encryptedKey == null) return null;
-//
-//            // 解密得到 secretKey
-//            String secretKey = WeChatSpaceDecryptUtil.decryptSecretKey(encryptedKey, qwConfig.getPrivateKey());
-//
-//            // 复制需要返回的字段
-//            result.put("msgid", msg.getString("msgid"));
-//            result.put("secretKey", secretKey);
-//            result.put("sender", msg.get("sender"));
-//            result.put("receiver_list", msg.get("receiver_list"));
-//            result.put("msgtype", msg.getInteger("msgtype"));
-//
-//            Long sendTime = msg.getLong("send_time");
-//            if (sendTime != null) {
-//                String formattedTime = Instant.ofEpochSecond(sendTime)
-//                        .atZone(ZoneId.systemDefault())
-//                        .toLocalDateTime()
-//                        .format(DATE_TIME_FORMATTER);
-//                result.put("send_time_str", formattedTime);
-//                result.put("send_time", sendTime);
-//            }
-//            return result;
-//        } catch (Exception e) {
-//            log.error("解密消息失败, msgid: {}", msg.getString("msgid"), e);
-//            return null;
-//        }
-//    }
 }

+ 42 - 5
fs-service/src/main/java/com/fs/qw/utils/WeChatSpaceUtil.java

@@ -1,15 +1,19 @@
 package com.fs.qw.utils;
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import com.fs.qw.domain.QwCompany;
-import com.fs.qw.service.IQwCompanyService;
+import com.fs.common.exception.CustomException;
+import com.fs.qw.vo.QwSessionConfigVo;
 import com.fs.qwApi.service.QwApiService;
+import com.fs.system.service.ISysConfigService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -19,6 +23,9 @@ public class WeChatSpaceUtil {
 
     private static final RestTemplate restTemplate = new RestTemplate();
 
+    // 系统配置缓存前缀
+    private final static String CONFIG_KEY = "qw.sessionConfig";
+
     // 按企业ID缓存 agent_ticket(key = corpId + "_" + agentId,确保不同应用隔离)
     private final ConcurrentMap<String, String> AGENT_TICKET_CACHE = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, Long> AGENT_TICKET_EXPIRE = new ConcurrentHashMap<>();
@@ -27,14 +34,14 @@ public class WeChatSpaceUtil {
     private QwApiService qwApiService;
 
     @Autowired
-    private IQwCompanyService qwCompanyService;
+    private ISysConfigService sysConfigService;
 
     /**
      * 获取企业 access_token(带缓存,由 QwApiService 实现)
      */
     public String getAccessToken(String corpId) {
-        QwCompany qwCompany = qwCompanyService.selectQwCompanyByCorpId(corpId);
-        String openSecret = qwCompany.getOpenSecret();
+        QwSessionConfigVo config = getQwSessionConfigByCorpid(corpId);
+        String openSecret = config.getAgentSecret();
         return qwApiService.getToken(corpId, openSecret);
     }
 
@@ -85,4 +92,34 @@ public class WeChatSpaceUtil {
             throw new RuntimeException("生成 agentConfig 签名失败", e);
         }
     }
+
+    /**
+     * 根据企业ID获取单个配置
+     */
+    public QwSessionConfigVo getQwSessionConfigByCorpid(String corpid) {
+        List<QwSessionConfigVo> all = getQwSessionConfigList();
+        return all.stream()
+                .filter(config -> config.getCorpid().equals(corpid))
+                .findFirst()
+                .orElseThrow(() -> new CustomException("未找到corpid为 " + corpid + " 的配置"));
+    }
+
+    /**
+     * 获取所有企业微信专区会话配置列表
+     */
+    public List<QwSessionConfigVo> getQwSessionConfigList() {
+        String json = sysConfigService.selectConfigByKey(CONFIG_KEY);
+        if (StringUtils.isBlank(json)) {
+            log.error("未找到企微专区配置, key:{}", CONFIG_KEY);
+            throw new CustomException("未找到企微专区配置");
+        }
+        try {
+            // 直接解析为 List
+            return JSON.parseArray(json, QwSessionConfigVo.class);
+        } catch (Exception e) {
+            log.error("解析企微专区配置失败", e);
+            throw new CustomException("企微专区配置格式错误");
+        }
+    }
+
 }