|
|
@@ -3,6 +3,7 @@ package com.fs.qw.service.impl;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
+import com.fs.common.core.domain.model.TenantPrincipal;
|
|
|
import com.fs.common.utils.PubFun;
|
|
|
import com.fs.course.domain.FsCourseSopAppLink;
|
|
|
import com.fs.course.mapper.FsCourseSopAppLinkMapper;
|
|
|
@@ -30,12 +31,18 @@ import com.fs.voice.utils.StringUtil;
|
|
|
import com.fs.wxUser.param.CompanyWxUserSopParam;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.commons.lang.StringUtils;
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
|
+import com.fs.common.utils.StringUtils;
|
|
|
+import com.fs.common.utils.spring.SpringUtils;
|
|
|
+import com.fs.core.config.TenantConfigContext;
|
|
|
+import com.fs.system.domain.SysConfig;
|
|
|
+import com.fs.system.mapper.SysConfigMapper;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
|
|
|
+import org.springframework.security.core.context.SecurityContextHolder;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
-
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
|
+import java.lang.reflect.Method;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.time.LocalDate;
|
|
|
import java.time.LocalDateTime;
|
|
|
@@ -62,13 +69,42 @@ public class AsyncSopTestService {
|
|
|
private final SopUserLogsMapper sopUserLogsMapper;
|
|
|
private final FsCourseSopAppLinkMapper fsCourseSopAppLinkMapper;
|
|
|
private final uniPush2Service push2Service;
|
|
|
+ private final SysConfigMapper sysConfigMapper;
|
|
|
|
|
|
/**
|
|
|
* 立即执行SOP任务
|
|
|
*/
|
|
|
@Async("scheduledExecutorService")
|
|
|
- public void executeSopByIds(String[] ids) {
|
|
|
+ public void executeSopByIds(String[] ids, Long tenantId) {
|
|
|
+ boolean isSwitched = false;
|
|
|
try {
|
|
|
+ // 1. 切换到租户数据源并加载配置
|
|
|
+ if (tenantId != null) {
|
|
|
+ try {
|
|
|
+ Object manager = SpringUtils.getBean("tenantDataSourceManager");
|
|
|
+ Method method = manager.getClass().getMethod("ensureSwitchByTenantId", Long.class);
|
|
|
+ method.invoke(manager, tenantId);
|
|
|
+ isSwitched = true;
|
|
|
+
|
|
|
+ // 加载租户 projectConfig
|
|
|
+ SysConfig cfg = sysConfigMapper.selectConfigByConfigKey("projectConfig");
|
|
|
+ if (cfg != null && StringUtils.isNotBlank(cfg.getConfigValue())) {
|
|
|
+ TenantConfigContext.set(JSON.parseObject(cfg.getConfigValue()));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 设置租户到 SecurityContext,供 TenantKeyRedisSerializer 自动为 Redis Key 加 tenantid 前缀
|
|
|
+ SecurityContextHolder.getContext().setAuthentication(
|
|
|
+ new UsernamePasswordAuthenticationToken(
|
|
|
+ new TenantPrincipal(tenantId),
|
|
|
+ null,
|
|
|
+ Collections.emptyList()
|
|
|
+ )
|
|
|
+ );
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("SOP异步任务切换租户数据源失败: tenantId={}", tenantId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
List<QwSopRuleTimeVO> ruleTimeVOList = qwSopMapper.executeSopByIds(ids);
|
|
|
List<QwSopTemp> tempList = qwSopTempMapper.selectListByIds(PubFun.listToNewList(ruleTimeVOList, QwSopRuleTimeVO::getTempId));
|
|
|
Map<String, QwSopTemp> tempMap = PubFun.listToMapByGroupObject(tempList, QwSopTemp::getId);
|
|
|
@@ -80,11 +116,23 @@ public class AsyncSopTestService {
|
|
|
updateQwSopStatus(ruleTimeVO.getId(), 0L);
|
|
|
return;
|
|
|
}
|
|
|
- processRuleTimeVOInternal(ruleTimeVO, qwSopTemp,sdf);
|
|
|
+ processRuleTimeVOInternal(ruleTimeVO, qwSopTemp, sdf, tenantId);
|
|
|
});
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
log.error("立即执行执行失败", e);
|
|
|
+ } finally {
|
|
|
+ // 2. 清理租户上下文
|
|
|
+ if (isSwitched) {
|
|
|
+ try {
|
|
|
+ TenantConfigContext.clear();
|
|
|
+ SecurityContextHolder.clearContext();
|
|
|
+ Object manager = SpringUtils.getBean("tenantDataSourceManager");
|
|
|
+ Method method = manager.getClass().getMethod("clear");
|
|
|
+ method.invoke(manager);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("SOP异步任务清理租户数据源失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -167,28 +215,68 @@ public class AsyncSopTestService {
|
|
|
/**
|
|
|
* 判断类型
|
|
|
*/
|
|
|
- private void processRuleTimeVOInternal(QwSopRuleTimeVO ruleTimeVO, QwSopTemp qwSopTemp,SimpleDateFormat sdf) {
|
|
|
- QwSop qwSop = createQwSop(ruleTimeVO, 2L);
|
|
|
-
|
|
|
- switch (ruleTimeVO.getType()) {
|
|
|
- case 1:
|
|
|
- processWeChatSop(ruleTimeVO, qwSop,sdf);
|
|
|
- break;
|
|
|
- case 2:
|
|
|
- processEnterpriseWeChatSop(ruleTimeVO, qwSop,sdf);
|
|
|
- break;
|
|
|
- default:
|
|
|
- break;
|
|
|
- }
|
|
|
+ private void processRuleTimeVOInternal(QwSopRuleTimeVO ruleTimeVO, QwSopTemp qwSopTemp,SimpleDateFormat sdf, Long tenantId) {
|
|
|
+ boolean isSwitched = false;
|
|
|
+ try {
|
|
|
+ if (tenantId != null) {
|
|
|
+ try {
|
|
|
+ Object manager = SpringUtils.getBean("tenantDataSourceManager");
|
|
|
+ Method method = manager.getClass().getMethod("ensureSwitchByTenantId", Long.class);
|
|
|
+ method.invoke(manager, tenantId);
|
|
|
+ isSwitched = true;
|
|
|
+
|
|
|
+ // 加载租户 projectConfig
|
|
|
+ SysConfig cfg = sysConfigMapper.selectConfigByConfigKey("projectConfig");
|
|
|
+ if (cfg != null && StringUtils.isNotBlank(cfg.getConfigValue())) {
|
|
|
+ TenantConfigContext.set(JSON.parseObject(cfg.getConfigValue()));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 设置租户到 SecurityContext
|
|
|
+ SecurityContextHolder.getContext().setAuthentication(
|
|
|
+ new UsernamePasswordAuthenticationToken(
|
|
|
+ new TenantPrincipal(tenantId),
|
|
|
+ null,
|
|
|
+ Collections.emptyList()
|
|
|
+ )
|
|
|
+ );
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("processRuleTimeVOInternal 切换租户数据源失败: tenantId={}", tenantId, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- qwSopMapper.updateQwSop(qwSop);
|
|
|
+ QwSop qwSop = createQwSop(ruleTimeVO, 2L);
|
|
|
+ switch (ruleTimeVO.getType()) {
|
|
|
+ case 1:
|
|
|
+ processWeChatSop(ruleTimeVO, qwSop, sdf, tenantId);
|
|
|
+ break;
|
|
|
+ case 2:
|
|
|
+ processEnterpriseWeChatSop(ruleTimeVO, qwSop, sdf, tenantId);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ qwSopMapper.updateQwSop(qwSop);
|
|
|
+ } finally {
|
|
|
+ if (isSwitched) {
|
|
|
+ try {
|
|
|
+ TenantConfigContext.clear();
|
|
|
+ SecurityContextHolder.clearContext();
|
|
|
+ Object manager = SpringUtils.getBean("tenantDataSourceManager");
|
|
|
+ Method method = manager.getClass().getMethod("clear");
|
|
|
+ method.invoke(manager);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("processRuleTimeVOInternal 清理租户数据源失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 个微
|
|
|
*/
|
|
|
- private void processWeChatSop(QwSopRuleTimeVO ruleTimeVO, QwSop qwSop,SimpleDateFormat sdf) {
|
|
|
+ private void processWeChatSop(QwSopRuleTimeVO ruleTimeVO, QwSop qwSop,SimpleDateFormat sdf, Long tenantId) {
|
|
|
// CompanyWxUserSopParam wxUserSopParam = createCompanyWxUserSopParam(ruleTimeVO);
|
|
|
// List<CompanyWxUser> companyWxUserList = companyWxUserMapper.selectCompanyWxUserByCompanyUserId(wxUserSopParam);
|
|
|
//
|
|
|
@@ -214,7 +302,7 @@ public class AsyncSopTestService {
|
|
|
/**
|
|
|
* 企微
|
|
|
*/
|
|
|
- private void processEnterpriseWeChatSop(QwSopRuleTimeVO ruleTimeVO, QwSop qwSop,SimpleDateFormat sdf) {
|
|
|
+ private void processEnterpriseWeChatSop(QwSopRuleTimeVO ruleTimeVO, QwSop qwSop,SimpleDateFormat sdf, Long tenantId) {
|
|
|
QwSopTagsParam qwSopTagsParam = createQwSopTagsParam(ruleTimeVO);
|
|
|
List<QwFilterSopCustomersResult> qwFilterSopCustomersResults = qwSopMapper.selectFilterQwSopCustomers(qwSopTagsParam);
|
|
|
|
|
|
@@ -225,7 +313,7 @@ public class AsyncSopTestService {
|
|
|
}
|
|
|
|
|
|
//如果是新客对话类型的,则跳过
|
|
|
- if (ruleTimeVO.getSendType()==4){
|
|
|
+ if (ruleTimeVO.getSendType() == 4) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -245,8 +333,8 @@ public class AsyncSopTestService {
|
|
|
rocketMQTemplate.syncSend("voice-generation", JSON.toJSONString(VoiceVo.builder().type(1).id(ruleTimeVO.getId()).build()));
|
|
|
// new Thread(() -> HttpUtils.sendGet("http://118.24.209.192:8009/qw/voice/synchronousSop", "sopId=" + ruleTimeVO.getId())).start();
|
|
|
// qwSopTempVoiceService.synchronous(ruleTimeVO.getId(), companyUserIds);
|
|
|
- }catch (Exception e){
|
|
|
- log.error("异步同步临时语音失败:",e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("异步同步临时语音失败:", e);
|
|
|
}
|
|
|
groupedResults.forEach((userIdAndQwUserId, externalUserIds) -> {
|
|
|
String[] keys = userIdAndQwUserId.split("\\|");
|
|
|
@@ -255,8 +343,8 @@ public class AsyncSopTestService {
|
|
|
String companyUserId = keys[2].trim();
|
|
|
String companyId = keys[3].trim();
|
|
|
|
|
|
- SopUserLogs sopUserLogs = createSopUserLogs(ruleTimeVO, userId, qwUserId, companyUserId, companyId,sdf);
|
|
|
- SopUserLogsList userLogsList = createSopUserLogsList(ruleTimeVO, userId, qwUserId, companyUserId, companyId,sdf);
|
|
|
+ SopUserLogs sopUserLogs = createSopUserLogs(ruleTimeVO, userId, qwUserId, companyUserId, companyId, sdf);
|
|
|
+ SopUserLogsList userLogsList = createSopUserLogsList(ruleTimeVO, userId, qwUserId, companyUserId, companyId, sdf);
|
|
|
|
|
|
String unionSopId = sopUserLogsService.selectSopUserLogsByUnionSopId(sopUserLogs);
|
|
|
if (!StringUtil.strIsNullOrEmpty(unionSopId)) {
|