Browse Source

企业微信数据智能专区-会话记录持久化

cgp 1 week ago
parent
commit
41e425a1af

+ 17 - 0
fs-admin/src/main/java/com/fs/his/task/Task.java

@@ -68,6 +68,7 @@ import com.fs.qw.domain.QwUser;
 import com.fs.qw.mapper.QwRestrictionPushRecordMapper;
 import com.fs.qw.mapper.QwUserMapper;
 import com.fs.qw.service.*;
+import com.fs.qw.service.impl.ConversationSyncService;
 import com.fs.qwApi.service.QwApiService;
 import com.fs.sop.domain.QwSopTempVoice;
 import com.fs.sop.service.IQwSopTempVoiceService;
@@ -243,6 +244,9 @@ public class Task {
     @Autowired
     private FsStoreOrderScrmMapper fsStoreOrderScrmMapper;
 
+    @Autowired
+    private ConversationSyncService syncService;
+
     /**
      * 定时任务,处理ai禁止回复之后的消息
      */
@@ -2001,5 +2005,18 @@ public class Task {
         allFutures.join(); // 等待所有任务完成
     }
 
+    // 同步企业会话记录
+    public void syncAllCorps() {
+        List<QwCompany> companies = qwCompanyService.selectQwCompanyList(new QwCompany()); // 获取所有企业
+        for (QwCompany company : companies) {
+            try {
+                syncService.syncConversationsForCorp(company.getCorpId());
+            } catch (Exception e) {
+                log.error("同步企业 {} 会话失败", company.getCorpName());
+                log.error(e.getMessage());
+            }
+        }
+    }
+
 
 }

+ 9 - 8
fs-admin/src/main/java/com/fs/qw/controller/CorporateWeChatSpaceController.java

@@ -26,32 +26,33 @@ public class CorporateWeChatSpaceController extends BaseController {
             @RequestParam(defaultValue = "30") long timeout,
             @RequestParam(required = false) String cursor,
             @RequestParam(required = false) String customerId,
-            @RequestParam(required = false) String staffUserId) {
+            @RequestParam(required = false) String staffUserId,
+            @RequestParam(required = false) String corpid) {
         if (customerId == null|| customerId.isEmpty()) {
             throw new CustomException("客户id不能为空");
         }else if (staffUserId == null|| staffUserId.isEmpty()) {
             throw new CustomException("员工id不能为空");
         }
-        return weChatSpaceService.fetchConversations(limit, timeout, cursor, customerId,staffUserId);
+        return weChatSpaceService.fetchConversations(limit, timeout, cursor, customerId,staffUserId,corpid);
     }
 
 
     // agentConfig 签名
     @GetMapping("/getAgentConfigSignature")
