|
|
@@ -2,18 +2,27 @@ package com.fs.qw.service.impl;
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.fs.common.exception.CustomException;
|
|
|
import com.fs.qw.service.ICorporateWeChatSpaceService;
|
|
|
-import com.fs.qw.utils.WeChatTokenUtil;
|
|
|
-import com.fs.qw.utils.WeComSignatureUtil;
|
|
|
+import com.fs.qw.utils.WeChatSpaceDecryptUtil;
|
|
|
+import com.fs.qw.utils.WeChatSpaceUtil;
|
|
|
import com.fs.qw.vo.QwSessionConfigVo;
|
|
|
import com.fs.system.service.ISysConfigService;
|
|
|
-import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+
|
|
|
+import java.time.Instant;
|
|
|
+import java.time.ZoneId;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.Objects;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
@Slf4j
|
|
|
@Service
|
|
|
@@ -23,73 +32,100 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
|
|
|
@Autowired
|
|
|
private ISysConfigService sysConfigService;
|
|
|
|
|
|
+ private final RestTemplate restTemplate = new RestTemplate();
|
|
|
+
|
|
|
+ private final ConcurrentHashMap<String, String> consumedCodes = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ // 系统配置缓存前缀
|
|
|
private final static String CONFIG_KEY = "qw.sessionConfig";
|
|
|
|
|
|
- private final RestTemplate restTemplate = new RestTemplate();
|
|
|
- private final java.util.concurrent.ConcurrentHashMap<String, String> consumedCodes = new java.util.concurrent.ConcurrentHashMap<>();
|
|
|
+ //获取会话记录Key
|
|
|
+ private final static String targetKey = "invokeSyncMsg";
|
|
|
+
|
|
|
+ private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
|
|
|
// =============== 核心:通过专区中转拉取会话 ===============
|
|
|
@Override
|
|
|
- public JSONObject fetchConversations(long seq, long limit, long proxy, long timeout,
|
|
|
+ public JSONObject fetchConversations(long limit,long timeout,String cursor,
|
|
|
String customerId, String staffUserId) {
|
|
|
JSONObject result = new JSONObject();
|
|
|
try {
|
|
|
- // 1. 获取企业微信配置(避免重复调用 getQwSessionConfig())
|
|
|
QwSessionConfigVo qwConfig = getQwSessionConfig();
|
|
|
String corpid = qwConfig.getCorpid();
|
|
|
String agentSecret = qwConfig.getAgentSecret();
|
|
|
- // 2. 获取 access_token
|
|
|
- String accessToken = WeChatTokenUtil.getAccessToken(corpid, agentSecret);
|
|
|
+ String accessToken = WeChatSpaceUtil.getAccessToken(corpid, agentSecret);
|
|
|
|
|
|
- // 3. 构建 request_data(与专区程序约定一致)
|
|
|
+ // 构建 request_data(invoke_sync_msg 能力要求)
|
|
|
JSONObject requestData = new JSONObject();
|
|
|
- requestData.put("action", qwConfig.getAbilityAction()); // 对应能力 action
|
|
|
- requestData.put("seq", seq);
|
|
|
- requestData.put("limit", limit);
|
|
|
- requestData.put("proxy", proxy);
|
|
|
- requestData.put("timeout", timeout);
|
|
|
- requestData.put("customerId", customerId);
|
|
|
- requestData.put("staffUserId", staffUserId);
|
|
|
-
|
|
|
- // 4. 能力ID
|
|
|
- String abilityId = qwConfig.getFetchConversationAbilityId();
|
|
|
- if (abilityId==null){
|
|
|
- throw new CustomException("专区能力ID未配置");
|
|
|
+ if (StringUtils.isNotBlank(cursor)){// 首次为空不传
|
|
|
+ requestData.put("cursor", cursor);
|
|
|
}
|
|
|
- // 5. 调用 sync_call_program 接口
|
|
|
+
|
|
|
+ requestData.put("limit", limit > 0 ? limit : 200); // 限制 1-1000
|
|
|
+ //requestData.put("token", ""); // 暂不传 token,有频率限制但测试够用
|
|
|
+
|
|
|
+ String abilityId = null;
|
|
|
+
|
|
|
+ if (qwConfig.getAbilityIds() != null) {
|
|
|
+ //根据配置的能力Key找到对应的 abilityId
|
|
|
+ abilityId = qwConfig.getAbilityIds().stream()
|
|
|
+ .filter(item -> targetKey.equals(item.getKey()))
|
|
|
+ .map(QwSessionConfigVo.AbilityItem::getValue)
|
|
|
+ .findFirst()
|
|
|
+ .orElse(null);
|
|
|
+ }
|
|
|
+ if (abilityId == null) {
|
|
|
+ throw new CustomException("未配置获取会话记录的能力ID");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 调用 sync_call_program
|
|
|
String url = "https://qyapi.weixin.qq.com/cgi-bin/chatdata/sync_call_program?access_token=" + accessToken;
|
|
|
JSONObject requestBody = new JSONObject();
|
|
|
+ requestBody.put("program_id", qwConfig.getProgramId());
|
|
|
requestBody.put("ability_id", abilityId);
|
|
|
requestBody.put("request_data", JSON.toJSONString(requestData));
|
|
|
- requestBody.put("program_id", qwConfig.getProgramId());
|
|
|
+
|
|
|
log.info("调用专区接口: ability_id={}, request_data={}", abilityId, requestData);
|
|
|
JSONObject response = restTemplate.postForObject(url, requestBody, JSONObject.class);
|
|
|
- log.info("专区响应: {}", response);
|
|
|
+ //log.info("专区响应: {}", response);
|
|
|
|
|
|
- // 6. 处理返回结果
|
|
|
if (response != null && response.getInteger("errcode") == 0) {
|
|
|
String responseDataStr = response.getString("response_data");
|
|
|
if (responseDataStr != null) {
|
|
|
JSONObject responseData = JSON.parseObject(responseDataStr);
|
|
|
- if (responseData.getInteger("errcode") == 0) {
|
|
|
+ Integer innerErrCode = responseData.getInteger("errcode");
|
|
|
+ if (innerErrCode != null && innerErrCode == 0) {
|
|
|
+ //获取 cursor用于下次拉取更多数据
|
|
|
+ String nextCursor = responseData.getString("next_cursor");
|
|
|
+ // 获取消息列表并处理
|
|
|
+ JSONArray msgList = responseData.getJSONArray("msg_list");
|
|
|
+ if (msgList != null && !msgList.isEmpty()) {
|
|
|
+ // 解密 + 过滤 + 格式化
|
|
|
+ JSONArray processedList = processMessages(msgList, customerId, staffUserId, qwConfig);
|
|
|
+ result.put("data", processedList);
|
|
|
+ } else {
|
|
|
+ result.put("data", new JSONArray());
|
|
|
+ }
|
|
|
result.put("errcode", 0);
|
|
|
result.put("errmsg", "ok");
|
|
|
- result.put("msgList", responseData.get("data"));
|
|
|
+ //返回 has_more 和 next_cursor 给前端,当没有更多数据时,返回 has_more 为 0
|
|
|
+ result.put("has_more", responseData.getInteger("has_more"));
|
|
|
+ result.put("next_cursor", nextCursor);
|
|
|
} else {
|
|
|
- result.put("errcode", responseData.getInteger("errcode"));
|
|
|
+ // 专区内部错误
|
|
|
+ result.put("errcode", innerErrCode);
|
|
|
result.put("errmsg", responseData.getString("errmsg"));
|
|
|
}
|
|
|
} else {
|
|
|
result.put("errcode", -1);
|
|
|
result.put("errmsg", "专区返回数据格式错误");
|
|
|
}
|
|
|
-
|
|
|
} else {
|
|
|
result.put("errcode", response != null ? response.getInteger("errcode") : -1);
|
|
|
- result.put("errmsg", response != null ? response.getString("errmsg") : "专区调用失败");
|
|
|
+ result.put("errmsg", response != null ? response.getString("errmsg") : "调用专区失败");
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("专区中转调用异常", e);
|
|
|
+ log.error("获取会话记录失败", e);
|
|
|
result.put("errcode", -1);
|
|
|
result.put("errmsg", "内部错误:" + e.getMessage());
|
|
|
}
|
|
|
@@ -97,10 +133,11 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
|
|
|
}
|
|
|
|
|
|
|
|
|
+
|
|
|
@Override
|
|
|
public JSONObject getAgentConfigSignature(String url) {
|
|
|
QwSessionConfigVo qwSessionConfig = getQwSessionConfig();
|
|
|
- return WeComSignatureUtil.generateAgentConfigSignature(qwSessionConfig.getCorpid(), qwSessionConfig.getAgentSecret(), qwSessionConfig.getAgentid(), url);
|
|
|
+ return WeChatSpaceUtil.generateAgentConfigSignature(qwSessionConfig.getCorpid(), qwSessionConfig.getAgentSecret(), qwSessionConfig.getAgentid(), url);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -118,7 +155,7 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
|
|
|
}
|
|
|
QwSessionConfigVo qwSessionConfig = getQwSessionConfig();
|
|
|
try {
|
|
|
- String accessToken = WeChatTokenUtil.getAccessToken(qwSessionConfig.getCorpid(), qwSessionConfig.getAgentSecret());
|
|
|
+ String accessToken = WeChatSpaceUtil.getAccessToken(qwSessionConfig.getCorpid(), qwSessionConfig.getAgentSecret());
|
|
|
String url = "https://qyapi.weixin.qq.com/cgi-bin/user/getuserinfo?access_token="
|
|
|
+ accessToken + "&code=" + code;
|
|
|
JSONObject resp = restTemplate.getForObject(url, JSONObject.class);
|
|
|
@@ -155,4 +192,82 @@ public class ICorporateWeChatSpaceServiceImpl implements ICorporateWeChatSpaceSe
|
|
|
}
|
|
|
return qwSessionConfig;
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ private JSONArray processMessages(JSONArray msgList, String customerId, String staffUserId, QwSessionConfigVo qwConfig) {
|
|
|
+ return msgList.parallelStream()
|
|
|
+ .map(obj -> (JSONObject) obj)
|
|
|
+ .filter(msg -> isMessageRelatedToUsers(msg, customerId, staffUserId))
|
|
|
+ .map(msg -> decryptAndFormatMessage(msg, qwConfig))
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .collect(Collectors.toCollection(JSONArray::new));
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean isMessageRelatedToUsers(JSONObject msg, String customerId, String staffUserId) {
|
|
|
+ // 如果都为空,则不过滤,返回全部
|
|
|
+ if ((customerId == null || customerId.isEmpty()) && (staffUserId == null || staffUserId.isEmpty())) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ boolean hasCustomer = (customerId == null || customerId.isEmpty());
|
|
|
+ boolean hasStaff = (staffUserId == null || staffUserId.isEmpty());
|
|
|
+
|
|
|
+ JSONObject sender = msg.getJSONObject("sender");
|
|
|
+ JSONArray receivers = msg.getJSONArray("receiver_list");
|
|
|
+
|
|
|
+ // 检查发送者
|
|
|
+ if (sender != null) {
|
|
|
+ String senderId = sender.getString("id");
|
|
|
+ int senderType = sender.getIntValue("type");
|
|
|
+ if (!hasCustomer && senderType == 2 && customerId.equals(senderId)) hasCustomer = true;
|
|
|
+ if (!hasStaff && senderType == 1 && staffUserId.equals(senderId)) hasStaff = true;
|
|
|
+ }
|
|
|
+ if (hasCustomer && hasStaff) return true;
|
|
|
+
|
|
|
+ // 检查接收者
|
|
|
+ if (receivers != null) {
|
|
|
+ for (int i = 0; i < receivers.size(); i++) {
|
|
|
+ JSONObject recv = receivers.getJSONObject(i);
|
|
|
+ String recvId = recv.getString("id");
|
|
|
+ int recvType = recv.getIntValue("type");
|
|
|
+ if (!hasCustomer && recvType == 2 && customerId.equals(recvId)) hasCustomer = true;
|
|
|
+ if (!hasStaff && recvType == 1 && staffUserId.equals(recvId)) hasStaff = true;
|
|
|
+ if (hasCustomer && hasStaff) return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return hasCustomer && hasStaff;
|
|
|
+ }
|
|
|
+
|
|
|
+ private JSONObject decryptAndFormatMessage(JSONObject msg, QwSessionConfigVo qwConfig) {
|
|
|
+ JSONObject result = new JSONObject();
|
|
|
+ try {
|
|
|
+ JSONObject encryptInfo = msg.getJSONObject("service_encrypt_info");
|
|
|
+ if (encryptInfo == null) return null;
|
|
|
+ String encryptedKey = encryptInfo.getString("encrypted_secret_key");
|
|
|
+ if (encryptedKey == null) return null;
|
|
|
+
|
|
|
+ // 解密得到 secretKey
|
|
|
+ String secretKey = WeChatSpaceDecryptUtil.decryptSecretKey(encryptedKey, qwConfig.getPrivateKey());
|
|
|
+
|
|
|
+ // 复制需要返回的字段
|
|
|
+ result.put("msgid", msg.getString("msgid"));
|
|
|
+ result.put("secretKey", secretKey);
|
|
|
+ result.put("sender", msg.get("sender"));
|
|
|
+ result.put("receiver_list", msg.get("receiver_list"));
|
|
|
+ result.put("msgtype", msg.getInteger("msgtype"));
|
|
|
+
|
|
|
+ Long sendTime = msg.getLong("send_time");
|
|
|
+ if (sendTime != null) {
|
|
|
+ String formattedTime = Instant.ofEpochSecond(sendTime)
|
|
|
+ .atZone(ZoneId.systemDefault())
|
|
|
+ .toLocalDateTime()
|
|
|
+ .format(DATE_TIME_FORMATTER);
|
|
|
+ result.put("send_time_str", formattedTime);
|
|
|
+ result.put("send_time", sendTime);
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解密消息失败, msgid: {}", msg.getString("msgid"), e);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|