|
|
@@ -46,6 +46,7 @@ import com.fs.wx.sop.mapper.WxSopUserMapper;
|
|
|
import com.fs.wx.sop.vo.WxSopUserMsgGenVO;
|
|
|
import com.fs.wxcid.dto.friend.AddContactParam;
|
|
|
import com.fs.wxcid.service.FriendService;
|
|
|
+import com.fs.wxcid.utils.TenantHelper;
|
|
|
import com.fs.wxcid.vo.AddContactVo;
|
|
|
import com.fs.wxwork.dto.WxAddSearchDTO;
|
|
|
import com.fs.wxwork.dto.WxSearchContactDTO;
|
|
|
@@ -131,6 +132,12 @@ public class WxTaskService {
|
|
|
private final QwExternalContactMapper qwExternalContactMapper;
|
|
|
private final CompanyAiWorkflowExecLogMapper companyAiWorkflowExecLogMapper;
|
|
|
|
|
|
+ /** 加微延时扫描分布式锁 key 前缀,按 租户id:分组 隔离,避免多实例/重入重复扫描同批延时key */
|
|
|
+ private static final String ADD_WX_DELAY_LOCK_PREFIX = "cid_workflow:addwx_delay_lock:";
|
|
|
+
|
|
|
+ /** 企微加微延时扫描分布式锁 key 前缀,按 租户id:分组 隔离 */
|
|
|
+ private static final String QW_ADD_WX_DELAY_LOCK_PREFIX = "cid_workflow:qwaddwx_delay_lock:";
|
|
|
+
|
|
|
public void addWx(List<Long> accountIdList) {
|
|
|
log.info("==========执行加微信任务开始==========");
|
|
|
String json = sysConfigService.selectConfigByKey("wx.config");
|
|
|
@@ -851,53 +858,69 @@ public class WxTaskService {
|
|
|
* 扫描工作流延时任务
|
|
|
*/
|
|
|
public void cidWorkflowAddWxRun() {
|
|
|
- log.info("===========工作流延时任务开始扫描===========");
|
|
|
-// String delayAddWxKeyPrefix = AiAddWxTaskNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
|
|
|
-//// Set<String> keys = redisKeyScanner.scanMatchKey(delayAddWxKeyPrefix);
|
|
|
-// Collection<String> keys = redisCache2.keys(delayAddWxKeyPrefix);
|
|
|
- // 扫描新加微节点的延时Key
|
|
|
- String delayAddWxNewKeyPrefix = AiAddWxTaskNewNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
|
|
|
- Collection<String> keys = redisCache2.keys(delayAddWxNewKeyPrefix);
|
|
|
- log.info("cidWorkflowAddWxRun共扫描到 {} 个待处理键", keys.size());
|
|
|
- // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
|
|
|
- Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
|
|
|
- keys.parallelStream().forEach(key -> {
|
|
|
- try {
|
|
|
- //doExec
|
|
|
- CompletableFuture.runAsync(()->{
|
|
|
- try {
|
|
|
- ExecutionContext context = redisCache2.getCacheObject(key);
|
|
|
- if (context == null) {
|
|
|
- log.warn("工作流延时任务context为空,跳过 - key: {}", key);
|
|
|
- redisCache2.deleteObject(key);
|
|
|
- return;
|
|
|
- }
|
|
|
- // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
|
|
|
- Long taskId = context.getVariable("roboticId", Long.class);
|
|
|
- if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
|
|
|
- // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
|
|
|
- // 同步context信息到DB exec,供恢复时resumePausedInstances使用
|
|
|
- context.setVariable("callSource", "addWxTimer");
|
|
|
- context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
|
|
|
- companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
|
|
|
- log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
|
|
|
+ // 分布式锁:按 租户id:分组 隔离,拿不到锁说明已有实例在扫描,直接跳过本轮(延时key下轮仍可扫到,不丢失)
|
|
|
+ String lockKey = ADD_WX_DELAY_LOCK_PREFIX + TenantHelper.getTenantId() + ":" + cidGroupNo;
|
|
|
+ RLock methodLock = redissonClient.getLock(lockKey);
|
|
|
+ boolean locked = false;
|
|
|
+ try {
|
|
|
+ // waitTime=0 拿不到立即返回;leaseTime=-1 启用看门狗自动续期,避免大数据量执行超时被提前释放
|
|
|
+ locked = methodLock.tryLock(0, -1, TimeUnit.SECONDS);
|
|
|
+ if (!locked) {
|
|
|
+ log.info("cidWorkflowAddWxRun 已有实例在执行,跳过本轮 - lockKey: {}", lockKey);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("===========工作流延时任务开始扫描===========");
|
|
|
+ // 扫描新加微节点的延时Key
|
|
|
+ String delayAddWxNewKeyPrefix = AiAddWxTaskNewNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
|
|
|
+ Collection<String> keys = redisCache2.keys(delayAddWxNewKeyPrefix);
|
|
|
+ log.info("cidWorkflowAddWxRun共扫描到 {} 个待处理键", keys.size());
|
|
|
+ // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
|
|
|
+ Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
|
|
|
+ keys.parallelStream().forEach(key -> {
|
|
|
+ try {
|
|
|
+ //doExec
|
|
|
+ CompletableFuture.runAsync(()->{
|
|
|
+ try {
|
|
|
+ ExecutionContext context = redisCache2.getCacheObject(key);
|
|
|
+ if (context == null) {
|
|
|
+ log.warn("工作流延时任务context为空,跳过 - key: {}", key);
|
|
|
+ redisCache2.deleteObject(key);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
|
|
|
+ Long taskId = context.getVariable("roboticId", Long.class);
|
|
|
+ if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
|
|
|
+ // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
|
|
|
+ // 同步context信息到DB exec,供恢复时resumePausedInstances使用
|
|
|
+ context.setVariable("callSource", "addWxTimer");
|
|
|
+ context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
|
|
|
+ companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
|
|
|
+ log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
|
|
|
+ redisCache2.deleteObject(key);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ context.setVariable("callRedisKey",key);
|
|
|
+ context.setVariable("callSource","addWxTimer");
|
|
|
+ companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
|
|
|
redisCache2.deleteObject(key);
|
|
|
- return;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理工作流延时任务异常 - key: {}", key, e);
|
|
|
}
|
|
|
- context.setVariable("callRedisKey",key);
|
|
|
- context.setVariable("callSource","addWxTimer");
|
|
|
- companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
|
|
|
- redisCache2.deleteObject(key);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("处理工作流延时任务异常 - key: {}", key, e);
|
|
|
- }
|
|
|
- }, cidWorkFlowExecutor);
|
|
|
+ }, cidWorkFlowExecutor);
|
|
|
|
|
|
- } catch (Exception ex) {
|
|
|
- log.error("处理工作流延时任务异常 - key: {}", key, ex);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("处理工作流延时任务异常 - key: {}", key, ex);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ log.info("===========工作流延时任务扫描结束===========");
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.warn("cidWorkflowAddWxRun 获取分布式锁被中断 - lockKey: {}", lockKey, e);
|
|
|
+ } finally {
|
|
|
+ if (locked && methodLock.isHeldByCurrentThread()) {
|
|
|
+ methodLock.unlock();
|
|
|
}
|
|
|
- });
|
|
|
- log.info("===========工作流延时任务扫描结束===========");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -1677,49 +1700,68 @@ public class WxTaskService {
|
|
|
* 扫描企微加微工作流延时任务
|
|
|
*/
|
|
|
public void cidWorkflowQwAddWxRun() {
|
|
|
- log.info("===========企微加微工作流延时任务开始扫描===========");
|
|
|
- String delayAddWxKeyPrefix = AiQwAddWxTaskNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
|
|
|
- Collection<String> keys = redisCache2.keys(delayAddWxKeyPrefix);
|
|
|
- log.info("企微加微共扫描到 {} 个待处理键", keys.size());
|
|
|
- // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
|
|
|
- Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
|
|
|
- keys.parallelStream().forEach(key -> {
|
|
|
- try {
|
|
|
- //doExec
|
|
|
- CompletableFuture.runAsync(()->{
|
|
|
- try {
|
|
|
- ExecutionContext context = redisCache2.getCacheObject(key);
|
|
|
- if (context == null) {
|
|
|
- log.warn("企微加微工作流延时任务context为空,跳过 - key: {}", key);
|
|
|
- redisCache2.deleteObject(key);
|
|
|
- return;
|
|
|
- }
|
|
|
- // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
|
|
|
- Long taskId = context.getVariable("roboticId", Long.class);
|
|
|
- if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
|
|
|
- // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
|
|
|
- // 同步context信息到DB exec,供恢复时resumePausedInstances使用
|
|
|
- context.setVariable("callSource", "qwAddWxTimer");
|
|
|
- context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
|
|
|
- companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
|
|
|
- log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
|
|
|
+ // 分布式锁:按 租户id:分组 隔离,拿不到锁说明已有实例在扫描,直接跳过本轮(延时key下轮仍可扫到,不丢失)
|
|
|
+ String lockKey = QW_ADD_WX_DELAY_LOCK_PREFIX + TenantHelper.getTenantId() + ":" + cidGroupNo;
|
|
|
+ RLock methodLock = redissonClient.getLock(lockKey);
|
|
|
+ boolean locked = false;
|
|
|
+ try {
|
|
|
+ // waitTime=0 拿不到立即返回;leaseTime=-1 启用看门狗自动续期,避免大数据量执行超时被提前释放
|
|
|
+ locked = methodLock.tryLock(0, -1, TimeUnit.SECONDS);
|
|
|
+ if (!locked) {
|
|
|
+ log.info("cidWorkflowQwAddWxRun 已有实例在执行,跳过本轮 - lockKey: {}", lockKey);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("===========企微加微工作流延时任务开始扫描===========");
|
|
|
+ String delayAddWxKeyPrefix = AiQwAddWxTaskNode.getDelayAddWxKeyPrefix(cidGroupNo,null) + "*";
|
|
|
+ Collection<String> keys = redisCache2.keys(delayAddWxKeyPrefix);
|
|
|
+ log.info("企微加微共扫描到 {} 个待处理键", keys.size());
|
|
|
+ // 本地缓存已查询的任务暂停状态,避免同一批次重复查询
|
|
|
+ Map<Long, Boolean> pausedCache = new ConcurrentHashMap<>();
|
|
|
+ keys.parallelStream().forEach(key -> {
|
|
|
+ try {
|
|
|
+ //doExec
|
|
|
+ CompletableFuture.runAsync(()->{
|
|
|
+ try {
|
|
|
+ ExecutionContext context = redisCache2.getCacheObject(key);
|
|
|
+ if (context == null) {
|
|
|
+ log.warn("企微加微工作流延时任务context为空,跳过 - key: {}", key);
|
|
|
+ redisCache2.deleteObject(key);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 任务暂停守卫检查(roboticId即CompanyVoiceRobotic.id,是实际暂停操作的目标)
|
|
|
+ Long taskId = context.getVariable("roboticId", Long.class);
|
|
|
+ if (taskId != null && pausedCache.computeIfAbsent(taskId, id -> companyVoiceRoboticService.isTaskPaused(id))) {
|
|
|
+ // 延时key是时间分片前缀,下一分钟就不会再扫到,直接删除
|
|
|
+ // 同步context信息到DB exec,供恢复时resumePausedInstances使用
|
|
|
+ context.setVariable("callSource", "qwAddWxTimer");
|
|
|
+ context.setVariable("_delayTargetNodeKey", context.getCurrentNodeKey());
|
|
|
+ companyWorkflowEngine.updateExecVariables(context.getWorkflowInstanceId(), context.getVariables());
|
|
|
+ log.info("任务已暂停,删除延时key并同步exec,等待恢复时从DB重建 - taskId: {}, key: {}", taskId, key);
|
|
|
+ redisCache2.deleteObject(key);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ context.setVariable("callRedisKey",key);
|
|
|
+ context.setVariable("callSource","qwAddWxTimer");
|
|
|
+ companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
|
|
|
redisCache2.deleteObject(key);
|
|
|
- return;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理工作流延时任务异常 - key: {}", key, e);
|
|
|
}
|
|
|
- context.setVariable("callRedisKey",key);
|
|
|
- context.setVariable("callSource","qwAddWxTimer");
|
|
|
- companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
|
|
|
- redisCache2.deleteObject(key);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("处理工作流延时任务异常 - key: {}", key, e);
|
|
|
- }
|
|
|
- }, cidWorkFlowExecutor);
|
|
|
+ }, cidWorkFlowExecutor);
|
|
|
|
|
|
- } catch (Exception ex) {
|
|
|
- log.error("处理工作流延时任务异常 - key: {}", key, ex);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("处理工作流延时任务异常 - key: {}", key, ex);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ log.info("===========工作流延时任务扫描结束===========");
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.warn("cidWorkflowQwAddWxRun 获取分布式锁被中断 - lockKey: {}", lockKey, e);
|
|
|
+ } finally {
|
|
|
+ if (locked && methodLock.isHeldByCurrentThread()) {
|
|
|
+ methodLock.unlock();
|
|
|
}
|
|
|
- });
|
|
|
- log.info("===========工作流延时任务扫描结束===========");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|