-    public JSONObject getAgentConfigSignature(@RequestParam("url") String url) {
-        return weChatSpaceService.getAgentConfigSignature(url);
+    public JSONObject getAgentConfigSignature(@RequestParam("url") String url, @RequestParam("corpid") String corpid) {
+        return weChatSpaceService.getAgentConfigSignature(url,corpid);
     }
 
     // Web 登录
     @PostMapping("/login")
     public JSONObject login(@RequestBody JSONObject param) {
-        return weChatSpaceService.login(param.getString("code"));
+        return weChatSpaceService.login(param.getString("code"), param.getString("corpid"));
     }
 
     //获取企业微信专区会话配置
-    @GetMapping("/getQwSessionConfig")
-    public AjaxResult getQwSessionConfig() {
-        QwSessionConfigVo qwSessionConfig = weChatSpaceService.getQwSessionConfig();
+    @GetMapping("/getQwSessionConfig/{corpid}")
+    public AjaxResult getQwSessionConfig(@PathVariable String corpid) {
+        QwSessionConfigVo qwSessionConfig = weChatSpaceService.getQwSessionConfigByCorpid(corpid);
         //敏感信息设置为null
         qwSessionConfig.setPrivateKey(null);
         qwSessionConfig.setAgentSecret(null);

+ 57 - 0
fs-service/src/main/java/com/fs/qw/domain/QwConversationMessage.java

@@ -0,0 +1,57 @@
+package com.fs.qw.domain;
+
+import com.baomidou.mybatisplus.annotation.*;
+import lombok.Data;
+import java.util.Date;
+
+/**
+ * 企微会话消息主表
+ */
+@Data
+@TableName("qw_conversation_message")
+public class QwConversationMessage {
+
+    @TableId(type = IdType.AUTO)
+    private Long id;
+
+    /** 企业ID(corpid) */
+    private String corpId;
+
+    /** 消息唯一ID */
+    private String msgid;
+
+    /** 发送者类型:1-员工,2-外部联系人,3-机器人 */
+    private Integer senderType;
+
+    /** 发送者ID(userid或external_userid) */
+    private String senderId;
+
+    /** 群ID(单聊时可能为空) */
+    private String chatid;
+
+    /** 消息类型(见官方枚举) */
+    private Integer msgtype;
+
+    /** Unix时间戳(秒) */
+    private Long sendTime;
+
+    /** 企业微信返回的加密密钥(Base64) */
+    private String encryptedSecretKey;
+
+    /** 解密后的AES密钥(供前端组件使用) */
+    private String secretKey;
+
+    /** 公钥版本号 */
+    private Integer publicKeyVer;
+
+    /** 扩展信息(JSON字符串) */
+    private String extraInfo;
+
+    /** 记录入库时间 */
+    @TableField(fill = FieldFill.INSERT)
+    private Date createTime;
+
+    /** 更新时间 */
+    @TableField(fill = FieldFill.INSERT_UPDATE)
+    private Date updateTime;
+}

+ 30 - 0
fs-service/src/main/java/com/fs/qw/domain/QwConversationParticipant.java

@@ -0,0 +1,30 @@
+package com.fs.qw.domain;
+
+import com.baomidou.mybatisplus.annotation.*;
+import lombok.Data;
+
+/**
+ * 消息参与者关联表(用于快速过滤)
+ */
+@Data
+@TableName("qw_conversation_participant")
+public class QwConversationParticipant {
+
+    @TableId(type = IdType.AUTO)
+    private Long id;
+
+    /** 企业ID */
+    private String corpId;
+
+    /** 消息ID(关联 msgid) */
+    private String msgid;
+
+    /** 参与者类型:1-发送者,2-接收者 */
+    private Integer participantType;
+
+    /** 用户类型:1-员工,2-外部联系人,3-机器人 */
+    private Integer userType;
+
+    /** 用户ID(员工userid或外部联系人id) */
+    private String userId;
+}

+ 37 - 0
fs-service/src/main/java/com/fs/qw/domain/QwConversationSyncState.java

@@ -0,0 +1,37 @@
+package com.fs.qw.domain;
+
+import com.baomidou.mybatisplus.annotation.*;
+import lombok.Data;
+import java.util.Date;
+
+/**
+ * 会话同步游标状态
+ */
+@Data
+@TableName("qw_conversation_sync_state")
+public class QwConversationSyncState {
+
+    @TableId(type = IdType.AUTO)
+    private Long id;
+
+    /** 企业ID */
+    private String corpId;
+
+    /** 专区程序ID */
+    private String programId;
+
+    /** 能力ID(invoke_sync_msg) */
+    private String abilityId;
+
+    /** 上次同步的next_cursor */
+    private String cursor;
+
+    /** 上次同步结束时间 */
+    private Date lastSyncTime;
+
+    @TableField(fill = FieldFill.INSERT)
+    private Date createTime;
+
+    @TableField(fill = FieldFill.INSERT_UPDATE)
+    private Date updateTime;
+}

+ 60 - 0
fs-service/src/main/java/com/fs/qw/mapper/QwConversationMessageMapper.java

@@ -0,0 +1,60 @@
+package com.fs.qw.mapper;
+
+import com.fs.qw.domain.QwConversationMessage;
+import org.apache.ibatis.annotations.Param;
+import java.util.List;
+
+public interface QwConversationMessageMapper {
+
+    /**
+     * 插入一条消息
+     */
+    int insert(QwConversationMessage message);
+
+    /**
+     * 根据主键删除
+     */
+    int deleteById(Long id);
+
+    /**
+     * 根据 msgid 删除(用于重试)
+     */
+    int deleteByMsgid(@Param("corpId") String corpId, @Param("msgid") String msgid);
+
+    /**
+     * 更新消息(例如更新 secretKey)
+     */
+    int updateById(QwConversationMessage message);
+
+    /**
+     * 根据主键查询
+     */
+    QwConversationMessage selectById(Long id);
+
+    /**
+     * 根据企业ID和msgid查询
+     */
+    QwConversationMessage selectByCorpIdAndMsgid(@Param("corpId") String corpId, @Param("msgid") String msgid);
+
+    /**
+     * 检查是否存在(用于去重)
+     */
+    int existsByCorpIdAndMsgid(@Param("corpId") String corpId, @Param("msgid") String msgid);
+
+    /**
+     * 查询两个参与者之间的会话列表(按时间升序)
+     */
+    List<QwConversationMessage> selectMessagesBetweenUsers(@Param("corpId") String corpId,
+                                                            @Param("staffUserId") String staffUserId,
+                                                            @Param("customerId") String customerId,
+                                                            @Param("limit") Long limit);
+
+    /**
+     * 分页查询(可扩展条件)
+     */
+    List<QwConversationMessage> selectListByCondition(@Param("corpId") String corpId,
+                                                        @Param("startTime") Long startTime,
+                                                        @Param("endTime") Long endTime,
+                                                        @Param("offset") Integer offset,
+                                                        @Param("limit") Integer limit);
+}

+ 22 - 0
fs-service/src/main/java/com/fs/qw/mapper/QwConversationParticipantMapper.java

@@ -0,0 +1,22 @@
+package com.fs.qw.mapper;
+
+import com.fs.qw.domain.QwConversationParticipant;
+import org.apache.ibatis.annotations.Param;
+import java.util.List;
+
+public interface QwConversationParticipantMapper {
+
+    int insert(QwConversationParticipant participant);
+
+    int deleteByMsgid(@Param("corpId") String corpId, @Param("msgid") String msgid);
+
+    int deleteByMsgidAndParticipantType(@Param("corpId") String corpId, @Param("msgid") String msgid, @Param("participantType") Integer participantType);
+
+    List<QwConversationParticipant> selectByMsgid(@Param("corpId") String corpId, @Param("msgid") String msgid);
+
+    List<QwConversationParticipant> selectByUser(@Param("corpId") String corpId, @Param("userType") Integer userType, @Param("userId") String userId);
+
+    List<QwConversationParticipant> selectByMsgidAndType(@Param("corpId") String corpId,
+                                                         @Param("msgid") String msgid,
+                                                         @Param("participantType") Integer participantType);
+}

+ 26 - 0
fs-service/src/main/java/com/fs/qw/mapper/QwConversationSyncStateMapper.java

@@ -0,0 +1,26 @@
+package com.fs.qw.mapper;
+
+import com.fs.qw.domain.QwConversationSyncState;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.Date;
+
+public interface QwConversationSyncStateMapper {
+
+    int insert(QwConversationSyncState state);
+
+    int updateById(QwConversationSyncState state);
+
+    int updateSyncTime(@Param("corpId") String corpId, @Param("lastSyncTime") Date lastSyncTime);
+
+    int deleteById(Long id);
+
+    QwConversationSyncState selectById(Long id);
+
+    QwConversationSyncState selectByCorpId(@Param("corpId") String corpId);
+
+    int updateCursor(@Param("corpId") String corpId,
+                     @Param("cursor") String cursor,
+                     @Param("programId") String programId,
+                     @Param("abilityId") String abilityId);
+}

+ 13 - 6
fs-service/src/main/java/com/fs/qw/service/ICorporateWeChatSpaceService.java

@@ -3,24 +3,31 @@ package com.fs.qw.service;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.qw.vo.QwSessionConfigVo;
 
+import java.util.List;
+
 public interface ICorporateWeChatSpaceService {
     /**
      * 通过专区中转获取会话记录
      */
-    JSONObject fetchConversations(long limit,long timeout,String cursor, String customerId,String staffUserId);
+    JSONObject fetchConversations(long limit,long timeout,String cursor, String customerId,String staffUserId,String corpid);
 
     /**
      * 获取 agentConfig 签名(供前端 JS-SDK 使用)
      */
-    JSONObject getAgentConfigSignature(String url);
+    JSONObject getAgentConfigSignature(String url,String corpid);
 
     /**
      * Web 登录(用 code 换取 userId)
      */
-    JSONObject login(String code);
+    JSONObject login(String code,String corpid);
 
     /**
-     * 获取企业微信专区会话配置
-     * */
-    QwSessionConfigVo getQwSessionConfig();
+     * 获取所有企业微信专区会话配置列表
+     */
+    public List<QwSessionConfigVo> getQwSessionConfigList();
+
+    /**
+     * 根据企业ID获取单个配置
+     */
+    public QwSessionConfigVo getQwSessionConfigByCorpid(String corpid);
 }

+ 280 - 0
fs-service/src/main/java/com/fs/qw/service/impl/ConversationSyncService.java

@@ -0,0 +1,280 @@
+package com.fs.qw.service.impl;
+
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.fs.common.utils.DateUtils;
+import com.fs.qw.domain.QwConversationMessage;
+import com.fs.qw.domain.QwConversationParticipant;
+import com.fs.qw.domain.QwConversationSyncState;
+import com.fs.qw.mapper.QwConversationMessageMapper;
+import com.fs.qw.mapper.QwConversationParticipantMapper;
+import com.fs.qw.mapper.QwConversationSyncStateMapper;
+import com.fs.qw.service.ICorporateWeChatSpaceService;
+import com.fs.qw.utils.WeChatSpaceDecryptUtil;
+import com.fs.qw.utils.WeChatSpaceUtil;
+import com.fs.qw.vo.QwSessionConfigVo;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Date;
+
+@Service
+@Slf4j
+public class ConversationSyncService {
+
+    @Autowired
+    private ICorporateWeChatSpaceService weChatSpaceService;
+
+    @Autowired
+    private QwConversationMessageMapper messageMapper;
+
+    @Autowired
+    private QwConversationParticipantMapper participantMapper;
+
+    @Autowired
+    private QwConversationSyncStateMapper stateMapper;
+
+    @Autowired
+    private WeChatSpaceUtil weChatSpaceUtil;
+
+    private final RestTemplate restTemplate = new RestTemplate();
+
+    /**
+     * 全量同步指定企业的会话记录(一直拉取直到 has_more=0)
+     * 兜底策略:
+     * - 最大页数 1000
+     * - 连续空页 10 次退出
+     * - 游标不变检测
+     * - 全局超时 30 分钟
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public void syncConversationsForCorp(String corpId) {
+        // 1. 获取配置并校验必要字段
+        QwSessionConfigVo config = weChatSpaceService.getQwSessionConfigByCorpid(corpId);
+        if (config == null) {
+            log.error("企业 {} 未配置企微专区信息,跳过同步", corpId);
+            return;
+        }
+        if (StringUtils.isBlank(config.getProgramId()) || StringUtils.isBlank(config.getAgentSecret())) {
+            log.error("企业 {} 配置不完整(缺少 programId 或 agentSecret),跳过同步", corpId);
+            return;
+        }
+
+        String abilityId = config.getAbilityIds().stream()
+                .filter(item -> "invokeSyncMsg".equals(item.getKey()))
+                .map(QwSessionConfigVo.AbilityItem::getValue)
+                .findFirst()
+                .orElse(null);
+        if (abilityId == null) {
+            log.error("企业 {} 未配置 invoke_sync_msg 能力ID", corpId);
+            return;
+        }
+
+        QwConversationSyncState state = stateMapper.selectByCorpId(corpId);
+        String cursor = (state != null) ? state.getCursor() : null;
+
+        int totalInserted = 0;
+        boolean hasMore = true;
+        String nextCursor = cursor;
+        int currentPage = 0;
+        int emptyPageCount = 0;
+        String lastCursor = null;
+        long startTime = System.currentTimeMillis();
+        long timeoutMillis = 30 * 60 * 1000L; // 30分钟
+        boolean syncError = false; // 标记是否发生错误
+
+        while (hasMore) {
+            currentPage++;
+
+            // 超时检查
+            if (System.currentTimeMillis() - startTime > timeoutMillis) {
+                log.warn("企业 {} 同步超时(>30分钟),强制退出", corpId);
+                syncError = true;
+                break;
+            }
+            // 最大页数限制
+            if (currentPage > 1000) {
+                log.warn("企业 {} 已拉取 {} 页,达到最大页数限制 1000,强制退出", corpId, currentPage);
+                syncError = true;
+                break;
+            }
+
+            // 构建请求
+            JSONObject requestData = new JSONObject();
+            if (StringUtils.isNotBlank(nextCursor)) {
+                requestData.set("cursor", nextCursor);
+            }
+            requestData.set("limit", 1000);
+
+            JSONObject response = callSyncProgram(config, abilityId, requestData);
+            if (response == null || response.getInt("errcode") != 0) {
+                log.error("拉取会话失败,corpId={}, response={}", corpId, response);
+                syncError = true;
+                break;
+            }
+
+            String responseDataStr = response.getStr("response_data");
+            if (StringUtils.isBlank(responseDataStr)) {
+                log.error("response_data 为空,corpId={}", corpId);
+                syncError = true;
+                break;
+            }
+            JSONObject respData = JSONUtil.parseObj(responseDataStr);
+            if (respData.getInt("errcode") != 0) {
+                log.error("专区内部错误,corpId={}, respData={}", corpId, respData);
+                syncError = true;
+                break;
+            }
+
+            JSONArray msgList = respData.getJSONArray("msg_list");
+            int msgCount = (msgList != null) ? msgList.size() : 0;
+
+            if (msgCount == 0) {
+                emptyPageCount++;
+                if (emptyPageCount >= 10) {
+                    log.warn("企业 {} 连续 {} 页无消息,强制退出", corpId, emptyPageCount);
+                    syncError = true;
+                    break;
+                }
+                log.debug("企业 {} 第 {} 页空消息,空页计数 {}", corpId, currentPage, emptyPageCount);
+            } else {
+                emptyPageCount = 0;
+                int inserted = saveMessagesToDb(corpId, msgList, config.getPrivateKey());
+                totalInserted += inserted;
+                log.info("企业 {} 第 {} 页保存 {} 条,累计 {} 条", corpId, currentPage, inserted, totalInserted);
+            }
+
+            hasMore = respData.getInt("has_more") == 1;
+            String newCursor = respData.getStr("next_cursor");
+
+            if (StringUtils.isNotBlank(lastCursor) && StringUtils.isNotBlank(newCursor) && lastCursor.equals(newCursor)) {
+                log.warn("企业 {} 游标连续两次相同 [{}],强制退出", corpId, newCursor);
+                syncError = true;
+                break;
+            }
+            lastCursor = newCursor;
+            nextCursor = newCursor;
+
+            // 每页成功拉取后更新 cursor(仅更新,不涉及 last_sync_time)
+            updateCursor(corpId, nextCursor, config.getProgramId(), abilityId);
+        }
+
+        // 只有成功完成(没有发生错误)才更新最后同步时间
+        if (!syncError) {
+            int updated = stateMapper.updateSyncTime(corpId, DateUtils.getNowDate());
+            if (updated == 0) {
+                // 没有记录则插入一条
+                QwConversationSyncState newState = new QwConversationSyncState();
+                newState.setCorpId(corpId);
+                newState.setProgramId(config.getProgramId());
+                newState.setAbilityId(abilityId);
+                newState.setCursor(nextCursor);
+                newState.setLastSyncTime(new Date());
+                stateMapper.insert(newState);
+            }
+            log.info("企业 {} 同步完成,拉取 {} 页,新增 {} 条消息", corpId, currentPage, totalInserted);
+        } else {
+            log.error("企业 {} 同步过程中出现错误,未更新最后同步时间,请检查配置", corpId);
+        }
+    }
+
+    private JSONObject callSyncProgram(QwSessionConfigVo config, String abilityId, JSONObject requestData) {
+        String accessToken = weChatSpaceUtil.getAccessToken(config.getCorpid(), config.getAgentSecret());
+        String url = "https://qyapi.weixin.qq.com/cgi-bin/chatdata/sync_call_program?access_token=" + accessToken;
+        JSONObject requestBody = new JSONObject();
+        requestBody.set("program_id", config.getProgramId());
+        requestBody.set("ability_id", abilityId);
+        requestBody.set("request_data", JSONUtil.toJsonStr(requestData));
+        log.info("调用专区接口 - URL: {}", url);
+        log.info("调用专区接口 - 请求体: {}", JSONUtil.toJsonStr(requestBody));
+        try {
+            JSONObject response = restTemplate.postForObject(url, requestBody, JSONObject.class);
+            //log.info("调用专区接口 - 响应: {}", response);
+            return response;
+        } catch (Exception e) {
+            log.error("调用专区接口异常", e);
+            return null;
+        }
+    }
+
+    private int saveMessagesToDb(String corpId, JSONArray msgList, String privateKeyPem) {
+        int insertCount = 0;
+        for (Object obj : msgList) {
+            JSONObject msg = (JSONObject) obj;
+            String msgid = msg.getStr("msgid");
+            if (messageMapper.existsByCorpIdAndMsgid(corpId, msgid) > 0) {
+                continue;
+            }
+
+            JSONObject encryptInfo = msg.getJSONObject("service_encrypt_info");
+            if (encryptInfo == null) {
+                log.warn("消息 {} 缺少 service_encrypt_info,跳过", msgid);
+                continue;
+            }
+            String encryptedKey = encryptInfo.getStr("encrypted_secret_key");
+            if (StringUtils.isBlank(encryptedKey)) {
+                log.warn("消息 {} 缺少 encrypted_secret_key,跳过", msgid);
+                continue;
+            }
+
+            String secretKey;
+            try {
+                secretKey = WeChatSpaceDecryptUtil.decryptSecretKey(encryptedKey, privateKeyPem);
+            } catch (Exception e) {
+                log.error("解密失败,msgid={}, 跳过", msgid, e);
+                continue;
+            }
+
+            QwConversationMessage record = new QwConversationMessage();
+            record.setCorpId(corpId);
+            record.setMsgid(msgid);
+            JSONObject sender = msg.getJSONObject("sender");
+            if (sender == null) {
+                log.warn("消息 {} 缺少 sender,跳过", msgid);
+                continue;
+            }
+            record.setSenderType(sender.getInt("type"));
+            record.setSenderId(sender.getStr("id"));
+            record.setChatid(msg.getStr("chatid"));
+            record.setMsgtype(msg.getInt("msgtype"));
+            record.setSendTime(msg.getLong("send_time"));
+            record.setEncryptedSecretKey(encryptedKey);
+            record.setSecretKey(secretKey);
+            record.setPublicKeyVer(encryptInfo.getInt("public_key_ver"));
+            if (msg.containsKey("extra_info")) {
+                record.setExtraInfo(JSONUtil.toJsonStr(msg.getJSONObject("extra_info")));
+            }
+            messageMapper.insert(record);
+
+            insertParticipant(corpId, msgid, 1, record.getSenderType(), record.getSenderId());
+            JSONArray receivers = msg.getJSONArray("receiver_list");
+            if (receivers != null) {
+                for (Object rObj : receivers) {
+                    JSONObject r = (JSONObject) rObj;
+                    insertParticipant(corpId, msgid, 2, r.getInt("type"), r.getStr("id"));
+                }
+            }
+            insertCount++;
+        }
+        return insertCount;
+    }
+
+    private void insertParticipant(String corpId, String msgid, int participantType, int userType, String userId) {
+        QwConversationParticipant p = new QwConversationParticipant();
+        p.setCorpId(corpId);
+        p.setMsgid(msgid);
+        p.setParticipantType(participantType);
+        p.setUserType(userType);
+        p.setUserId(userId);
+        participantMapper.insert(p);
+    }
+
+    private void updateCursor(String corpId, String nextCursor, String programId, String abilityId) {
+        stateMapper.updateCursor(corpId, nextCursor, programId, abilityId);
+    }
+}

+ 90 - 91
fs-service/src/main/java/com/fs/qw/service/impl/ICorporateWeChatSpaceServiceImpl.java

@@ -5,6 +5,10 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.common.exception.CustomException;
+import com.fs.qw.domain.QwConversationMessage;
+import com.fs.qw.domain.QwConversationParticipant;
+import com.fs.qw.mapper.QwConversationMessageMapper;
+import com.fs.qw.mapper.QwConversationParticipantMapper;
 import com.fs.qw.service.ICorporateWeChatSpaceService;
 import com.fs.qw.utils.WeChatSpaceDecryptUtil;
 import com.fs.qw.utils.WeChatSpaceUtil;
@@ -20,6 +24,7 @@ import lombok.RequiredArgsConstructor;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -32,6 +37,14 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
     @Autowired
     private ISysConfigService sysConfigService;
 
+    @Autowired
+    private QwConversationMessageMapper messageMapper;
+    @Autowired
+    private QwConversationParticipantMapper participantMapper;
+
+    @Autowired
+    private WeChatSpaceUtil weChatSpaceUtil;
+
     private final RestTemplate restTemplate = new RestTemplate();
 
     private final ConcurrentHashMap<String, String> consumedCodes = new ConcurrentHashMap<>();
@@ -39,93 +52,61 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
     // 系统配置缓存前缀
     private final static String CONFIG_KEY = "qw.sessionConfig";
 
-    //获取会话记录Key
-    private final static String targetKey = "invokeSyncMsg";
-
     private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
-    // =============== 核心:通过专区中转拉取会话 ===============
+    // =============== 查询数据库保存的会话 ===============
     @Override
-    public JSONObject fetchConversations(long limit,long timeout,String cursor,
-                                         String customerId, String staffUserId) {
+    public JSONObject fetchConversations(long limit, long timeout, String cursor,
+                                         String customerId, String staffUserId, String corpid) {
         JSONObject result = new JSONObject();
         try {
-            QwSessionConfigVo qwConfig = getQwSessionConfig();
-            String corpid = qwConfig.getCorpid();
-            String agentSecret = qwConfig.getAgentSecret();
-            String accessToken = WeChatSpaceUtil.getAccessToken(corpid, agentSecret);
-
-            // 构建 request_data(invoke_sync_msg 能力要求)
-            JSONObject requestData = new JSONObject();
-            if (StringUtils.isNotBlank(cursor)){// 首次为空不传
-                requestData.put("cursor", cursor);
-            }
-
-            requestData.put("limit", limit > 0 ? limit : 200);  // 限制 1-1000
-            //requestData.put("token", "");  // 暂不传 token,有频率限制但测试够用
-
-            String abilityId = null;
-
-            if (qwConfig.getAbilityIds() != null) {
-                //根据配置的能力Key找到对应的 abilityId
-                abilityId = qwConfig.getAbilityIds().stream()
-                        .filter(item -> targetKey.equals(item.getKey()))
-                        .map(QwSessionConfigVo.AbilityItem::getValue)
-                        .findFirst()
-                        .orElse(null);
-            }
-            if (abilityId == null) {
-                throw new CustomException("未配置获取会话记录的能力ID");
-            }
-
-            // 调用 sync_call_program
-            String url = "https://qyapi.weixin.qq.com/cgi-bin/chatdata/sync_call_program?access_token=" + accessToken;
-            JSONObject requestBody = new JSONObject();
-            requestBody.put("program_id", qwConfig.getProgramId());
-            requestBody.put("ability_id", abilityId);
-            requestBody.put("request_data", JSON.toJSONString(requestData));
-
-            log.info("调用专区接口: ability_id={}, request_data={}", abilityId, requestData);
-            JSONObject response = restTemplate.postForObject(url, requestBody, JSONObject.class);
-            //log.info("专区响应: {}", response);
-
-            if (response != null && response.getInteger("errcode") == 0) {
-                String responseDataStr = response.getString("response_data");
-                if (responseDataStr != null) {
-                    JSONObject responseData = JSON.parseObject(responseDataStr);
-                    Integer innerErrCode = responseData.getInteger("errcode");
-                    if (innerErrCode != null && innerErrCode == 0) {
-                        //获取 cursor用于下次拉取更多数据
-                        String nextCursor = responseData.getString("next_cursor");
-                        // 获取消息列表并处理
-                        JSONArray msgList = responseData.getJSONArray("msg_list");
-                        if (msgList != null && !msgList.isEmpty()) {
-                            // 解密 + 过滤 + 格式化
-                            JSONArray processedList = processMessages(msgList, customerId, staffUserId, qwConfig);
-                            result.put("data", processedList);
-                        } else {
-                            result.put("data", new JSONArray());
-                        }
-                        result.put("errcode", 0);
-                        result.put("errmsg", "ok");
-                        //返回 has_more 和 next_cursor 给前端,当没有更多数据时,返回 has_more 为 0
-                        result.put("has_more", responseData.getInteger("has_more"));
-                        result.put("next_cursor", nextCursor);
-                    } else {
-                        // 专区内部错误
-                        result.put("errcode", innerErrCode);
-                        result.put("errmsg", responseData.getString("errmsg"));
-                    }
-                } else {
-                    result.put("errcode", -1);
-                    result.put("errmsg", "专区返回数据格式错误");
+            // 从数据库查询两个参与者之间的消息
+            List<QwConversationMessage> messages = messageMapper.selectMessagesBetweenUsers(
+                    corpid, staffUserId, customerId, limit);
+
+            JSONArray dataArray = new JSONArray();
+            for (QwConversationMessage msg : messages) {
+                JSONObject item = new JSONObject();
+                item.put("msgid", msg.getMsgid());
+                item.put("secretKey", msg.getSecretKey()); // 已解密存储
+
+                // sender 对象
+                JSONObject sender = new JSONObject();
+                sender.put("type", msg.getSenderType());
+                sender.put("id", msg.getSenderId());
+                item.put("sender", sender);
+
+                // receiver_list:从参与者表获取 type=2 的参与者(接收者)
+                List<QwConversationParticipant> receivers = participantMapper.selectByMsgidAndType(
+                        corpid, msg.getMsgid(), 2);
+                JSONArray receiverList = new JSONArray();
+                for (QwConversationParticipant p : receivers) {
+                    JSONObject recv = new JSONObject();
+                    recv.put("type", p.getUserType());
+                    recv.put("id", p.getUserId());
+                    receiverList.add(recv);
                 }
-            } else {
-                result.put("errcode", response != null ? response.getInteger("errcode") : -1);
-                result.put("errmsg", response != null ? response.getString("errmsg") : "调用专区失败");
+                item.put("receiver_list", receiverList);
+                item.put("msgtype", msg.getMsgtype());
+
+                // 时间格式化
+                if (msg.getSendTime() != null) {
+                    String formattedTime = Instant.ofEpochSecond(msg.getSendTime())
+                            .atZone(ZoneId.systemDefault())
+                            .toLocalDateTime()
+                            .format(DATE_TIME_FORMATTER);
+                    item.put("send_time_str", formattedTime);
+                    item.put("send_time", msg.getSendTime());
+                }
+                dataArray.add(item);
             }
+            result.put("data", dataArray);
+            result.put("errcode", 0);
+            result.put("errmsg", "ok");
+            result.put("has_more", 0);
+            result.put("next_cursor", "");
         } catch (Exception e) {
-            log.error("获取会话记录失败", e);
+            log.error("查询会话记录失败", e);
             result.put("errcode", -1);
             result.put("errmsg", "内部错误:" + e.getMessage());
         }
@@ -135,13 +116,13 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
 
 
     @Override
-    public JSONObject getAgentConfigSignature(String url) {
-        QwSessionConfigVo qwSessionConfig = getQwSessionConfig();
-        return WeChatSpaceUtil.generateAgentConfigSignature(qwSessionConfig.getCorpid(), qwSessionConfig.getAgentSecret(), qwSessionConfig.getAgentid(), url);
+    public JSONObject getAgentConfigSignature(String url,String corpid) {
+        QwSessionConfigVo qwSessionConfig = getQwSessionConfigByCorpid(corpid);
+        return weChatSpaceUtil.generateAgentConfigSignature(qwSessionConfig.getCorpid(), qwSessionConfig.getAgentSecret(), qwSessionConfig.getAgentid(), url);
     }
 
     @Override
-    public JSONObject login(String code) {
+    public JSONObject login(String code,String corpid) {
         JSONObject result = new JSONObject();
         if (code == null || code.isEmpty()) {
             result.put("errcode", -1);
@@ -153,9 +134,9 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
             result.put("errmsg", "code already used");
             return result;
         }
-        QwSessionConfigVo qwSessionConfig = getQwSessionConfig();
+        QwSessionConfigVo qwSessionConfig = getQwSessionConfigByCorpid(corpid);
         try {
-            String accessToken = WeChatSpaceUtil.getAccessToken(qwSessionConfig.getCorpid(), qwSessionConfig.getAgentSecret());
+            String accessToken = weChatSpaceUtil.getAccessToken(qwSessionConfig.getCorpid(), qwSessionConfig.getAgentSecret());
             String url = "https://qyapi.weixin.qq.com/cgi-bin/user/getuserinfo?access_token="
                     + accessToken + "&code=" + code;
             JSONObject resp = restTemplate.getForObject(url, JSONObject.class);
@@ -181,16 +162,34 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
     }
 
     /**
-     * 获取企业微信专区会话配置
+     * 获取所有企业微信专区会话配置列表
      */
     @Override
-    public QwSessionConfigVo getQwSessionConfig() {
-        QwSessionConfigVo qwSessionConfig = sysConfigService.getConfig(CONFIG_KEY, QwSessionConfigVo.class);
-        if (qwSessionConfig == null){
-            log.error("未找到企微专区配置,key:{}",CONFIG_KEY);
+    public List<QwSessionConfigVo> getQwSessionConfigList() {
+        String json = sysConfigService.selectConfigByKey(CONFIG_KEY);
+        if (StringUtils.isBlank(json)) {
+            log.error("未找到企微专区配置, key:{}", CONFIG_KEY);
             throw new CustomException("未找到企微专区配置");
         }
-        return qwSessionConfig;
+        try {
+            // 直接解析为 List
+            return JSON.parseArray(json, QwSessionConfigVo.class);
+        } catch (Exception e) {
+            log.error("解析企微专区配置失败", e);
+            throw new CustomException("企微专区配置格式错误");
+        }
+    }
+
+    /**
+     * 根据企业ID获取单个配置
+     */
+    @Override
+    public QwSessionConfigVo getQwSessionConfigByCorpid(String corpid) {
+        List<QwSessionConfigVo> all = getQwSessionConfigList();
+        return all.stream()
+                .filter(config -> config.getCorpid().equals(corpid))
+                .findFirst()
+                .orElseThrow(() -> new CustomException("未找到corpid为 " + corpid + " 的配置"));
     }
 
 

+ 44 - 47
fs-service/src/main/java/com/fs/qw/utils/WeChatSpaceUtil.java

@@ -1,84 +1,81 @@
 package com.fs.qw.utils;
 
 import com.alibaba.fastjson.JSONObject;
+import com.fs.qw.domain.QwCompany;
+import com.fs.qw.service.IQwCompanyService;
+import com.fs.qwApi.service.QwApiService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
 import java.util.UUID;
-
-/**
- * 企业微信 token / ticket 工具类
- */
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+@Slf4j
+@Service
 public class WeChatSpaceUtil {
 
     private static final RestTemplate restTemplate = new RestTemplate();
-    // 缓存 access_token,实际生产应使用 redis 或数据库
-    private static String accessTokenCache;
-    private static long accessTokenExpireTime = 0;
 
-    // 缓存 agent_ticket
-    private static String agentTicketCache;
-    private static long agentTicketExpireTime = 0;
+    // 按企业ID缓存 agent_ticket(key = corpId + "_" + agentId,确保不同应用隔离)
+    private final ConcurrentMap<String, String> AGENT_TICKET_CACHE = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Long> AGENT_TICKET_EXPIRE = new ConcurrentHashMap<>();
+
+    @Autowired
+    private QwApiService qwApiService;
+
+    @Autowired
+    private IQwCompanyService qwCompanyService;
 
     /**
-     * 获取企业 access_token(带缓存)
+     * 获取企业 access_token(带缓存,由 QwApiService 实现
      */
-    public static String getAccessToken(String corpId, String corpSecret) {
-        long now = System.currentTimeMillis() / 1000;
-        if (accessTokenCache != null && now < accessTokenExpireTime) {
-            return accessTokenCache;
-        }
-        String url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=" + corpId + "&corpsecret=" + corpSecret;
-        JSONObject resp = restTemplate.getForObject(url, JSONObject.class);
-        if (resp != null && resp.getIntValue("errcode") == 0) {
-            accessTokenCache = resp.getString("access_token");
-            accessTokenExpireTime = now + resp.getIntValue("expires_in") - 300; // 提前5分钟过期
-            return accessTokenCache;
-        }
-        throw new RuntimeException("获取 access_token 失败: " + (resp == null ? "null response" : resp.getString("errmsg")));
+    public String getAccessToken(String corpId, String corpSecret) {
+        QwCompany qwCompany = qwCompanyService.selectQwCompanyByCorpId(corpId);
+        String openSecret = qwCompany.getOpenSecret();
+        return qwApiService.getToken(corpId, openSecret);
     }
 
     /**
-     * 获取 agent_ticket(必须用于 agentConfig 签名)
-     * @param corpId     企业ID
-     * @param corpSecret 应用 secret
-     * @param agentId    应用ID
+     * 获取 agent_ticket(按企业+应用独立缓存)
      */
-    public static String getAgentTicket(String corpId, String corpSecret, String agentId) {
+    public String getAgentTicket(String corpId, String corpSecret, String agentId) {
         long now = System.currentTimeMillis() / 1000;
-        if (agentTicketCache != null && now < agentTicketExpireTime) {
-            return agentTicketCache;
+        String cacheKey = corpId + "_" + agentId;  // 区分不同企业的不同应用
+
+        if (AGENT_TICKET_CACHE.containsKey(cacheKey) && now < AGENT_TICKET_EXPIRE.get(cacheKey)) {
+            return AGENT_TICKET_CACHE.get(cacheKey);
         }
-        String accessToken = getAccessToken(corpId, corpSecret);
+
+        String accessToken = this.getAccessToken(corpId, corpSecret);
         String url = "https://qyapi.weixin.qq.com/cgi-bin/ticket/get?access_token=" + accessToken + "&type=agent_config";
         JSONObject resp = restTemplate.getForObject(url, JSONObject.class);
+
         if (resp != null && resp.getIntValue("errcode") == 0) {
-            agentTicketCache = resp.getString("ticket");
-            agentTicketExpireTime = now + resp.getIntValue("expires_in") - 300; // 提前5分钟刷新
-            return agentTicketCache;
+            String ticket = resp.getString("ticket");
+            int expiresIn = resp.getIntValue("expires_in");
+            AGENT_TICKET_CACHE.put(cacheKey, ticket);
+            AGENT_TICKET_EXPIRE.put(cacheKey, now + expiresIn - 300); // 提前5分钟刷新
+            return ticket;
         }
         throw new RuntimeException("获取 agent_ticket 失败: " + (resp == null ? "null response" : resp.getString("errmsg")));
     }
 
     /**
-     * 生成 agentConfig 签名(使用 agent_ticket)
-     * @param corpId     企业ID
-     * @param corpSecret 应用 secret
-     * @param agentId    应用ID
-     * @param url        当前页面完整URL(不含#)
+     * 生成 agentConfig 签名(实例方法)
      */
-    public static JSONObject generateAgentConfigSignature(String corpId, String corpSecret, String agentId, String url) {
+    public JSONObject generateAgentConfigSignature(String corpId, String corpSecret, String agentId, String url) {
         try {
-            // 1. 获取 agent_ticket
-            String ticket = WeChatSpaceUtil.getAgentTicket(corpId, corpSecret, agentId);
-            // 2. 生成随机串和时间戳
+            String ticket = this.getAgentTicket(corpId, corpSecret, agentId);
             String nonceStr = UUID.randomUUID().toString().replaceAll("-", "");
             String timestamp = Long.toString(System.currentTimeMillis() / 1000);
-            // 3. 拼接签名字符串
             String signStr = "jsapi_ticket=" + ticket + "&noncestr=" + nonceStr + "&timestamp=" + timestamp + "&url=" + url;
-            // 4. SHA1 签名
             String signature = DigestUtils.sha1Hex(signStr);
-
+            log.info("签名参数 - corpId:{}, agentId:{}, url:{}", corpId, agentId, url);
+            log.info("获取到的 ticket: {}", ticket);
+            log.info("签名结果 - timestamp:{}, nonceStr:{}, signature:{}", timestamp, nonceStr, signature);
             JSONObject result = new JSONObject();
             result.put("timestamp", timestamp);
             result.put("nonceStr", nonceStr);

+ 3 - 3
fs-service/src/main/java/com/fs/qw/vo/QwSessionConfigVo.java

@@ -10,18 +10,18 @@ import java.util.List;
 @Data
 public class QwSessionConfigVo {
 
+    //企业ID
     private String corpid;
 
+    //(关联专区程序的)自建应用ID
     private String agentid;
 
+    //(关联专区程序的)自建应用密钥
     private String agentSecret;
 
     //专区程序ID
     private String programId;
 
-    //自建应用可信域名
-    private String domain;
-
     //企业会话密钥
     private String privateKey;
 

+ 126 - 0
fs-service/src/main/resources/mapper/qw/QwConversationMessageMapper.xml

@@ -0,0 +1,126 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fs.qw.mapper.QwConversationMessageMapper">
+
+    <resultMap id="BaseResultMap" type="com.fs.qw.domain.QwConversationMessage">
+        <id column="id" property="id"/>
+        <result column="corp_id" property="corpId"/>
+        <result column="msgid" property="msgid"/>
+        <result column="sender_type" property="senderType"/>
+        <result column="sender_id" property="senderId"/>
+        <result column="chatid" property="chatid"/>
+        <result column="msgtype" property="msgtype"/>
+        <result column="send_time" property="sendTime"/>
+        <result column="encrypted_secret_key" property="encryptedSecretKey"/>
+        <result column="secret_key" property="secretKey"/>
+        <result column="public_key_ver" property="publicKeyVer"/>
+        <result column="extra_info" property="extraInfo"/>
+        <result column="create_time" property="createTime"/>
+        <result column="update_time" property="updateTime"/>
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id, corp_id, msgid, sender_type, sender_id, chatid, msgtype, send_time,
+        encrypted_secret_key, secret_key, public_key_ver, extra_info, create_time, update_time
+    </sql>
+
+    <!-- 插入 -->
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id">
+        INSERT INTO qw_conversation_message
+        <trim prefix="(" suffix=")" suffixOverrides=",">
+            <if test="corpId != null">corp_id,</if>
+            <if test="msgid != null">msgid,</if>
+            <if test="senderType != null">sender_type,</if>
+            <if test="senderId != null">sender_id,</if>
+            <if test="chatid != null">chatid,</if>
+            <if test="msgtype != null">msgtype,</if>
+            <if test="sendTime != null">send_time,</if>
+            <if test="encryptedSecretKey != null">encrypted_secret_key,</if>
+            <if test="secretKey != null">secret_key,</if>
+            <if test="publicKeyVer != null">public_key_ver,</if>
+            <if test="extraInfo != null">extra_info,</if>
+            create_time, update_time
+        </trim>
+        <trim prefix="VALUES (" suffix=")" suffixOverrides=",">
+            <if test="corpId != null">#{corpId},</if>
+            <if test="msgid != null">#{msgid},</if>
+            <if test="senderType != null">#{senderType},</if>
+            <if test="senderId != null">#{senderId},</if>
+            <if test="chatid != null">#{chatid},</if>
+            <if test="msgtype != null">#{msgtype},</if>
+            <if test="sendTime != null">#{sendTime},</if>
+            <if test="encryptedSecretKey != null">#{encryptedSecretKey},</if>
+            <if test="secretKey != null">#{secretKey},</if>
+            <if test="publicKeyVer != null">#{publicKeyVer},</if>
+            <if test="extraInfo != null">#{extraInfo},</if>
+            NOW(), NOW()
+        </trim>
+    </insert>
+
+    <!-- 根据主键删除 -->
+    <delete id="deleteById">
+        DELETE FROM qw_conversation_message WHERE id = #{id}
+    </delete>
+
+    <!-- 根据企业+msgid删除 -->
+    <delete id="deleteByMsgid">
+        DELETE FROM qw_conversation_message WHERE corp_id = #{corpId} AND msgid = #{msgid}
+    </delete>
+
+    <!-- 更新(根据主键) -->
+    <update id="updateById" parameterType="com.fs.qw.domain.QwConversationMessage">
+        UPDATE qw_conversation_message
+        <set>
+            <if test="encryptedSecretKey != null">encrypted_secret_key = #{encryptedSecretKey},</if>
+            <if test="secretKey != null">secret_key = #{secretKey},</if>
+            <if test="publicKeyVer != null">public_key_ver = #{publicKeyVer},</if>
+            <if test="extraInfo != null">extra_info = #{extraInfo},</if>
+            update_time = NOW()
+        </set>
+        WHERE id = #{id}
+    </update>
+
+    <!-- 根据主键查询 -->
+    <select id="selectById" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/> FROM qw_conversation_message WHERE id = #{id}
+    </select>
+
+    <!-- 根据企业+msgid查询 -->
+    <select id="selectByCorpIdAndMsgid" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/>
+        FROM qw_conversation_message
+        WHERE corp_id = #{corpId} AND msgid = #{msgid}
+    </select>
+
+    <!-- 检查是否存在 -->
+    <select id="existsByCorpIdAndMsgid" resultType="int">
+        SELECT COUNT(1) FROM qw_conversation_message
+        WHERE corp_id = #{corpId} AND msgid = #{msgid}
+    </select>
+
+    <!-- 查询两个参与者之间的消息(通过参与者表关联) -->
+    <select id="selectMessagesBetweenUsers" resultMap="BaseResultMap">
+        SELECT DISTINCT
+        m.id, m.corp_id, m.msgid, m.sender_type, m.sender_id, m.chatid, m.msgtype, m.send_time,
+        m.encrypted_secret_key, m.secret_key, m.public_key_ver, m.extra_info, m.create_time, m.update_time
+        FROM qw_conversation_message m
+        INNER JOIN qw_conversation_participant p1 ON m.msgid = p1.msgid AND m.corp_id = p1.corp_id
+        INNER JOIN qw_conversation_participant p2 ON m.msgid = p2.msgid AND m.corp_id = p2.corp_id
+        WHERE m.corp_id = #{corpId}
+        AND p1.user_type = 1 AND p1.user_id = #{staffUserId}
+        AND p2.user_type = 2 AND p2.user_id = #{customerId}
+        ORDER BY m.send_time ASC
+        <if test="limit != null and limit > 0">LIMIT #{limit}</if>
+    </select>
+
+    <!-- 条件分页查询(按时间范围) -->
+    <select id="selectListByCondition" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/>
+        FROM qw_conversation_message
+        WHERE corp_id = #{corpId}
+        <if test="startTime != null">AND send_time >= #{startTime}</if>
+        <if test="endTime != null">AND send_time &lt;= #{endTime}</if>
+        ORDER BY send_time DESC
+        <if test="offset != null and limit != null">LIMIT #{offset}, #{limit}</if>
+    </select>
+</mapper>

+ 67 - 0
fs-service/src/main/resources/mapper/qw/QwConversationParticipantMapper.xml

@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fs.qw.mapper.QwConversationParticipantMapper">
+
+    <resultMap id="BaseResultMap" type="com.fs.qw.domain.QwConversationParticipant">
+        <id column="id" property="id"/>
+        <result column="corp_id" property="corpId"/>
+        <result column="msgid" property="msgid"/>
+        <result column="participant_type" property="participantType"/>
+        <result column="user_type" property="userType"/>
+        <result column="user_id" property="userId"/>
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id, corp_id, msgid, participant_type, user_type, user_id
+    </sql>
+
+    <!-- 插入参与者记录 -->
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id">
+        INSERT INTO qw_conversation_participant
+        <trim prefix="(" suffix=")" suffixOverrides=",">
+            <if test="corpId != null">corp_id,</if>
+            <if test="msgid != null">msgid,</if>
+            <if test="participantType != null">participant_type,</if>
+            <if test="userType != null">user_type,</if>
+            <if test="userId != null">user_id,</if>
+        </trim>
+        <trim prefix="VALUES (" suffix=")" suffixOverrides=",">
+            <if test="corpId != null">#{corpId},</if>
+            <if test="msgid != null">#{msgid},</if>
+            <if test="participantType != null">#{participantType},</if>
+            <if test="userType != null">#{userType},</if>
+            <if test="userId != null">#{userId},</if>
+        </trim>
+    </insert>
+
+    <!-- 根据消息ID删除所有参与者 -->
+    <delete id="deleteByMsgid">
+        DELETE FROM qw_conversation_participant WHERE corp_id = #{corpId} AND msgid = #{msgid}
+    </delete>
+
+    <!-- 根据消息ID和参与者类型删除 -->
+    <delete id="deleteByMsgidAndParticipantType">
+        DELETE FROM qw_conversation_participant
+        WHERE corp_id = #{corpId} AND msgid = #{msgid} AND participant_type = #{participantType}
+    </delete>
+
+    <!-- 根据消息ID查询参与者列表 -->
+    <select id="selectByMsgid" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/>
+        FROM qw_conversation_participant
+        WHERE corp_id = #{corpId} AND msgid = #{msgid}
+    </select>
+
+    <!-- 查询用户参与的所有消息ID(用于快速过滤) -->
+    <select id="selectByUser" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/>
+        FROM qw_conversation_participant
+        WHERE corp_id = #{corpId} AND user_type = #{userType} AND user_id = #{userId}
+    </select>
+
+    <select id="selectByMsgidAndType" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/>
+        FROM qw_conversation_participant
+        WHERE corp_id = #{corpId} AND msgid = #{msgid} AND participant_type = #{participantType}
+    </select>
+</mapper>

+ 79 - 0
fs-service/src/main/resources/mapper/qw/QwConversationSyncStateMapper.xml

@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fs.qw.mapper.QwConversationSyncStateMapper">
+
+    <resultMap id="BaseResultMap" type="com.fs.qw.domain.QwConversationSyncState">
+        <id column="id" property="id"/>
+        <result column="corp_id" property="corpId"/>
+        <result column="program_id" property="programId"/>
+        <result column="ability_id" property="abilityId"/>
+        <result column="cursor" property="cursor"/>
+        <result column="last_sync_time" property="lastSyncTime"/>
+        <result column="create_time" property="createTime"/>
+        <result column="update_time" property="updateTime"/>
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id, corp_id, program_id, ability_id, `cursor`, last_sync_time, create_time, update_time
+    </sql>
+
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id">
+        INSERT INTO qw_conversation_sync_state
+        <trim prefix="(" suffix=")" suffixOverrides=",">
+            <if test="corpId != null">corp_id,</if>
+            <if test="programId != null">program_id,</if>
+            <if test="abilityId != null">ability_id,</if>
+            <if test="cursor != null">`cursor`,</if>
+            <if test="lastSyncTime != null">last_sync_time,</if>
+            create_time, update_time
+        </trim>
+        <trim prefix="VALUES (" suffix=")" suffixOverrides=",">
+            <if test="corpId != null">#{corpId},</if>
+            <if test="programId != null">#{programId},</if>
+            <if test="abilityId != null">#{abilityId},</if>
+            <if test="cursor != null">#{cursor},</if>
+            <if test="lastSyncTime != null">#{lastSyncTime},</if>
+            NOW(), NOW()
+        </trim>
+    </insert>
+
+    <update id="updateById" parameterType="com.fs.qw.domain.QwConversationSyncState">
+        UPDATE qw_conversation_sync_state
+        <set>
+            <if test="programId != null">program_id = #{programId},</if>
+            <if test="abilityId != null">ability_id = #{abilityId},</if>
+            <if test="cursor != null">`cursor` = #{cursor},</if>
+            <if test="lastSyncTime != null">last_sync_time = #{lastSyncTime},</if>
+            update_time = NOW()
+        </set>
+        WHERE id = #{id}
+    </update>
+
+    <update id="updateSyncTime">
+        UPDATE qw_conversation_sync_state
+        SET last_sync_time = #{lastSyncTime}, update_time = NOW()
+        WHERE corp_id = #{corpId}
+    </update>
+
+    <delete id="deleteById">
+        DELETE FROM qw_conversation_sync_state WHERE id = #{id}
+    </delete>
+
+    <select id="selectById" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/> FROM qw_conversation_sync_state WHERE id = #{id}
+    </select>
+
+    <select id="selectByCorpId" resultMap="BaseResultMap">
+        SELECT <include refid="Base_Column_List"/>
+        FROM qw_conversation_sync_state
+        WHERE corp_id = #{corpId}
+    </select>
+
+    <insert id="updateCursor">
+        INSERT INTO qw_conversation_sync_state (corp_id, program_id, ability_id, `cursor`, last_sync_time, create_time, update_time)
+        VALUES (#{corpId}, #{programId}, #{abilityId}, #{cursor}, NULL, NOW(), NOW())
+        ON DUPLICATE KEY UPDATE
+                             `cursor` = VALUES(`cursor`),
+                             update_time = NOW()
+    </insert>
+</mapper>