|  | @@ -15,7 +15,6 @@ import com.fs.qw.mapper.QwIpadServerMapper;
 | 
	
		
			
				|  |  |  import com.fs.qw.mapper.QwUserMapper;
 | 
	
		
			
				|  |  |  import com.fs.qw.service.impl.AsyncSopTestService;
 | 
	
		
			
				|  |  |  import com.fs.qw.vo.QwSopCourseFinishTempSetting;
 | 
	
		
			
				|  |  | -import com.fs.qw.vo.QwSopTempSetting;
 | 
	
		
			
				|  |  |  import com.fs.sop.domain.QwSopLogs;
 | 
	
		
			
				|  |  |  import com.fs.sop.mapper.QwSopLogsMapper;
 | 
	
		
			
				|  |  |  import com.fs.sop.service.IQwSopLogsService;
 | 
	
	
		
			
				|  | @@ -37,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
 | 
	
		
			
				|  |  |  import java.util.concurrent.ConcurrentHashMap;
 | 
	
		
			
				|  |  |  import java.util.concurrent.ThreadLocalRandom;
 | 
	
		
			
				|  |  |  import java.util.concurrent.TimeUnit;
 | 
	
		
			
				|  |  | -import java.util.stream.Collectors;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  @Component
 | 
	
		
			
				|  |  |  @Slf4j
 | 
	
	
		
			
				|  | @@ -56,7 +54,6 @@ public class SendMsg {
 | 
	
		
			
				|  |  |      private String groupNo;
 | 
	
		
			
				|  |  |      private final List<QwUser> qwUserList = Collections.synchronizedList(new ArrayList<>());
 | 
	
		
			
				|  |  |      private final Map<Long, Long> qwMap = new ConcurrentHashMap<>();
 | 
	
		
			
				|  |  | -    private final Map<Long, Long> removeQwMap = new ConcurrentHashMap<>();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Autowired
 | 
	
		
			
				|  |  |      @Qualifier("customThreadPool")
 | 
	
	
		
			
				|  | @@ -75,7 +72,7 @@ public class SendMsg {
 | 
	
		
			
				|  |  |      private List<QwUser> getQwUserList() {
 | 
	
		
			
				|  |  |          if (qwUserList.isEmpty()) {
 | 
	
		
			
				|  |  |              List<QwIpadServer> serverList = qwIpadServerMapper.selectList(new QueryWrapper<QwIpadServer>().eq("group_no", groupNo));
 | 
	
		
			
				|  |  | -            if(serverList.isEmpty()){
 | 
	
		
			
				|  |  | +            if (serverList.isEmpty()) {
 | 
	
		
			
				|  |  |                  return new ArrayList<>();
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |              List<Long> serverIds = PubFun.listToNewList(serverList, QwIpadServer::getId);
 | 
	
	
		
			
				|  | @@ -117,45 +114,36 @@ public class SendMsg {
 | 
	
		
			
				|  |  |              delayEnd = config.getDelayEnd();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          Map<String, CourseMaConfig> miniMap = getMiniMap();
 | 
	
		
			
				|  |  | -        // 清空需要删除的
 | 
	
		
			
				|  |  | -        List<Long> keyList = new ArrayList<>(removeQwMap.keySet());
 | 
	
		
			
				|  |  | -        keyList.forEach(key -> {
 | 
	
		
			
				|  |  | -            removeQwMap.remove(key);
 | 
	
		
			
				|  |  | -            qwMap.remove(key);
 | 
	
		
			
				|  |  | -        });
 | 
	
		
			
				|  |  | -        log.info("删除id:{}", JSON.toJSONString(keyList));
 | 
	
		
			
				|  |  |          getQwUserList().forEach(e -> {
 | 
	
		
			
				|  |  | -            synchronized (e.getId()){
 | 
	
		
			
				|  |  | -                if (qwMap.containsKey(e.getId())) {
 | 
	
		
			
				|  |  | -                    log.error("用户:{}已在处理中,跳过重复执行", e.getQwUserName());
 | 
	
		
			
				|  |  | -                    return;
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -                qwMap.computeIfAbsent(e.getId(), k -> {
 | 
	
		
			
				|  |  | -                    CompletableFuture.runAsync(() -> {
 | 
	
		
			
				|  |  | -                        try {
 | 
	
		
			
				|  |  | -                            processUser(e, delayStart, delayEnd, miniMap);
 | 
	
		
			
				|  |  | -                        } catch (Exception exception){
 | 
	
		
			
				|  |  | -                            log.error("发送错误:", exception);
 | 
	
		
			
				|  |  | -                        }finally {
 | 
	
		
			
				|  |  | -                            removeQwMap.putIfAbsent(e.getId(), System.currentTimeMillis());
 | 
	
		
			
				|  |  | -                        }
 | 
	
		
			
				|  |  | -                    }, customThreadPool);
 | 
	
		
			
				|  |  | -                    return System.currentTimeMillis(); // 占位值
 | 
	
		
			
				|  |  | -                });
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | +            qwMap.computeIfAbsent(e.getId(), k -> {
 | 
	
		
			
				|  |  | +                CompletableFuture.runAsync(() -> {
 | 
	
		
			
				|  |  | +                    try {
 | 
	
		
			
				|  |  | +                        log.info("开始任务:{}", e.getQwUserName());
 | 
	
		
			
				|  |  | +                        processUser(e, delayStart, delayEnd, miniMap);
 | 
	
		
			
				|  |  | +                    } catch (Exception exception) {
 | 
	
		
			
				|  |  | +                        log.error("发送错误:", exception);
 | 
	
		
			
				|  |  | +                    } finally {
 | 
	
		
			
				|  |  | +                        log.info("删除任务:{}", e.getQwUserName());
 | 
	
		
			
				|  |  | +                        qwMap.remove(e.getId());
 | 
	
		
			
				|  |  | +//                        removeQwMap.putIfAbsent(e.getId(), System.currentTimeMillis());
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                }, customThreadPool);
 | 
	
		
			
				|  |  | +                return System.currentTimeMillis(); // 占位值
 | 
	
		
			
				|  |  | +            });
 | 
	
		
			
				|  |  |          });
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private void processUser(QwUser qwUser, int delayStart, int delayEnd, Map<String, CourseMaConfig> miniMap) {
 | 
	
		
			
				|  |  |          long start1 = System.currentTimeMillis();
 | 
	
		
			
				|  |  |          List<QwSopLogs> qwSopLogList = qwSopLogsMapper.selectByQwUserId(qwUser.getId());
 | 
	
		
			
				|  |  | -        if(qwSopLogList.isEmpty()){
 | 
	
		
			
				|  |  | +        if (qwSopLogList.isEmpty()) {
 | 
	
		
			
				|  |  |              return;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          QwUser user = qwUserMapper.selectById(qwUser.getId());
 | 
	
		
			
				|  |  |          BaseVo parentVo = new BaseVo();
 | 
	
		
			
				|  |  | +        parentVo.setCorpCode(qwUser.getCorpId());
 | 
	
		
			
				|  |  |          long end1 = System.currentTimeMillis();
 | 
	
		
			
				|  |  | -        if (!sendServer.isSend(user)) {
 | 
	
		
			
				|  |  | +        if (!sendServer.isSend(user, parentVo)) {
 | 
	
		
			
				|  |  |              return;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          log.info("销售:{}, 消息:{}, 耗时: {}, 时间:{}", user.getQwUserName(), qwSopLogList.size(), end1 - start1, qwMap.get(qwUser.getId()));
 | 
	
	
		
			
				|  | @@ -171,24 +159,27 @@ public class SendMsg {
 | 
	
		
			
				|  |  |              log.info("进入发送消息状态:{}", qwSopLogs.getId());
 | 
	
		
			
				|  |  |              String key = "qw:logs:pad:send:id:" + qwSopLogs.getId();
 | 
	
		
			
				|  |  |              Long time = redisCache.getCacheObject(key);
 | 
	
		
			
				|  |  | -            if(redisCache.getCacheObject(key) != null){
 | 
	
		
			
				|  |  | +            if (redisCache.getCacheObject(key) != null) {
 | 
	
		
			
				|  |  |                  log.error("{}已有发送:{}, :{}", qwUser.getQwUserName(), qwSopLogs.getId(), time);
 | 
	
		
			
				|  |  |                  return;
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |              redisCache.setCacheObject(key, System.currentTimeMillis(), 10, TimeUnit.MINUTES);
 | 
	
		
			
				|  |  |              for (QwSopCourseFinishTempSetting.Setting content : setting.getSetting()) {
 | 
	
		
			
				|  |  | -                sendServer.send(content, user, qwSopLogs, miniMap);
 | 
	
		
			
				|  |  | -//                if(content.getSendStatus() == 2 && "请求失败:消息发送过于频繁,请稍后再试".equals(content.getSendRemarks())){
 | 
	
		
			
				|  |  | -//                    QwUser update = new QwUser();
 | 
	
		
			
				|  |  | -//                    update.setRemark("请求频率异常,暂停发送,三小时后恢复继续发送");
 | 
	
		
			
				|  |  | -//                    update.setUpdateTime(new Date());
 | 
	
		
			
				|  |  | -//                    qwUserMapper.update(update, new QueryWrapper<QwUser>().eq("id", user.getId()));
 | 
	
		
			
				|  |  | -//                    redisCache.setCacheObject("qw:user:id:" + user.getId(), user.getId(), 3, TimeUnit.HOURS);
 | 
	
		
			
				|  |  | -//                    return;
 | 
	
		
			
				|  |  | -//                }
 | 
	
		
			
				|  |  | +                long start4 = System.currentTimeMillis();
 | 
	
		
			
				|  |  | +                sendServer.send(content, user, qwSopLogs, miniMap, parentVo);
 | 
	
		
			
				|  |  | +                long end4 = System.currentTimeMillis();
 | 
	
		
			
				|  |  | +                log.info("请求pad发送完成:{}, {}, 时长4:{}", user.getQwUserName(), qwSopLogs.getId(), end4 - start4);
 | 
	
		
			
				|  |  | +                if(content.getSendStatus() == 2 && "请求失败:消息发送过于频繁,请稍后再试".equals(content.getSendRemarks())){
 | 
	
		
			
				|  |  | +                    QwUser update = new QwUser();
 | 
	
		
			
				|  |  | +                    update.setRemark("请求频率异常,暂停发送,三小时后恢复继续发送");
 | 
	
		
			
				|  |  | +                    update.setUpdateTime(new Date());
 | 
	
		
			
				|  |  | +                    qwUserMapper.update(update, new QueryWrapper<QwUser>().eq("id", user.getId()));
 | 
	
		
			
				|  |  | +                    redisCache.setCacheObject("qw:user:id:" + user.getId(), user.getId(), 3, TimeUnit.HOURS);
 | 
	
		
			
				|  |  | +                    return;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  |                  try {
 | 
	
		
			
				|  |  |                      int delay = ThreadLocalRandom.current().nextInt(300, 1000);
 | 
	
		
			
				|  |  | -                    log.debug("等待:{}ms", delay);
 | 
	
		
			
				|  |  | +                    log.debug("pad发送消息等待:{}ms", delay);
 | 
	
		
			
				|  |  |                      Thread.sleep(delay);
 | 
	
		
			
				|  |  |                  } catch (InterruptedException e) {
 | 
	
		
			
				|  |  |                      log.error("线程等待错误!");
 | 
	
	
		
			
				|  | @@ -214,15 +205,13 @@ public class SendMsg {
 | 
	
		
			
				|  |  |                  updateQwSop.setRemark("全部发送成功");
 | 
	
		
			
				|  |  |                  updateQwSop.setRealSendTime(sdf.format(new Date()));
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -//            updateQwSop.setReceivingStatus(1L);
 | 
	
		
			
				|  |  | -//            updateQwSop.setSendStatus(1L);
 | 
	
		
			
				|  |  | -//            updateQwSop.setRealSendTime(sdf.format(new Date()));
 | 
	
		
			
				|  |  | +            updateQwSop.setContentJson(JSON.toJSONString(setting));
 | 
	
		
			
				|  |  |              long end2 = System.currentTimeMillis();
 | 
	
		
			
				|  |  |              int i = qwSopLogsService.updateQwSopLogsSendType(updateQwSop);
 | 
	
		
			
				|  |  | -            log.info("销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", user.getQwUserName(), i, qwSopLogs.getId(),end2 - start2);
 | 
	
		
			
				|  |  | +            log.info("销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", user.getQwUserName(), i, qwSopLogs.getId(), end2 - start2);
 | 
	
		
			
				|  |  |              try {
 | 
	
		
			
				|  |  |                  int delay = ThreadLocalRandom.current().nextInt(delayStart, delayEnd);
 | 
	
		
			
				|  |  | -                log.debug("等待:{}ms", delay);
 | 
	
		
			
				|  |  | +                log.debug("企微发送消息等待:{}ms", delay);
 | 
	
		
			
				|  |  |                  Thread.sleep(delay);
 | 
	
		
			
				|  |  |              } catch (InterruptedException e) {
 | 
	
		
			
				|  |  |                  log.error("线程等待错误!");
 |