|
|
@@ -13,6 +13,7 @@ import com.fs.common.utils.PubFun;
|
|
|
import com.fs.common.utils.StringUtils;
|
|
|
import com.fs.company.domain.*;
|
|
|
import com.fs.company.mapper.*;
|
|
|
+import com.fs.company.param.ExecutionContext;
|
|
|
import com.fs.company.service.ICompanyVoiceRoboticService;
|
|
|
import com.fs.company.service.ICompanyWxAccountService;
|
|
|
import com.fs.company.service.ICompanyWxClientService;
|
|
|
@@ -20,6 +21,8 @@ import com.fs.company.service.ICompanyWxDialogService;
|
|
|
import com.fs.company.service.impl.*;
|
|
|
import com.fs.company.service.CompanyWorkflowEngine;
|
|
|
import com.fs.company.service.impl.call.node.AiAddWxTaskNode;
|
|
|
+import com.fs.company.service.impl.call.node.AiCallTaskNode;
|
|
|
+import com.fs.course.config.RedisKeyScanner;
|
|
|
import com.fs.enums.ExecutionStatusEnum;
|
|
|
import com.fs.enums.NodeTypeEnum;
|
|
|
import com.fs.company.util.ObjectPlaceholderResolver;
|
|
|
@@ -81,6 +84,7 @@ public class WxTaskService {
|
|
|
r -> new Thread(r, "callPool-" + System.currentTimeMillis()),
|
|
|
new ThreadPoolExecutor.CallerRunsPolicy()
|
|
|
);
|
|
|
+ private final RedisKeyScanner redisKeyScanner;
|
|
|
|
|
|
public void addWx(List<Long> accountIdList) {
|
|
|
log.info("==========执行加微信任务开始==========");
|
|
|
@@ -97,7 +101,7 @@ public class WxTaskService {
|
|
|
.map(callee -> callee.getRoboticId() + "_" + callee.getUserId())
|
|
|
.collect(Collectors.toSet());
|
|
|
|
|
|
- list = list.stream()
|
|
|
+ list = list.stream()
|
|
|
.filter(client -> {
|
|
|
String key = client.getRoboticId() + "_" + client.getCustomerId();
|
|
|
return !existingKeys.contains(key);
|
|
|
@@ -105,7 +109,7 @@ public class WxTaskService {
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
log.info("需要添加微信的数量:{}", list.size());
|
|
|
- if(list.isEmpty()) return;
|
|
|
+ if (list.isEmpty()) return;
|
|
|
List<CompanyWxClient> addList = new ArrayList<>();
|
|
|
Map<Long, CompanyWxClient> clientMap = PubFun.listToMapByGroupObject(list, CompanyWxClient::getAccountId);
|
|
|
List<CompanyWxAccount> accountList = new ArrayList<>(companyWxAccountService.listByIds(clientMap.keySet()));
|
|
|
@@ -128,23 +132,23 @@ public class WxTaskService {
|
|
|
log.info("实际加微的账号数量:{}", addAccountList.size());
|
|
|
addAccountList.forEach(e -> {
|
|
|
CompanyWxClient client = clientMap.get(e.getId());
|
|
|
- if(client != null){
|
|
|
+ if (client != null) {
|
|
|
String task = redisCache.getCacheObject(Constants.TASK_ID + client.getRoboticId());
|
|
|
log.info("ROBOTIC-ID:{},CLIENT-ID:{},当前任务执行状态:{}", client.getRoboticId(), client.getId(), task);
|
|
|
- if(StringUtils.isNotEmpty(task) && Constants.ADD_WX.equals(task)){
|
|
|
+ if (StringUtils.isNotEmpty(task) && Constants.ADD_WX.equals(task)) {
|
|
|
CompanyWxDialog dialog = companyWxDialogService.getById(client.getDialogId());
|
|
|
CrmCustomer crmCustomer = crmCustomerService.selectCrmCustomerById(client.getCustomerId());
|
|
|
String newTxt = objectPlaceholderResolver.resolvePlaceholders(crmCustomer, dialog.getTemplateDetails());
|
|
|
AddContactVo vo = friendService.addContact(e.getId(), crmCustomer.getMobile(), newTxt, client.getId());
|
|
|
JSONObject runParam = new JSONObject();
|
|
|
- runParam.put("id",e.getId());
|
|
|
- runParam.put("mobile",crmCustomer.getMobile());
|
|
|
- runParam.put("txt",newTxt);
|
|
|
- runParam.put("clientId",client.getId());
|
|
|
+ runParam.put("id", e.getId());
|
|
|
+ runParam.put("mobile", crmCustomer.getMobile());
|
|
|
+ runParam.put("txt", newTxt);
|
|
|
+ runParam.put("clientId", client.getId());
|
|
|
CompanyVoiceRoboticCallLogAddwx addLog = CompanyVoiceRoboticCallLogAddwx.initCallLog(
|
|
|
- runParam.toJSONString(),client.getId(),client.getRoboticId(),e.getId(),e.getCompanyId());
|
|
|
+ runParam.toJSONString(), client.getId(), client.getRoboticId(), e.getId(), e.getCompanyId());
|
|
|
log.info("ROBOTIC-ID:{},CLIENT-ID:{},执行加微:{},客户:{}-{},使用话术:{}", client.getRoboticId(), client.getId(), e.getId(), client.getCustomerId(), crmCustomer.getCustomerName(), dialog.getName());
|
|
|
- if(vo.isSuccess()){
|
|
|
+ if (vo.isSuccess()) {
|
|
|
e.setLastAddWxTime(LocalDateTime.now());
|
|
|
e.setIsAddNum(e.getIsAddNum() + 1);
|
|
|
client.setIsAdd(2);
|
|
|
@@ -154,20 +158,20 @@ public class WxTaskService {
|
|
|
addList.add(client);
|
|
|
addLog.setStatus(2);
|
|
|
addLog.setResult(JSON.toJSONString(vo));
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
log.error("ROBOTIC-ID:{},加微失败:{}", client.getRoboticId(), vo);
|
|
|
addLog.setStatus(3);
|
|
|
addLog.setResult(JSON.toJSONString(vo));
|
|
|
}
|
|
|
asyncSaveCompanyVoiceRoboticCallLog(addLog);
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
log.error("ROBOTIC-ID:{},当前任务没有执行加微任务", client.getRoboticId());
|
|
|
}
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
log.error("当前账号暂无需要添加微信:{}-{}", e.getId(), e.getWxNickName());
|
|
|
}
|
|
|
});
|
|
|
- if(!addList.isEmpty()){
|
|
|
+ if (!addList.isEmpty()) {
|
|
|
companyWxClientService.updateBatchById(addList);
|
|
|
//根据加微成功的用户,判定是否加入延时执行下一步任务
|
|
|
Set<Long> roboticIdSet = addList.stream().map(CompanyWxClient::getRoboticId).collect(Collectors.toSet());
|
|
|
@@ -177,27 +181,27 @@ public class WxTaskService {
|
|
|
List<CompanyVoiceRobotic> companyVoiceRobotics = companyVoiceRoboticMapper.selectBatchIds(roboticIdSet);
|
|
|
Map<Long, CompanyVoiceRobotic> roboticsMp = companyVoiceRobotics.stream().collect(Collectors.toMap(CompanyVoiceRobotic::getId, Function.identity(), (existing, replacement) -> existing));
|
|
|
//找到callees数据
|
|
|
- List<CompanyVoiceRoboticCallees> companyVoiceRoboticCallees = companyVoiceRoboticCalleesMapper.selectCalleesListByRoboticIdsAndUserIds(userIdSet,roboticIdSet);
|
|
|
- Map<String, CompanyVoiceRoboticCallees> calleesMp = companyVoiceRoboticCallees.stream().collect(Collectors.toMap(e->e.getUserId()+ "-" +e.getRoboticId(), Function.identity(), (existing, replacement) -> existing));
|
|
|
+ List<CompanyVoiceRoboticCallees> companyVoiceRoboticCallees = companyVoiceRoboticCalleesMapper.selectCalleesListByRoboticIdsAndUserIds(userIdSet, roboticIdSet);
|
|
|
+ Map<String, CompanyVoiceRoboticCallees> calleesMp = companyVoiceRoboticCallees.stream().collect(Collectors.toMap(e -> e.getUserId() + "-" + e.getRoboticId(), Function.identity(), (existing, replacement) -> existing));
|
|
|
|
|
|
long l = System.currentTimeMillis();
|
|
|
|
|
|
//根据加微成功
|
|
|
for (CompanyWxClient client : addList) {
|
|
|
CompanyVoiceRobotic clientRobotic = roboticsMp.getOrDefault(client.getRoboticId(), null);
|
|
|
- if(null == clientRobotic){
|
|
|
+ if (null == clientRobotic) {
|
|
|
log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
|
|
|
continue;
|
|
|
}
|
|
|
CompanyVoiceRoboticCallees callees = calleesMp.getOrDefault(client.getCustomerId() + "-" + client.getRoboticId(), null);
|
|
|
- if(null == callees){
|
|
|
+ if (null == callees) {
|
|
|
log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有找到任务", client.getRoboticId(), client.getId());
|
|
|
continue;
|
|
|
}
|
|
|
Integer addWxTime = clientRobotic.getAddWxTime();
|
|
|
- if(null == addWxTime){
|
|
|
+ if (null == addWxTime) {
|
|
|
log.error("ROBOTIC-ID:{},CLIENT-ID:{},没有设置加微后置等待时间", client.getRoboticId(), client.getId());
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
long endT = System.currentTimeMillis() + addWxTime * 60 * 1000;
|
|
|
StringBuilder sb = new StringBuilder(Constants.CID_NEXT_TASK_ID).append(callees.getRoboticId()).append(":").append(callees.getId());
|
|
|
redisCache.setCacheObject(sb.toString(), String.valueOf(endT));
|
|
|
@@ -206,13 +210,13 @@ public class WxTaskService {
|
|
|
companyVoiceRoboticCallees.forEach(robotic ->
|
|
|
robotic.setRunTaskFlow(
|
|
|
StringUtils.isBlank(robotic.getRunTaskFlow()) ?
|
|
|
- Constants.ADD_WX: robotic.getRunTaskFlow() + "," + Constants.ADD_WX
|
|
|
+ Constants.ADD_WX : robotic.getRunTaskFlow() + "," + Constants.ADD_WX
|
|
|
)
|
|
|
);
|
|
|
companyVoiceRoboticCalleesServiceImpl.updateBatchById(companyVoiceRoboticCallees);
|
|
|
companyVoiceRoboticServiceImpl.finishAddWxByCallees(roboticIdSet);
|
|
|
}
|
|
|
- if(!addAccountList.isEmpty()){
|
|
|
+ if (!addAccountList.isEmpty()) {
|
|
|
companyWxAccountService.updateBatchById(addAccountList);
|
|
|
}
|
|
|
|
|
|
@@ -224,15 +228,15 @@ public class WxTaskService {
|
|
|
WxConfig config = JSONUtil.toBean(json, WxConfig.class);
|
|
|
List<CompanyWxAccount> list = companyWxAccountService.list();
|
|
|
list.forEach(e -> {
|
|
|
- if(e.getAccountCreateTime() != null){
|
|
|
+ if (e.getAccountCreateTime() != null) {
|
|
|
long until = e.getAccountCreateTime().until(now.toLocalDate(), ChronoUnit.DAYS);
|
|
|
- if(until > config.getNewAccountTime()){
|
|
|
+ if (until > config.getNewAccountTime()) {
|
|
|
e.setIsNew(1);
|
|
|
}
|
|
|
}
|
|
|
- if(e.getIsNew() == 0){
|
|
|
+ if (e.getIsNew() == 0) {
|
|
|
e.setAddNum(config.getNewAccountAddNum());
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
e.setAddNum(RandomUtil.randomInt(config.getAccountAddMax(), config.getAccountAddMin()));
|
|
|
}
|
|
|
e.setIsAddNum(0);
|
|
|
@@ -260,7 +264,7 @@ public class WxTaskService {
|
|
|
List<CompanyVoiceRobotic> successList = list.stream().filter(e -> StringUtils.isNotEmpty(e.getRunTaskFlow()) && e.getTaskFlow().length() == e.getRunTaskFlow().length()).collect(Collectors.toList());
|
|
|
List<CompanyVoiceRobotic> waitList = list.stream().filter(e -> StringUtils.isEmpty(e.getRunTaskFlow()) || e.getTaskFlow().length() != e.getRunTaskFlow().length()).collect(Collectors.toList());
|
|
|
successList.forEach(e -> e.setTaskStatus(3));
|
|
|
- if(!successList.isEmpty()){
|
|
|
+ if (!successList.isEmpty()) {
|
|
|
log.info("已经完成任务:{}", successList.size());
|
|
|
companyVoiceRoboticService.updateBatchById(successList);
|
|
|
}
|
|
|
@@ -270,7 +274,7 @@ public class WxTaskService {
|
|
|
log.info("ROBOTIC-ID:{},当前任务顺序:{}", e.getId(), e.getTaskFlow());
|
|
|
String runTaskFlow = e.getRunTaskFlow();
|
|
|
log.info("ROBOTIC-ID:{},已有任务:{}", e.getId(), e.getRunTaskFlow());
|
|
|
- if(StringUtils.isNotEmpty(runTaskFlow)){
|
|
|
+ if (StringUtils.isNotEmpty(runTaskFlow)) {
|
|
|
Arrays.asList(runTaskFlow.split(",")).forEach(taskFlow::remove);
|
|
|
}
|
|
|
log.info("ROBOTIC-ID:{},当前还剩余任务:{}", e.getId(), taskFlow);
|
|
|
@@ -279,7 +283,7 @@ public class WxTaskService {
|
|
|
});
|
|
|
Function<CompanyVoiceRobotic, String> getKey = e -> Constants.TASK_ID + e.getId();
|
|
|
waitList.forEach(e -> {
|
|
|
- if(redisCache.getCacheObject(getKey.apply(e)) != null){
|
|
|
+ if (redisCache.getCacheObject(getKey.apply(e)) != null) {
|
|
|
log.info("ROBOTIC-ID:{},已有正在执行任务", e.getId());
|
|
|
return;
|
|
|
}
|
|
|
@@ -292,7 +296,7 @@ public class WxTaskService {
|
|
|
break;
|
|
|
case Constants.ADD_WX:
|
|
|
//第一步是调用添加微信步骤
|
|
|
- if(StringUtils.isBlank(e.getRunTaskFlow()) && StringUtils.isNotBlank(e.getTaskFlow()) && e.getTaskFlow().startsWith(Constants.ADD_WX)){
|
|
|
+ if (StringUtils.isBlank(e.getRunTaskFlow()) && StringUtils.isNotBlank(e.getTaskFlow()) && e.getTaskFlow().startsWith(Constants.ADD_WX)) {
|
|
|
companyVoiceRoboticServiceImpl.allocateWx(e);
|
|
|
// CompletableFuture.supplyAsync(()->{
|
|
|
// //分配个微账号
|
|
|
@@ -330,7 +334,7 @@ public class WxTaskService {
|
|
|
// log.error("ROBOTIC-ID:{},任务执行异常:{}", e.getId(), e.getNowTask(), ex);
|
|
|
// return null;
|
|
|
// });
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
//todo 接入原有加微逻辑
|
|
|
}
|
|
|
break;
|
|
|
@@ -338,7 +342,7 @@ public class WxTaskService {
|
|
|
|
|
|
break;
|
|
|
}
|
|
|
- }catch (Exception exception){
|
|
|
+ } catch (Exception exception) {
|
|
|
log.error("ROBOTIC-ID:{},任务执行失败:{}", e.getId(), e.getNowTask(), exception);
|
|
|
redisCache.deleteObject(getKey.apply(e));
|
|
|
}
|
|
|
@@ -403,31 +407,34 @@ public class WxTaskService {
|
|
|
|
|
|
/**
|
|
|
* 单任务加微
|
|
|
+ *
|
|
|
* @param roboticId
|
|
|
* @param callerId
|
|
|
* @return
|
|
|
*/
|
|
|
- private Boolean addWxOne(Long roboticId,Long callerId){
|
|
|
+ private Boolean addWxOne(Long roboticId, Long callerId) {
|
|
|
|
|
|
return Boolean.TRUE;
|
|
|
}
|
|
|
+
|
|
|
/**
|
|
|
* 单个任务执行且为单条执行对象
|
|
|
+ *
|
|
|
* @param roboticId
|
|
|
* @param callerId
|
|
|
*/
|
|
|
- public String cellRunOne(Long roboticId,Long callerId){
|
|
|
+ public String cellRunOne(Long roboticId, Long callerId) {
|
|
|
|
|
|
//查询任务执行情况
|
|
|
CompanyVoiceRoboticCallees data = companyVoiceRoboticCalleesMapper.selectDataByCalleesId(callerId);
|
|
|
CompanyVoiceRobotic robotic = companyVoiceRoboticMapper.selectCompanyVoiceRoboticById(roboticId);
|
|
|
String taskFlow = data.getTaskFlow();
|
|
|
- if( null == data || null == robotic ){
|
|
|
- log.error("没有查询到任务执行数据,roboticId:{},callerId:{}",roboticId,callerId);
|
|
|
+ if (null == data || null == robotic) {
|
|
|
+ log.error("没有查询到任务执行数据,roboticId:{},callerId:{}", roboticId, callerId);
|
|
|
return null;
|
|
|
}
|
|
|
- if(Integer.valueOf(3).equals(robotic.getTaskStatus())){
|
|
|
- log.error("执行任务已经完成了,roboticId:{}",roboticId);
|
|
|
+ if (Integer.valueOf(3).equals(robotic.getTaskStatus())) {
|
|
|
+ log.error("执行任务已经完成了,roboticId:{}", roboticId);
|
|
|
return null;
|
|
|
}
|
|
|
String nextTask;
|
|
|
@@ -440,14 +447,14 @@ public class WxTaskService {
|
|
|
data.setRunTaskFlow(robotic.getRunTaskFlow());
|
|
|
// return null;
|
|
|
}
|
|
|
- if(StringUtils.isBlank(nextTask)){
|
|
|
- log.error("任务没有下个执行任务,标记完成,roboticId:{}",roboticId);
|
|
|
+ if (StringUtils.isBlank(nextTask)) {
|
|
|
+ log.error("任务没有下个执行任务,标记完成,roboticId:{}", roboticId);
|
|
|
companyVoiceRoboticMapper.finishRobotic(roboticId);
|
|
|
return null;
|
|
|
}
|
|
|
- log.info("单人单任务执行ROBOTIC-ID:{},caller_id:{},当前需要执行任务:{}", roboticId,callerId, nextTask);
|
|
|
+ log.info("单人单任务执行ROBOTIC-ID:{},caller_id:{},当前需要执行任务:{}", roboticId, callerId, nextTask);
|
|
|
String nextTaskOptimized = null;
|
|
|
- try{
|
|
|
+ try {
|
|
|
switch (nextTask) {
|
|
|
case Constants.CELL_PHONE:
|
|
|
companyVoiceRoboticService.callPhoneOne(roboticId, callerId);
|
|
|
@@ -457,13 +464,13 @@ public class WxTaskService {
|
|
|
Boolean success = addWxOne(roboticId, callerId);
|
|
|
break;
|
|
|
case Constants.SEND_MSG:
|
|
|
- if(Integer.valueOf(0).equals(data.getIsSendMsg())){
|
|
|
+ if (Integer.valueOf(0).equals(data.getIsSendMsg())) {
|
|
|
//发送短信前一个任务如果是打电话 等待电话打完以后再执行发送
|
|
|
String lastTaskOptimized = getLastTaskOptimized(taskFlow);
|
|
|
- if(Constants.CELL_PHONE.equals(lastTaskOptimized)){
|
|
|
+ if (Constants.CELL_PHONE.equals(lastTaskOptimized)) {
|
|
|
//是否打电话结束有回调值 完成电话动作以后执行下一步
|
|
|
CompanyVoiceRoboticCallLogCallphone companyVoiceRoboticCallLogCallphone = companyVoiceRoboticCallLogCallphoneService.selectLogByRoboticIdAndCallerId(roboticId, callerId);
|
|
|
- if(null != companyVoiceRoboticCallLogCallphone && companyVoiceRoboticCallLogCallphone.getStatus() == 1){
|
|
|
+ if (null != companyVoiceRoboticCallLogCallphone && companyVoiceRoboticCallLogCallphone.getStatus() == 1) {
|
|
|
nextTaskOptimized = "wait callPhone";
|
|
|
break;
|
|
|
}
|
|
|
@@ -476,20 +483,20 @@ public class WxTaskService {
|
|
|
companyVoiceRoboticService.sendMsgOne(roboticId, callerId);
|
|
|
nextTaskOptimized = getNextTaskOptimized(taskFlow, data.getRunTaskFlow() + "," + Constants.SEND_MSG);
|
|
|
break;
|
|
|
- } else{
|
|
|
- log.info("不再需要发送短信处理,roboticId:{},callerId:{}",roboticId,callerId);
|
|
|
+ } else {
|
|
|
+ log.info("不再需要发送短信处理,roboticId:{},callerId:{}", roboticId, callerId);
|
|
|
nextTaskOptimized = getNextTaskOptimized(taskFlow, data.getRunTaskFlow() + "," + Constants.SEND_MSG);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
- } catch (Exception ex){
|
|
|
- log.error("执行任务异常,roboticId:{},callerId:{},nextTask:{}",roboticId,callerId,nextTask,ex);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("执行任务异常,roboticId:{},callerId:{},nextTask:{}", roboticId, callerId, nextTask, ex);
|
|
|
nextTaskOptimized = "exception";
|
|
|
}
|
|
|
|
|
|
- if(StringUtils.isNotBlank(nextTaskOptimized)){
|
|
|
+ if (StringUtils.isNotBlank(nextTaskOptimized)) {
|
|
|
return nextTaskOptimized;
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
//任务执行完了 没有下一步 直接完成任务
|
|
|
// companyVoiceRoboticMapper.finishRobotic(roboticId);
|
|
|
return null;
|
|
|
@@ -499,6 +506,7 @@ public class WxTaskService {
|
|
|
|
|
|
/**
|
|
|
* 获取下一个任务
|
|
|
+ *
|
|
|
* @param taskFlow
|
|
|
* @param runTaskFlow
|
|
|
* @return
|
|
|
@@ -523,20 +531,21 @@ public class WxTaskService {
|
|
|
|
|
|
/**
|
|
|
* 获取最后一个执行的任务
|
|
|
+ *
|
|
|
* @param taskFlow
|
|
|
* @return
|
|
|
*/
|
|
|
- public String getLastTaskOptimized(String taskFlow){
|
|
|
+ public String getLastTaskOptimized(String taskFlow) {
|
|
|
return taskFlow.substring(taskFlow.lastIndexOf(",") + 1);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 调用下一个任务
|
|
|
*/
|
|
|
- public void callNextTask(){
|
|
|
+ public void callNextTask() {
|
|
|
//
|
|
|
RLock lock = redissonClient.getLock("CID_CALL_NEXT_TASK");
|
|
|
- try{
|
|
|
+ try {
|
|
|
lock.lock();
|
|
|
log.info("===========CID扫描执行下一个任务任务执行开始===========");
|
|
|
long l = System.currentTimeMillis();
|
|
|
@@ -545,33 +554,33 @@ public class WxTaskService {
|
|
|
String[] keyArr = key.split(":");
|
|
|
String taskId = keyArr[keyArr.length - 2];
|
|
|
String callerId = keyArr[keyArr.length - 1];
|
|
|
- Long runTime =Long.valueOf(redisCache.getCacheObject(key));
|
|
|
- log.info("任务执行时间:{},当前时间:{}",runTime,l);
|
|
|
+ Long runTime = Long.valueOf(redisCache.getCacheObject(key));
|
|
|
+ log.info("任务执行时间:{},当前时间:{}", runTime, l);
|
|
|
//到了该执行时间
|
|
|
- if(runTime.compareTo(l) <= 0){
|
|
|
- log.info("开始执行任务:{},callerId:{}",taskId,callerId);
|
|
|
+ if (runTime.compareTo(l) <= 0) {
|
|
|
+ log.info("开始执行任务:{},callerId:{}", taskId, callerId);
|
|
|
//得到待执行任务
|
|
|
- CompletableFuture.supplyAsync(() -> this.cellRunOne(Long.valueOf(taskId), Long.valueOf(callerId)),cidExcutor).thenApply(res -> {
|
|
|
- if(StringUtils.isBlank(res)){
|
|
|
+ CompletableFuture.supplyAsync(() -> this.cellRunOne(Long.valueOf(taskId), Long.valueOf(callerId)), cidExcutor).thenApply(res -> {
|
|
|
+ if (StringUtils.isBlank(res)) {
|
|
|
redisCache.deleteObject(key);
|
|
|
redisCache.deleteObject(Constants.TASK_ID + taskId);
|
|
|
}
|
|
|
return null;
|
|
|
}).exceptionally(throwable -> {
|
|
|
- log.error("单项任务执行或删除失败,taskId: {},callerId:{}", taskId, callerId,throwable);
|
|
|
+ log.error("单项任务执行或删除失败,taskId: {},callerId:{}", taskId, callerId, throwable);
|
|
|
return null;
|
|
|
});
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
// todo 加入新逻辑 没有到执行时间的待执行任务 检查上一个任务的执行状态
|
|
|
// 如果是已经完成的状态 修改待执行时间为现在 下次进入任务会直接执行对应的下个任务
|
|
|
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- }catch (Exception ex){
|
|
|
+ } catch (Exception ex) {
|
|
|
log.error("CID任务自动调用调用下一个任务失败", ex);
|
|
|
} finally {
|
|
|
- if(lock.isHeldByCurrentThread()){
|
|
|
+ if (lock.isHeldByCurrentThread()) {
|
|
|
lock.unlock();
|
|
|
}
|
|
|
}
|
|
|
@@ -582,15 +591,17 @@ public class WxTaskService {
|
|
|
|
|
|
@Autowired
|
|
|
CompanyVoiceRoboticCallLogAddwxServiceImpl companyVoiceRoboticCallLogAddwxService;
|
|
|
+
|
|
|
/**
|
|
|
* 记录任务执行日志 addWx
|
|
|
+ *
|
|
|
* @param logAddwx
|
|
|
*/
|
|
|
- public void asyncSaveCompanyVoiceRoboticCallLog(CompanyVoiceRoboticCallLogAddwx logAddwx){
|
|
|
- try{
|
|
|
+ public void asyncSaveCompanyVoiceRoboticCallLog(CompanyVoiceRoboticCallLogAddwx logAddwx) {
|
|
|
+ try {
|
|
|
companyVoiceRoboticCallLogAddwxService.asyncInsertCompanyVoiceRoboticCallLog(logAddwx);
|
|
|
- }catch (Exception ex){
|
|
|
- log.error("记录任务执行日志失败:失败数据:{}",logAddwx, ex);
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("记录任务执行日志失败:失败数据:{}", logAddwx, ex);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -694,4 +705,30 @@ public class WxTaskService {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 扫描工作流延时任务
|
|
|
+ */
|
|
|
+ public void cidWorkflowCallRun() {
|
|
|
+ log.info("===========工作流延时任务开始扫描===========");
|
|
|
+ String delayCallKeyPrefix = AiCallTaskNode.getDelayCallKeyPrefix(null) + "*";
|
|
|
+ Set<String> keys = redisKeyScanner.scanMatchKey(delayCallKeyPrefix);
|
|
|
+ log.info("共扫描到 {} 个待处理键", keys.size());
|
|
|
+ HashMap commonMp = new HashMap();
|
|
|
+ commonMp.put("callSource","timer");
|
|
|
+ keys.parallelStream().forEach(key -> {
|
|
|
+ try {
|
|
|
+ //doExec
|
|
|
+ CompletableFuture.runAsync(()->{
|
|
|
+ String cacheObject = redisCache.getCacheObject(key);
|
|
|
+ ExecutionContext context = JSONObject.parseObject(cacheObject, ExecutionContext.class);
|
|
|
+ companyWorkflowEngine.timeDoExecute(context.getWorkflowInstanceId(),context.getCurrentNodeKey(),context.getVariables());
|
|
|
+ }, cidExcutor);
|
|
|
+
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("处理工作流延时任务异常 - key: {}", key, ex);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ log.info("===========工作流延时任务扫描结束===========");
|
|
|
+ }
|
|
|
}
|