Browse Source

更新pad 发送优化代码

吴树波 1 day ago
parent
commit
fb63baf0b8

+ 60 - 48
fs-ipad-task/src/main/java/com/fs/app/task/SendMsg.java

@@ -8,26 +8,32 @@ import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.utils.PubFun;
 import com.fs.course.config.CourseConfig;
 import com.fs.course.config.CourseMaConfig;
+import com.fs.ipad.vo.BaseVo;
 import com.fs.qw.domain.QwIpadServer;
 import com.fs.qw.domain.QwUser;
 import com.fs.qw.mapper.QwIpadServerMapper;
 import com.fs.qw.mapper.QwUserMapper;
 import com.fs.qw.service.impl.AsyncSopTestService;
 import com.fs.qw.vo.QwSopCourseFinishTempSetting;
+import com.fs.qw.vo.QwSopTempSetting;
 import com.fs.sop.domain.QwSopLogs;
 import com.fs.sop.mapper.QwSopLogsMapper;
 import com.fs.sop.service.IQwSopLogsService;
 import com.fs.system.domain.SysConfig;
 import com.fs.system.mapper.SysConfigMapper;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
 
 import java.text.SimpleDateFormat;
 import java.time.LocalDateTime;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -43,28 +49,31 @@ public class SendMsg {
     private final SysConfigMapper sysConfigMapper;
     private final IQwSopLogsService qwSopLogsService;
     private final AsyncSopTestService asyncSopTestService;
-    private final RedisCacheT<Long> redisCache;
     private final QwIpadServerMapper qwIpadServerMapper;
+    private final RedisCacheT<Long> redisCache;
 
     @Value("${group-no}")
     private String groupNo;
     private final List<QwUser> qwUserList = Collections.synchronizedList(new ArrayList<>());
     private final Map<Long, Long> qwMap = new ConcurrentHashMap<>();
+    private final Map<Long, Long> removeQwMap = new ConcurrentHashMap<>();
 
+    @Autowired
+    @Qualifier("customThreadPool")
+    private ThreadPoolTaskExecutor customThreadPool;
 
-    public SendMsg(QwUserMapper qwUserMapper, QwSopLogsMapper qwSopLogsMapper, IpadSendServer sendServer, SysConfigMapper sysConfigMapper, IQwSopLogsService qwSopLogsService, AsyncSopTestService asyncSopTestService, RedisCacheT<Long> redisCache, QwIpadServerMapper qwIpadServerMapper) {
+    public SendMsg(QwUserMapper qwUserMapper, QwSopLogsMapper qwSopLogsMapper, IpadSendServer sendServer, SysConfigMapper sysConfigMapper, IQwSopLogsService qwSopLogsService, AsyncSopTestService asyncSopTestService, QwIpadServerMapper qwIpadServerMapper, RedisCacheT<Long> redisCache) {
         this.qwUserMapper = qwUserMapper;
         this.qwSopLogsMapper = qwSopLogsMapper;
         this.sendServer = sendServer;
         this.sysConfigMapper = sysConfigMapper;
         this.qwSopLogsService = qwSopLogsService;
         this.asyncSopTestService = asyncSopTestService;
-        this.redisCache = redisCache;
         this.qwIpadServerMapper = qwIpadServerMapper;
+        this.redisCache = redisCache;
     }
     private List<QwUser> getQwUserList() {
         if (qwUserList.isEmpty()) {
-//            List<QwUser> qwUsers = qwUserMapper.selectList(new QueryWrapper<QwUser>().eq("send_msg_type", 1).eq("server_status", 1).eq("ipad_status", 1).isNotNull("server_id").eq("corp_id", corpId));
             List<QwIpadServer> serverList = qwIpadServerMapper.selectList(new QueryWrapper<QwIpadServer>().eq("group_no", groupNo));
             if(serverList.isEmpty()){
                 return new ArrayList<>();
@@ -83,12 +92,11 @@ public class SendMsg {
     }
 
 
-    @Scheduled(fixedRate = 50000) // 每10秒执行一次
+    @Scheduled(fixedRate = 50000) // 每50秒执行一次
     public void refulsQwUserList() {
         qwUserList.clear();
     }
-
-    @Scheduled(fixedRate = 20000) // 每10秒执行一次
+    @Scheduled(fixedDelay = 20000) // 每20秒执行一次
     public void sendMsg2() {
         log.info("执行日志:{}", LocalDateTime.now());
         if (StringUtils.isEmpty(groupNo)) {
@@ -109,45 +117,50 @@ public class SendMsg {
             delayEnd = config.getDelayEnd();
         }
         Map<String, CourseMaConfig> miniMap = getMiniMap();
-
-        List<QwUser> qwUsers = getQwUserList().stream().parallel().filter(e -> !qwMap.containsKey(e.getId())).collect(Collectors.toList());
-        qwUsers.parallelStream().forEach(e -> {
-            if (qwMap.putIfAbsent(e.getId(), System.currentTimeMillis()) == null) {
-                // 判断企微是否发送消息,是否为ipad发送并且判断是否真实在线
-                new Thread(() -> {
-                    try {
-                        processUser(e, delayStart, delayEnd, miniMap);
-                    } catch (Exception exception){
-                        log.error("发送错误:", exception);
-                    }
-                    log.info("销售删除线程:{}", e.getQwUserName());
-                    qwMap.remove(e.getId());
-                }).start();
+        // 清空需要删除的
+        List<Long> keyList = new ArrayList<>(removeQwMap.keySet());
+        keyList.forEach(key -> {
+            removeQwMap.remove(key);
+            qwMap.remove(key);
+        });
+        log.info("删除id:{}", JSON.toJSONString(keyList));
+        getQwUserList().forEach(e -> {
+            synchronized (e.getId()){
+                if (qwMap.containsKey(e.getId())) {
+                    log.error("用户:{}已在处理中,跳过重复执行", e.getQwUserName());
+                    return;
+                }
+                qwMap.computeIfAbsent(e.getId(), k -> {
+                    CompletableFuture.runAsync(() -> {
+                        try {
+                            processUser(e, delayStart, delayEnd, miniMap);
+                        } catch (Exception exception){
+                            log.error("发送错误:", exception);
+                        }finally {
+                            removeQwMap.putIfAbsent(e.getId(), System.currentTimeMillis());
+                        }
+                    }, customThreadPool);
+                    return System.currentTimeMillis(); // 占位值
+                });
             }
         });
     }
 
     private void processUser(QwUser qwUser, int delayStart, int delayEnd, Map<String, CourseMaConfig> miniMap) {
-        if (!qwMap.containsKey(qwUser.getId())) {
-            log.warn("用户:{}已在处理中,跳过重复执行", qwUser.getQwUserName());
-            return;
-        }
-        qwMap.put(qwUser.getId(), System.currentTimeMillis());
         long start1 = System.currentTimeMillis();
         List<QwSopLogs> qwSopLogList = qwSopLogsMapper.selectByQwUserId(qwUser.getId());
         if(qwSopLogList.isEmpty()){
             return;
         }
         QwUser user = qwUserMapper.selectById(qwUser.getId());
+        BaseVo parentVo = new BaseVo();
         long end1 = System.currentTimeMillis();
         if (!sendServer.isSend(user)) {
-            qwMap.remove(user.getId());
             return;
         }
         log.info("销售:{}, 消息:{}, 耗时: {}, 时间:{}", user.getQwUserName(), qwSopLogList.size(), end1 - start1, qwMap.get(qwUser.getId()));
         long start3 = System.currentTimeMillis();
         for (QwSopLogs qwSopLogs : qwSopLogList) {
-
             long start2 = System.currentTimeMillis();
             QwSopCourseFinishTempSetting setting = JSON.parseObject(qwSopLogs.getContentJson(), QwSopCourseFinishTempSetting.class);
             // 判断消息状态是否满足发送条件
@@ -155,16 +168,24 @@ public class SendMsg {
                 log.info("销售:{}, 消息发送条件未满足:{}", user.getQwUserName(), qwSopLogs.getId());
                 continue;
             }
+            log.info("进入发送消息状态:{}", qwSopLogs.getId());
+            String key = "qw:logs:pad:send:id:" + qwSopLogs.getId();
+            Long time = redisCache.getCacheObject(key);
+            if(redisCache.getCacheObject(key) != null){
+                log.error("{}已有发送:{}, :{}", qwUser.getQwUserName(), qwSopLogs.getId(), time);
+                return;
+            }
+            redisCache.setCacheObject(key, System.currentTimeMillis(), 10, TimeUnit.MINUTES);
             for (QwSopCourseFinishTempSetting.Setting content : setting.getSetting()) {
                 sendServer.send(content, user, qwSopLogs, miniMap);
-                if(content.getSendStatus() == 2 && "请求失败:消息发送过于频繁,请稍后再试".equals(content.getSendRemarks())){
-                    QwUser update = new QwUser();
-                    update.setRemark("请求频率异常,暂停发送,三小时后恢复继续发送");
-                    update.setUpdateTime(new Date());
-                    qwUserMapper.update(update, new QueryWrapper<QwUser>().eq("id", user.getId()));
-                    redisCache.setCacheObject("qw:user:id:" + user.getId(), user.getId(), 3, TimeUnit.HOURS);
-                    return;
-                }
+//                if(content.getSendStatus() == 2 && "请求失败:消息发送过于频繁,请稍后再试".equals(content.getSendRemarks())){
+//                    QwUser update = new QwUser();
+//                    update.setRemark("请求频率异常,暂停发送,三小时后恢复继续发送");
+//                    update.setUpdateTime(new Date());
+//                    qwUserMapper.update(update, new QueryWrapper<QwUser>().eq("id", user.getId()));
+//                    redisCache.setCacheObject("qw:user:id:" + user.getId(), user.getId(), 3, TimeUnit.HOURS);
+//                    return;
+//                }
                 try {
                     int delay = ThreadLocalRandom.current().nextInt(300, 1000);
                     log.debug("等待:{}ms", delay);
@@ -173,16 +194,6 @@ public class SendMsg {
                     log.error("线程等待错误!");
                 }
             }
-//            if (!setting.getSetting().isEmpty()) {
-//                new Thread(() -> {
-//                    try {
-//                        List<QwSopTempSetting.Content.Setting> settings = JSON.parseArray(JSON.toJSONString(setting.getSetting()), QwSopTempSetting.Content.Setting.class).stream().filter(e -> "9".equals(e.getContentType())).collect(Collectors.toList());
-//                        asyncSopTestService.asyncSendMsgBySopAppLinkNormalIM(settings, qwSopLogs.getCorpId(), user.getCompanyUserId(), qwSopLogs.getFsUserId());
-//                    } catch (Exception e) {
-//                        log.error("推送APP失败", e);
-//                    }
-//                }).start();
-//            }
             qwSopLogs.setSend(true);
             SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
             QwSopLogs updateQwSop = new QwSopLogs();
@@ -203,8 +214,9 @@ public class SendMsg {
                 updateQwSop.setRemark("全部发送成功");
                 updateQwSop.setRealSendTime(sdf.format(new Date()));
             }
-            updateQwSop.setContentJson(JSON.toJSONString(setting));
-
+//            updateQwSop.setReceivingStatus(1L);
+//            updateQwSop.setSendStatus(1L);
+//            updateQwSop.setRealSendTime(sdf.format(new Date()));
             long end2 = System.currentTimeMillis();
             int i = qwSopLogsService.updateQwSopLogsSendType(updateQwSop);
             log.info("销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", user.getQwUserName(), i, qwSopLogs.getId(),end2 - start2);

+ 4 - 0
fs-qw-task/src/main/java/com/fs/app/taskService/impl/SopLogsTaskServiceImpl.java

@@ -811,9 +811,13 @@ public class SopLogsTaskServiceImpl implements SopLogsTaskService {
         sopLogs.setReceivingStatus(0L);
 
         String[] userKey = logVo.getUserId().split("\\|");
+        log.info("sopLogVo:{}", JSON.toJSONString(logVo));
+        log.info("sop_logs -》 userId:{}", logVo.getUserId());
+        log.info("sop_logs -》 userId -》 split:{}", Arrays.asList(userKey));
         sopLogs.setCompanyId(Long.valueOf(userKey[2].trim()));
         if (StringUtils.isNotEmpty(userKey[0].trim())){
             sopLogs.setQwUserKey(Long.valueOf(userKey[0].trim()));
+            log.info("qw_sop数据:{}", JSON.toJSONString(sopLogs));
         }
         sopLogs.setSopId(logVo.getSopId());
         sopLogs.setSort(Integer.valueOf(logVo.getStartTime().replaceAll("-","")));