|
|
@@ -1,5 +1,6 @@
|
|
|
package com.fs.app.service;
|
|
|
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
import cn.hutool.core.util.RandomUtil;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
@@ -9,11 +10,11 @@ import com.fs.common.constant.FsConstants;
|
|
|
import com.fs.common.core.redis.RedisCacheT;
|
|
|
import com.fs.common.utils.PubFun;
|
|
|
import com.fs.common.utils.StringUtils;
|
|
|
-import com.fs.company.domain.CompanyVoiceRobotic;
|
|
|
-import com.fs.company.domain.CompanyWxAccount;
|
|
|
-import com.fs.company.domain.CompanyWxClient;
|
|
|
-import com.fs.company.domain.CompanyWxDialog;
|
|
|
+import com.fs.company.domain.*;
|
|
|
+import com.fs.company.mapper.CompanyVoiceRoboticCalleesMapper;
|
|
|
import com.fs.company.mapper.CompanyVoiceRoboticMapper;
|
|
|
+import com.fs.company.mapper.CompanyVoiceRoboticWxMapper;
|
|
|
+import com.fs.company.mapper.CompanyWxClientMapper;
|
|
|
import com.fs.company.service.ICompanyVoiceRoboticService;
|
|
|
import com.fs.company.service.ICompanyWxAccountService;
|
|
|
import com.fs.company.service.ICompanyWxClientService;
|
|
|
@@ -31,13 +32,12 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.redisson.api.RLock;
|
|
|
import org.redisson.api.RedissonClient;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.temporal.ChronoUnit;
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
-import java.util.concurrent.RunnableFuture;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.*;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@@ -56,9 +56,21 @@ public class WxTaskService {
|
|
|
private final FriendService friendService;
|
|
|
private final CompanyVoiceRoboticMapper companyVoiceRoboticMapper;
|
|
|
private final RedisCacheT<String> redisCache;
|
|
|
-
|
|
|
+ private final CompanyVoiceRoboticCalleesMapper companyVoiceRoboticCalleesMapper;
|
|
|
+ private final CompanyVoiceRoboticWxMapper companyVoiceRoboticWxMapper;
|
|
|
+ private final CompanyWxClientMapper companyWxClientMapper;
|
|
|
private RedissonClient redissonClient;
|
|
|
|
|
|
+ private final ExecutorService cidExcutor = new ThreadPoolExecutor(
|
|
|
+ 32,
|
|
|
+ 64,
|
|
|
+ 60L,
|
|
|
+ TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(1000),
|
|
|
+ r -> new Thread(r, "callPool-" + System.currentTimeMillis()),
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy()
|
|
|
+ );
|
|
|
+
|
|
|
public void addWx(List<Long> accountIdList) {
|
|
|
log.info("==========执行加微信任务开始==========");
|
|
|
String json = sysConfigService.selectConfigByKey("wx.config");
|
|
|
@@ -198,7 +210,47 @@ public class WxTaskService {
|
|
|
log.info("ROBOTIC-ID:{},打电话任务创建完成", e.getId());
|
|
|
break;
|
|
|
case Constants.ADD_WX:
|
|
|
-
|
|
|
+ //第一步是调用添加微信步骤
|
|
|
+ if(StringUtils.isBlank(e.getRunTaskFlow()) && StringUtils.isNotBlank(e.getTaskFlow()) && e.getTaskFlow().startsWith(Constants.ADD_WX)){
|
|
|
+ CompletableFuture.supplyAsync(()->{
|
|
|
+ //分配个微账号
|
|
|
+ return allocateWx(e);
|
|
|
+ },cidExcutor).thenApply(result->{
|
|
|
+ //逐条添加微信,且判定是否任务
|
|
|
+ for (CompanyWxClient client : result) {
|
|
|
+ ArrayList<Long> addWxParamList = new ArrayList<>();
|
|
|
+ addWxParamList.add(client.getAccountId());
|
|
|
+ //添加微信 todo 暂时注释掉 不在添加微信 发布时需要开放
|
|
|
+// addWx(addWxParamList);
|
|
|
+ //判定任务是否有加微后等待时间设定,加入到待执行任务redis
|
|
|
+ if(null != e.getAddWxTime() && e.getAddWxTime() > 0){
|
|
|
+ long endT = System.currentTimeMillis() + e.getAddWxTime() * 60 * 1000;
|
|
|
+ //通过任务+用户id找到calles记录
|
|
|
+ CompanyVoiceRoboticCallees callees = companyVoiceRoboticCalleesMapper.getCalleesByUserIdAndTaskId(client.getCustomerId(), e.getId());
|
|
|
+ if(null != callees && !ObjectUtil.isEmpty(callees)){
|
|
|
+ Long calleesId = callees.getId();
|
|
|
+ StringBuilder sb = new StringBuilder(Constants.CID_NEXT_TASK_ID).append(e.getId()).append(":").append(calleesId);
|
|
|
+ redisCache.setCacheObject(sb.toString(), String.valueOf(endT),e.getAddWxTime() + 5, TimeUnit.MINUTES);
|
|
|
+ if(StringUtils.isNotBlank(callees.getRunTaskFlow())){
|
|
|
+ callees.setRunTaskFlow(callees.getRunTaskFlow() + "," + Constants.ADD_WX);
|
|
|
+ }else{
|
|
|
+ callees.setRunTaskFlow(Constants.ADD_WX);
|
|
|
+ }
|
|
|
+ companyVoiceRoboticCalleesMapper.updateById(callees);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ e.setRunTaskFlow(Constants.ADD_WX);
|
|
|
+ //更新任务 已跑任务值
|
|
|
+ companyVoiceRoboticMapper.updateById(e);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }).exceptionally(ex -> {
|
|
|
+ log.error("ROBOTIC-ID:{},任务执行异常:{}", e.getId(), e.getNowTask(), ex);
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+ }else{
|
|
|
+ //todo 接入原有加微逻辑
|
|
|
+ }
|
|
|
break;
|
|
|
case Constants.SEND_MSG:
|
|
|
|
|
|
@@ -213,12 +265,116 @@ public class WxTaskService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 单个任务执行
|
|
|
+ * 分配账号
|
|
|
+ * @param robotic
|
|
|
+ */
|
|
|
+ @Transactional
|
|
|
+ public List<CompanyWxClient> allocateWx(CompanyVoiceRobotic robotic){
|
|
|
+ List<CompanyWxClient> resArr = new ArrayList<>();
|
|
|
+ //找到任务指定的微信用户
|
|
|
+ List<CompanyVoiceRoboticWx> companyVoiceRoboticWxes = companyVoiceRoboticWxMapper.selectByRoboticIdWithGroupBy(robotic.getId());
|
|
|
+ Integer totalSize = 0;
|
|
|
+ if(null != companyVoiceRoboticWxes && !companyVoiceRoboticWxes.isEmpty()){
|
|
|
+ totalSize = companyVoiceRoboticWxes.size();
|
|
|
+ } else{
|
|
|
+ log.error("分配对象空,数据异常");
|
|
|
+ throw new RuntimeException("没有找到任务指定的微信用户");
|
|
|
+ }
|
|
|
+ List<CompanyWxClient> companyWxClients = companyWxClientMapper.selectListByRoboticId(robotic.getId());
|
|
|
+ if(null == companyWxClients || companyWxClients.isEmpty()){
|
|
|
+ log.error("分配个微空,数据异常");
|
|
|
+ throw new RuntimeException("没有找到需要分配微信用户");
|
|
|
+ }
|
|
|
+ Integer allocateIndex = 0;
|
|
|
+ //分配客户
|
|
|
+ for (CompanyWxClient companyWxClient : companyWxClients) {
|
|
|
+ CompanyVoiceRoboticWx wx = companyVoiceRoboticWxes.get(allocateIndex++ % totalSize);
|
|
|
+ companyWxClient.setRoboticWxId(wx.getId());
|
|
|
+ companyWxClient.setAccountId(wx.getAccountId());
|
|
|
+ companyWxClient.setDialogId(wx.getWxDialogId());
|
|
|
+ resArr.add(companyWxClient);
|
|
|
+ }
|
|
|
+ //保存数据库
|
|
|
+ companyWxClientService.updateBatchById(companyWxClients);
|
|
|
+ return resArr;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 单个任务执行且为单条执行对象
|
|
|
* @param roboticId
|
|
|
* @param callerId
|
|
|
*/
|
|
|
- public void cellRunOne(Long roboticId,Long callerId){
|
|
|
- //todo 跑单个任务的下一个任务
|
|
|
+ public String cellRunOne(Long roboticId,Long callerId){
|
|
|
+
|
|
|
+ //查询任务执行情况
|
|
|
+ CompanyVoiceRoboticCallees data = companyVoiceRoboticCalleesMapper.selectDataByCalleesId(callerId);
|
|
|
+ CompanyVoiceRobotic robotic = companyVoiceRoboticMapper.selectCompanyVoiceRoboticById(roboticId);
|
|
|
+ if( null == data || null == robotic ){
|
|
|
+ log.error("没有查询到任务执行数据,roboticId:{},callerId:{}",roboticId,callerId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if(Integer.valueOf(3).equals(robotic.getTaskStatus())){
|
|
|
+ log.error("执行任务已经完成了,roboticId:{}",roboticId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ String nextTask;
|
|
|
+ if (StringUtils.isNotBlank(data.getRunTaskFlow()) && StringUtils.isNotBlank(data.getTaskFlow())) {
|
|
|
+ nextTask = getNextTaskOptimized(data.getTaskFlow(), data.getRunTaskFlow());
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if(StringUtils.isBlank(nextTask)){
|
|
|
+ log.error("任务没有下个执行任务,标记完成,roboticId:{}",roboticId);
|
|
|
+ companyVoiceRoboticMapper.finishRobotic(roboticId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ log.info("单人单任务执行ROBOTIC-ID:{},caller_id:{},当前需要执行任务:{}", roboticId,callerId, nextTask);
|
|
|
+ String nextTaskOptimized = null;
|
|
|
+ switch (nextTask) {
|
|
|
+ case Constants.CELL_PHONE:
|
|
|
+ companyVoiceRoboticService.callPhoneOne(roboticId, callerId);
|
|
|
+ nextTaskOptimized = getNextTaskOptimized(data.getTaskFlow(), data.getRunTaskFlow() + "," + Constants.CELL_PHONE);
|
|
|
+ break;
|
|
|
+// case Constants.ADD_WX:
|
|
|
+// companyVoiceRoboticService.addWxOne();
|
|
|
+// break;
|
|
|
+ case Constants.SEND_MSG:
|
|
|
+ companyVoiceRoboticService.sendMsgOne(roboticId, callerId);
|
|
|
+ nextTaskOptimized = getNextTaskOptimized(data.getTaskFlow(), data.getRunTaskFlow() + "," + Constants.SEND_MSG);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if(StringUtils.isNotBlank(nextTaskOptimized)){
|
|
|
+ return nextTaskOptimized;
|
|
|
+ }else{
|
|
|
+ //任务执行完了 没有下一步 直接完成任务
|
|
|
+ companyVoiceRoboticMapper.finishRobotic(roboticId);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取下一个任务
|
|
|
+ * @param taskFlow
|
|
|
+ * @param runTaskFlow
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public String getNextTaskOptimized(String taskFlow, String runTaskFlow) {
|
|
|
+ if (StringUtils.isBlank(taskFlow)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ String[] allTasks = taskFlow.split(",");
|
|
|
+ Set<String> executedTasks = new HashSet<>();
|
|
|
+
|
|
|
+ if (StringUtils.isNotBlank(runTaskFlow)) {
|
|
|
+ executedTasks.addAll(Arrays.asList(runTaskFlow.split(",")));
|
|
|
+ }
|
|
|
+ for (String task : allTasks) {
|
|
|
+ if (!executedTasks.contains(task.trim())) {
|
|
|
+ return task.trim();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -229,6 +385,7 @@ public class WxTaskService {
|
|
|
RLock lock = redissonClient.getLock("CID_CALL_NEXT_TASK");
|
|
|
try{
|
|
|
lock.lock();
|
|
|
+ log.info("===========CID扫描执行下一个任务任务执行开始===========");
|
|
|
long l = System.currentTimeMillis();
|
|
|
Collection<String> keys = redisCache.keys(Constants.CID_NEXT_TASK_ID + "*");
|
|
|
keys.forEach(key -> {
|
|
|
@@ -236,15 +393,19 @@ public class WxTaskService {
|
|
|
String taskId = keyArr[keyArr.length - 2];
|
|
|
String callerId = keyArr[keyArr.length - 1];
|
|
|
Long runTime =Long.valueOf(redisCache.getCacheObject(key));
|
|
|
+ log.info("任务执行时间:{},当前时间:{}",runTime,l);
|
|
|
//到了该执行时间
|
|
|
if(runTime.compareTo(l) <= 0){
|
|
|
+ log.info("开始执行任务:{},callerId:{}",taskId,callerId);
|
|
|
//得到待执行任务
|
|
|
- CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
|
|
- this.cellRunOne(Long.valueOf(taskId), Long.valueOf(callerId));
|
|
|
- }).thenRun(() -> {
|
|
|
- redisCache.deleteObject(key);
|
|
|
+ CompletableFuture.supplyAsync(() -> this.cellRunOne(Long.valueOf(taskId), Long.valueOf(callerId)),cidExcutor).thenApply(res -> {
|
|
|
+ if(StringUtils.isBlank(res)){
|
|
|
+ redisCache.deleteObject(key);
|
|
|
+ redisCache.deleteObject(Constants.TASK_ID + taskId);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}).exceptionally(throwable -> {
|
|
|
- log.error("单项任务执行或删除失败,taskId: {}", taskId, throwable);
|
|
|
+ log.error("单项任务执行或删除失败,taskId: {},callerId:{}", taskId, callerId,throwable);
|
|
|
return null;
|
|
|
});
|
|
|
}
|
|
|
@@ -257,6 +418,5 @@ public class WxTaskService {
|
|
|
lock.unlock();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
}
|