lmx 6 päivää sitten
vanhempi
commit
e8f417a1a8

+ 6 - 0
fs-service/src/main/java/com/fs/company/service/CompanyWorkflowEngine.java

@@ -1,5 +1,6 @@
 package com.fs.company.service;
 
+import com.alibaba.fastjson.JSONObject;
 import com.fs.company.domain.CompanyAiWorkflowExecLog;
 import com.fs.company.vo.ExecutionResult;
 
@@ -69,4 +70,9 @@ public interface CompanyWorkflowEngine {
      * @param workFlowId
      */
     Long createSipTask(Long roboticId,Long workFlowId);
+    /**
+     * 添加微信成功
+     * @param object
+     */
+    void addWxSuccess(JSONObject object);
 }

+ 5 - 5
fs-service/src/main/java/com/fs/company/service/impl/CompanyVoiceRoboticServiceImpl.java

@@ -846,11 +846,11 @@ public class CompanyVoiceRoboticServiceImpl extends ServiceImpl<CompanyVoiceRobo
     @Override
     @Async("cidWorkFlowExecutor")
     public void callerResult4EasyCall(CdrDetailVo result) {
-//        try {
-//            Thread.sleep(3000L);
-//        } catch (InterruptedException e) {
-//            throw new RuntimeException(e);
-//        }
+        try {
+            Thread.sleep(5000L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
 //        EASYCALL
         log.info("进入easyCall外呼结果回调:{}", JSON.toJSONString(result));
         if (result == null || StringUtils.isBlank(result.getUuid())) return;

+ 7 - 5
fs-service/src/main/java/com/fs/company/service/impl/CompanyWorkflowEngineImpl.java

@@ -619,13 +619,15 @@ public class CompanyWorkflowEngineImpl implements CompanyWorkflowEngine {
 
     /**
      * 加微成功后流程唤醒操作
-     * @param workflowInstanceId
-     * @param nodeKey
-     * @param accountId
-     * @param remark
+     * @param object
      */
     @Async("cidWorkFlowExecutor")
-    public void addWxSuccess(String workflowInstanceId, String nodeKey,Long accountId,String remark){
+    public void addWxSuccess(JSONObject object){
+
+        String workflowInstanceId = object.getString("instanceId");
+        String nodeKey = object.getString("nodeKey");
+        Long accountId = object.getLong("accountId");
+        String remark = object.getString("remark");
         if(StringUtils.isBlank(remark) || StringUtils.isBlank(workflowInstanceId) || StringUtils.isBlank(nodeKey) || accountId == null){
             log.error("addWxSuccess: 参数错误,workflowInstanceId:{},nodeKey:{},accountId:{},remark:{}", workflowInstanceId, nodeKey, accountId, remark);
             return;

+ 15 - 15
fs-service/src/main/java/com/fs/company/service/impl/CompanyWxServiceImpl.java

@@ -587,19 +587,19 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
     public void triggerWorkflowOnAddWxSuccess(Long wxClientId) {
         try {
             // 先查老类型的等待中工作流实例
-            CompanyAiWorkflowExec waitingExec = companyAiWorkflowExecMapper.selectWaitingAddWxWorkflowByWxClientId(
-                    wxClientId,
-                    ExecutionStatusEnum.WAITING.getValue(),
-                    NodeTypeEnum.AI_ADD_WX_TASK.getValue());
-            boolean isNewNodeType = false;
-            // 老类型未找到,再查新类型
-            if (waitingExec == null) {
-                waitingExec = companyAiWorkflowExecMapper.selectWaitingAddWxWorkflowByWxClientId(
+//            CompanyAiWorkflowExec waitingExec = companyAiWorkflowExecMapper.selectWaitingAddWxWorkflowByWxClientId(
+//                    wxClientId,
+//                    ExecutionStatusEnum.WAITING.getValue(),
+//                    NodeTypeEnum.AI_ADD_WX_TASK.getValue());
+//            boolean isNewNodeType = false;
+//            // 老类型未找到,再查新类型
+//            if (waitingExec == null) {
+            CompanyAiWorkflowExec  waitingExec = companyAiWorkflowExecMapper.selectWaitingAddWxWorkflowByWxClientId(
                         wxClientId,
                         ExecutionStatusEnum.WAITING.getValue(),
                         NodeTypeEnum.AI_ADD_WX_TASK_NEW.getValue());
-                isNewNodeType = true;
-            }
+//                isNewNodeType = true;
+//            }
 
             if (waitingExec == null) {
                 log.info("未找到等待中的加微工作流实例 - wxClientId: {}", wxClientId);
@@ -609,7 +609,7 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
             //查询工作流加微执行日志是否未更新状态
             CompanyAiWorkflowExecLog queryP = new CompanyAiWorkflowExecLog();
             queryP.setWorkflowInstanceId(waitingExec.getWorkflowInstanceId());
-            queryP.setNodeType(isNewNodeType ? NodeTypeEnum.AI_ADD_WX_TASK_NEW.getValue() : NodeTypeEnum.AI_ADD_WX_TASK.getValue());
+            queryP.setNodeType(NodeTypeEnum.AI_ADD_WX_TASK_NEW.getValue());
             queryP.setStatus(ExecutionStatusEnum.WAITING.getValue());
             List<CompanyAiWorkflowExecLog> companyAiWorkflowExecLogs = companyAiWorkflowExecLogMapper.selectCompanyAiWorkflowExecLogList(queryP);
             companyAiWorkflowExecLogs.forEach(log -> {
@@ -626,11 +626,11 @@ public class CompanyWxServiceImpl extends ServiceImpl<CompanyWxAccountMapper, Co
 
             // 互斥检查:根据节点类型使用对应的互斥方法
             boolean canExecute;
-            if (isNewNodeType) {
+//            if (isNewNodeType) {
                 canExecute = AiAddWxTaskNewNode.tryMarkAsExecuted(workflowInstanceId, wxClientId);
-            } else {
-                canExecute = AiAddWxTaskNode.tryMarkAsExecuted(workflowInstanceId, wxClientId);
-            }
+//            } else {
+//                canExecute = AiAddWxTaskNode.tryMarkAsExecuted(workflowInstanceId, wxClientId);
+//            }
             if (!canExecute) {
                 log.info("工作流已被其他路径执行,跳过 - workflowInstanceId: {}, wxClientId: {}",
                         workflowInstanceId, wxClientId);

+ 15 - 2
fs-service/src/main/java/com/fs/company/service/impl/call/node/AbstractWorkflowNode.java

@@ -205,7 +205,8 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             fExecLog.setStatus(status.getValue());
             fExecLog.setEndTime(new Date());
             long durationInMillis = fExecLog.getEndTime().getTime() - fExecLog.getStartTime().getTime();
-            fExecLog.setDuration(durationInMillis);
+            // 兜底防护:确保duration不为负数;当计算结果<=0时设为1毫秒以显得更加真实
+            fExecLog.setDuration(durationInMillis > 0 ? durationInMillis : 1);
             companyAiWorkflowExecLogMapper.updateById(fExecLog);
         }else{
             log.error("未更新到节点状态:context:{},findS:{},targetS:{}",context,findStatus,status);
@@ -364,14 +365,21 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             logEntry.setErrorMessage(result.getErrorMessage());
             logEntry.setCreatedTime(new Date());
             Long startTime = context.getVariable("node_start_time_" + nodeKey, Long.class);
+            Long endTime = context.getVariable("node_end_time_" + nodeKey, Long.class);
+            // 当startTime为null但endTime不为null时(如获取锁失败、preExecute未执行),
+            // 不能用new Date()作为startTime的fallback,因为此时new Date()可能晚于endTime,
+            // 导致duration计算为负数。应将startTime对齐到endTime,表示节点未能正常启动
             if (null != startTime) {
                 logEntry.setStartTime(new Date(startTime));
+            } else if (null != endTime) {
+                logEntry.setStartTime(new Date(endTime));
             } else {
                 logEntry.setStartTime(new Date());
             }
-            Long endTime = context.getVariable("node_end_time_" + nodeKey, Long.class);
             if (null != endTime) {
                 logEntry.setEndTime(new Date(endTime));
+            } else if (null != startTime) {
+                logEntry.setEndTime(new Date(startTime));
             } else {
                 logEntry.setEndTime(new Date());
             }
@@ -379,6 +387,11 @@ public abstract class AbstractWorkflowNode implements IWorkflowNode {
             if (null != startTime && null != endTime) {
                 duration = endTime - startTime;
             }
+            // 兜底防护:确保duration不为负数;当使用fallback值(startTime或endTime为null)时,
+            // 最小耗时设为1毫秒以显得更加真实
+            if (duration <= 0) {
+                duration = (null != startTime && null != endTime) ? 0 : 1;
+            }
             logEntry.setDuration(duration);
             return logEntry;
         } catch (JsonProcessingException e) {

+ 7 - 1
fs-service/src/main/java/com/fs/company/service/impl/call/node/AiAddWxTaskNewNode.java

@@ -1,5 +1,6 @@
 package com.fs.company.service.impl.call.node;
 
+import cn.hutool.core.util.RandomUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.common.constant.Constants;
 import com.fs.common.core.redis.RedisCacheT;
@@ -94,6 +95,7 @@ public class AiAddWxTaskNewNode extends AbstractWorkflowNode {
             }
 
             WxContact wxQuery = companyAiWorkflowExecMapper.selectWxContectByWorkflowInstanceId(context.getWorkflowInstanceId());
+            wxQuery.setRemark(wxQuery.getRemark() + RandomUtil.randomNumbers(10));
             wxQuery.setNickName(wxQuery.getRemark());
             wxQuery.setFriends(0);
             wxContactMapper.insert(wxQuery);
@@ -313,6 +315,10 @@ public class AiAddWxTaskNewNode extends AbstractWorkflowNode {
     @Override
     protected void postExecute(ExecutionContext context, ExecutionResult result) {
         super.postExecute(context, result);
-        doneAddwx(context.getWorkflowInstanceId());
+        // 仅当节点成功进入PAUSED状态时才执行doneAddwx,
+        // 避免在result为null(执行异常)或FAILURE(加微准备失败)时仍继续流转流程
+        if (result != null && result.getStatus() == ExecutionStatusEnum.PAUSED) {
+            doneAddwx(context.getWorkflowInstanceId());
+        }
     }
 }

+ 10 - 0
fs-wx-api/src/main/java/com/fs/app/websocket/service/WebSocketServer.java

@@ -8,6 +8,7 @@ import com.fs.app.websocket.bean.ResultMsgVo;
 import com.fs.app.websocket.bean.SendMsgVo;
 import com.fs.company.domain.CompanyWxClient;
 import com.fs.company.mapper.CompanyWxClientMapper;
+import com.fs.company.service.CompanyWorkflowEngine;
 import com.fs.company.service.impl.CompanyWxServiceImpl;
 import com.fs.wxcid.domain.CidIpadServer;
 import com.fs.wxcid.mapper.CidIpadServerMapper;
@@ -22,6 +23,7 @@ import com.fs.wxcid.mapper.WxContactMapper;
 import com.fs.wxcid.service.IWxMsgLogService;
 import com.hc.openapi.tool.fastjson.JSON;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.websocket.*;
@@ -47,6 +49,7 @@ public class WebSocketServer {
     CompanyWxClientMapper companyWxClientMapper = SpringUtils.getBean(CompanyWxClientMapper.class);
     CompanyWxServiceImpl companyWxService = SpringUtils.getBean(CompanyWxServiceImpl.class);
     CidIpadServerMapper cidIpadServerMapper = SpringUtils.getBean(CidIpadServerMapper.class);
+    CompanyWorkflowEngine companyWorkflowEngine = SpringUtils.getBean(CompanyWorkflowEngine.class);
 
     //发送消息
     public <T> void sendMessage(Session session, ResultMsgVo<T> data) {
@@ -161,9 +164,11 @@ public class WebSocketServer {
                     com.fs.wxcid.vo.wxvo.AddResultWxVo addResultWxVo = JSON.parseObject(msg.getDataJson(), com.fs.wxcid.vo.wxvo.AddResultWxVo.class);
                     log.info("接收到加好友回调:{}", addResultWxVo);
                     WxContact wxContact = wxContactMapper.selectOne(new QueryWrapper<WxContact>().eq("remark", addResultWxVo.getRemark()).eq("friends", 0));
+                    log.info("更新联系人:{}", wxContact);
                     wxContact.setFriends(1);
                     wxContactMapper.updateById(wxContact);
                     List<CompanyWxClient> clients = companyWxClientMapper.selectWxV2(companyWxAccount.getId(), wxContact.getPhone());
+                    log.info("更新联系人2:{}", clients);
                     if(clients != null){
                         clients.parallelStream().forEach(e -> {
                             e.setIsAdd(1);
@@ -174,6 +179,11 @@ public class WebSocketServer {
                             companyWxService.triggerWorkflowOnAddWxSuccess(e.getId());
                         });
                     }
+//                    if(null != addResultWxVo && StringUtils.isNotBlank(addResultWxVo.getBizJson())){
+//                        JSONObject jsonObject = JSONObject.parseObject(addResultWxVo.getBizJson());
+//                        jsonObject.put("remark",addResultWxVo.getRemark());
+//                        companyWorkflowEngine.addWxSuccess(jsonObject);
+//                    }
                     break;
 
             }

+ 4 - 4
fs-wx-task/src/main/java/com/fs/app/task/WxTask.java

@@ -24,10 +24,10 @@ public class WxTask {
 //    public void addWx() {
 //        taskService.addWx(null);
 //    }
-    @Scheduled(cron = "0 0/1 * * * ?")
-    public void addWx4Workflow() {
-        taskService.addWx4Workflow(null);
-    }
+//    @Scheduled(cron = "0 0/1 * * * ?")
+//    public void addWx4Workflow() {
+//        taskService.addWx4Workflow(null);
+//    }
     @Scheduled(cron = "0 0 0 * * ?")
     public void initAccountNum() {
         taskService.initAccountNum();