|
|
@@ -0,0 +1,269 @@
|
|
|
+package com.fs.app.task;
|
|
|
+
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
+import com.fs.app.service.WxIpadSendServer;
|
|
|
+import com.fs.common.core.redis.RedisCacheT;
|
|
|
+import com.fs.common.utils.PubFun;
|
|
|
+import com.fs.company.domain.CompanyWxAccount;
|
|
|
+import com.fs.company.mapper.CompanyWxAccountMapper;
|
|
|
+import com.fs.qw.vo.QwSopCourseFinishTempSetting;
|
|
|
+import com.fs.wx.sop.domain.WxSopLogs;
|
|
|
+import com.fs.wx.sop.mapper.WxSopLogsMapper;
|
|
|
+import com.fs.wx.sop.mapper.WxSopMapper;
|
|
|
+import com.fs.wxcid.domain.CidIpadServer;
|
|
|
+import com.fs.wxcid.mapper.CidIpadServerMapper;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class SendMsg {
|
|
|
+
|
|
|
+ private final CompanyWxAccountMapper companyWxAccountMapper;
|
|
|
+ private final CidIpadServerMapper ipadServerMapper;
|
|
|
+ private final WxSopLogsMapper wxSopLogsMapper;
|
|
|
+ private final WxSopMapper wxSopMapper;
|
|
|
+ private final WxIpadSendServer sendServer;
|
|
|
+ private final RedisCacheT<Long> redisCache;
|
|
|
+ @Value("${group-no}")
|
|
|
+ private String groupNo;
|
|
|
+ private final List<CompanyWxAccount> qwUserList = Collections.synchronizedList(new ArrayList<>());
|
|
|
+ private final Map<Long, TaskContext> qwMap = new ConcurrentHashMap<>();
|
|
|
+ private static final long TASK_TIMEOUT_MS = 3 * 60 * 1000L;
|
|
|
+ @Autowired
|
|
|
+ private Executor sopChatTaskExecutor;
|
|
|
+
|
|
|
+ public SendMsg(CompanyWxAccountMapper companyWxAccountMapper, CidIpadServerMapper ipadServerMapper, WxSopLogsMapper wxSopLogsMapper, WxSopMapper wxSopMapper, WxIpadSendServer sendServer, RedisCacheT<Long> redisCache) {
|
|
|
+ this.companyWxAccountMapper = companyWxAccountMapper;
|
|
|
+ this.ipadServerMapper = ipadServerMapper;
|
|
|
+ this.wxSopLogsMapper = wxSopLogsMapper;
|
|
|
+ this.wxSopMapper = wxSopMapper;
|
|
|
+ this.sendServer = sendServer;
|
|
|
+ this.redisCache = redisCache;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class TaskContext {
|
|
|
+ final long startTime;
|
|
|
+ final AtomicBoolean cancelled;
|
|
|
+
|
|
|
+ TaskContext() {
|
|
|
+ this.startTime = System.currentTimeMillis();
|
|
|
+ this.cancelled = new AtomicBoolean(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isTimeout() {
|
|
|
+ return System.currentTimeMillis() - startTime > TASK_TIMEOUT_MS;
|
|
|
+ }
|
|
|
+
|
|
|
+ void cancel() {
|
|
|
+ cancelled.set(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isCancelled() {
|
|
|
+ return cancelled.get();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @Qualifier("customThreadPool")
|
|
|
+ private ThreadPoolTaskExecutor customThreadPool;
|
|
|
+
|
|
|
+ private List<CompanyWxAccount> getUserList() {
|
|
|
+ if (qwUserList.isEmpty()) {
|
|
|
+ List<CidIpadServer> serverList = ipadServerMapper.selectList(new QueryWrapper<CidIpadServer>().eq("group_no", groupNo));
|
|
|
+ if (serverList.isEmpty()) {
|
|
|
+ log.info("没找到可用的服务器 {} ", serverList);
|
|
|
+ return new ArrayList<>();
|
|
|
+ }
|
|
|
+ List<Long> serverIds = PubFun.listToNewList(serverList, CidIpadServer::getId);
|
|
|
+ List<CompanyWxAccount> qwUsers = companyWxAccountMapper.selectList(new QueryWrapper<CompanyWxAccount>().eq("send_msg_type", 1).eq("server_status", 1).eq("ipad_status", 1).in("server_id", serverIds));
|
|
|
+ qwUserList.addAll(qwUsers);
|
|
|
+ }
|
|
|
+ log.info("getQwUserList {}", JSON.toJSONString(qwUserList));
|
|
|
+ return qwUserList;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Scheduled(fixedRate = 50000) // 每50秒执行一次
|
|
|
+ public void refulsQwUserList() {
|
|
|
+ qwUserList.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Scheduled(fixedDelay = 20000) // 每20秒执行一次
|
|
|
+ public void sendMsg2() {
|
|
|
+ log.info("执行日志:{}", LocalDateTime.now());
|
|
|
+ if (StringUtils.isEmpty(groupNo)) {
|
|
|
+ log.error("corpId为空不执行");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 看课配置文件获取
|
|
|
+ // 消息发送延迟
|
|
|
+ int delayStart = 1000;
|
|
|
+ int delayEnd = 2000;
|
|
|
+ // 获取 pad 发送的企微
|
|
|
+ getUserList().forEach(e -> {
|
|
|
+ TaskContext ctx = qwMap.get(e.getId());
|
|
|
+ if (ctx != null) {
|
|
|
+ if (ctx.isTimeout()) {
|
|
|
+ log.warn("任务超时,标记取消:{}, 已运行: {}ms", e.getWxNickName(), System.currentTimeMillis() - ctx.startTime);
|
|
|
+ ctx.cancel();
|
|
|
+ } else {
|
|
|
+ log.debug("任务正在执行中,跳过:{}", e.getWxNickName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (customThreadPool.getActiveCount() >= customThreadPool.getMaxPoolSize()) {
|
|
|
+ log.warn("线程池已满,跳过任务:{}, 活跃线程: {}/{}", e.getWxNickName(), customThreadPool.getActiveCount(), customThreadPool.getMaxPoolSize());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ TaskContext newCtx = new TaskContext();
|
|
|
+ qwMap.put(e.getId(), newCtx);
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ log.info("开始任务:{}", e.getWxNickName());
|
|
|
+ processUser(e, delayStart, delayEnd, newCtx);
|
|
|
+ } catch (Exception exception) {
|
|
|
+ log.error("发送错误:", exception);
|
|
|
+ } finally {
|
|
|
+ log.info("删除任务:{}", e.getWxNickName());
|
|
|
+ qwMap.remove(e.getId());
|
|
|
+ }
|
|
|
+ }, customThreadPool).exceptionally(ex -> {
|
|
|
+ log.error("任务提交失败:{}, 错误: {}", e.getWxNickName(), ex.getMessage());
|
|
|
+ qwMap.remove(e.getId());
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送任务执行
|
|
|
+ *
|
|
|
+ * @param qwUser 发送企微
|
|
|
+ * @param delayStart 随机延迟 最小值
|
|
|
+ * @param delayEnd 随机延迟 最大值
|
|
|
+ * @param miniMap 小程序配置
|
|
|
+ * @param ctx 任务上下文(用于取消检查)
|
|
|
+ */
|
|
|
+ private void processUser(CompanyWxAccount account, int delayStart, int delayEnd, TaskContext ctx) {
|
|
|
+ long start1 = System.currentTimeMillis();
|
|
|
+ // 获取当前企微待发送记录
|
|
|
+ List<WxSopLogs> qwSopLogList = wxSopLogsMapper.selectByWxId(account.getId());
|
|
|
+ if (qwSopLogList.isEmpty()) {
|
|
|
+ log.info("获取当前企微待发送记录为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 获取企微用户
|
|
|
+ long end1 = System.currentTimeMillis();
|
|
|
+ // 检查是否被取消
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ log.info("任务被取消,退出:{}", account.getWxNickName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 判断这个企微是否需要发送
|
|
|
+ if (!sendServer.isSend(account)) {
|
|
|
+ log.info("当前这个微信不需要发送 数据{}", account);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("销售:{}, 消息:{}, 耗时: {}, 时间:{}", account.getWxNickName(), qwSopLogList.size(), end1 - start1, ctx.startTime);
|
|
|
+ long start3 = System.currentTimeMillis();
|
|
|
+ // 循环代发送消息
|
|
|
+ for (WxSopLogs qwSopLogs : qwSopLogList) {
|
|
|
+ // 检查是否被取消
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ log.info("任务被取消,中断发送:{}, 已发送部分消息", account.getWxNickName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ long start2 = System.currentTimeMillis();
|
|
|
+ QwSopCourseFinishTempSetting setting = JSON.parseObject(qwSopLogs.getContentJson(), QwSopCourseFinishTempSetting.class);
|
|
|
+ log.info("进入发送消息状态:{}", qwSopLogs.getId());
|
|
|
+ String key = "qw:logs:pad:send:id:" + qwSopLogs.getId();
|
|
|
+ Long time = redisCache.getCacheObject(key);
|
|
|
+ // 判断这个消息有没有进入过发送,如果进了就不要再发了,防止重复发送,,,,, TODO 千万不能动!!!!!
|
|
|
+ if (redisCache.getCacheObject(key) != null) {
|
|
|
+ log.error("{}已有发送:{}, :{}", account.getWxNickName(), qwSopLogs.getId(), time);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ redisCache.setCacheObject(key, System.currentTimeMillis(), 24, TimeUnit.HOURS);
|
|
|
+ // 循环发送消息里面的每一条消息
|
|
|
+ for (QwSopCourseFinishTempSetting.Setting content : setting.getSetting()) {
|
|
|
+ // 检查是否被取消
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ log.info("任务被取消,中断发送:{}", account.getWxNickName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ long start4 = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 发送消息
|
|
|
+ sendServer.send(content, account, qwSopLogs);
|
|
|
+
|
|
|
+ long end4 = System.currentTimeMillis();
|
|
|
+ log.info("请求pad发送完成:{}, {}, 时长4:{}", account.getWxNickName(), qwSopLogs.getId(), end4 - start4);
|
|
|
+ try {
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int delay = ThreadLocalRandom.current().nextInt(300, 1000);
|
|
|
+ log.debug("pad发送消息等待:{}ms", delay);
|
|
|
+ Thread.sleep(delay);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("线程等待错误!");
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ qwSopLogs.setSend(true);
|
|
|
+ WxSopLogs updateQwSop = new WxSopLogs();
|
|
|
+ updateQwSop.setId(qwSopLogs.getId());
|
|
|
+ // 是否全部发送失败
|
|
|
+ if (setting.getSetting().stream().allMatch(e -> e.getSendStatus() == 2)) {
|
|
|
+ updateQwSop.setReceivingStatus(0);
|
|
|
+ updateQwSop.setSendStatus(0);
|
|
|
+ updateQwSop.setRemark("全部发送失败");
|
|
|
+ } else if (setting.getSetting().stream().anyMatch(e -> e.getSendStatus() == 2)) {
|
|
|
+ updateQwSop.setReceivingStatus(1);
|
|
|
+ updateQwSop.setSendStatus(1);
|
|
|
+ updateQwSop.setRemark("部分发送失败");
|
|
|
+ } else {
|
|
|
+ updateQwSop.setReceivingStatus(1);
|
|
|
+ updateQwSop.setSendStatus(1);
|
|
|
+ updateQwSop.setRemark("全部发送成功");
|
|
|
+ }
|
|
|
+ updateQwSop.setContentJson(JSON.toJSONString(setting));
|
|
|
+
|
|
|
+ long end2 = System.currentTimeMillis();
|
|
|
+ int i = wxSopLogsMapper.updateById(updateQwSop);
|
|
|
+ log.info("销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", account.getWxNickName(), i, qwSopLogs.getId(), end2 - start2);
|
|
|
+ try {
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int delay = ThreadLocalRandom.current().nextInt(delayStart, delayEnd);
|
|
|
+ log.debug("企微发送消息等待:{}ms", delay);
|
|
|
+ Thread.sleep(delay);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("线程等待错误!");
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ long end3 = System.currentTimeMillis();
|
|
|
+ log.info("销售执行完成:{}, 耗时:{}", account.getWxNickName(), end3 - start3);
|
|
|
+ }
|
|
|
+}
|