| 
					
				 | 
			
			
				@@ -1,140 +0,0 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-package com.fs.app.task; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.alibaba.fastjson.JSON; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.common.utils.PubFun; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.common.utils.date.DateUtil; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.course.config.CourseConfig; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.qw.domain.QwUser; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.qw.mapper.QwUserMapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.qw.vo.QwSopLogMqVo; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.sop.domain.QwSopLogs; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.sop.mapper.QwSopLogsMapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.sop.service.IQwSopLogsService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.system.domain.SysConfig; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import com.fs.system.mapper.SysConfigMapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import lombok.AllArgsConstructor; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import lombok.extern.slf4j.Slf4j; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.apache.rocketmq.spring.core.RocketMQTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.springframework.retry.annotation.Backoff; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.springframework.retry.annotation.Retryable; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.springframework.scheduling.annotation.Async; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.springframework.scheduling.annotation.Scheduled; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import org.springframework.stereotype.Component; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.time.LocalDateTime; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.time.temporal.ChronoUnit; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.ArrayList; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.Comparator; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.List; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.Map; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.concurrent.CountDownLatch; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.concurrent.atomic.AtomicLong; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-import java.util.stream.Collectors; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-@Component 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-@Slf4j 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-@AllArgsConstructor 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-public class QwMsgTask { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private final QwUserMapper qwUserMapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private final QwSopLogsMapper qwSopLogsMapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private final IQwSopLogsService qwSopLogsService; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private final SysConfigMapper sysConfigMapper; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private final RocketMQTemplate rocketMQTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//    @Scheduled(fixedDelay = 1000) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public void sendMsg() throws InterruptedException { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        long startTimeFun = System.currentTimeMillis(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        log.info("============= 开始处理ipad发送逻辑 ============="); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        LocalDateTime now = LocalDateTime.now(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        LocalDateTime dateTime = LocalDateTime.now().plusMinutes(15); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        SysConfig courseConfig = sysConfigMapper.selectConfigByConfigKey("course.config"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        CourseConfig config = JSON.parseObject(courseConfig.getConfigValue(), CourseConfig.class); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        // 消息发送延迟 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        int delay; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if(config.getDelayStart() == null){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            log.info("消息发送延迟为空手动设置1000ms"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            delay = 1000; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        }else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            delay = config.getDelayStart(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        log.info("消息发送延迟:{}ms", delay); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        List<QwUser> qwUsers = qwUserMapper.selectList(new QueryWrapper<QwUser>().eq("send_msg_type", 1).eq("server_status", 1).eq("ipad_status", 1).isNotNull("server_id")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<String, QwUser> qwUserMap = PubFun.listToMapByGroupObject(qwUsers, e -> e.getQwUserId() + e.getCorpId()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        log.info("获取到需要发送ipad企微数量:{}", qwUsers.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        List<List<QwUser>> groupList = new ArrayList<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        PubFun.batchProcessing(10, qwUsers, groupList::add); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        CountDownLatch sopGroupLatch = new CountDownLatch(groupList.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        int sum = groupList.parallelStream().mapToInt(e -> processSopGroupFun(e, qwUserMap, delay, now, dateTime, sopGroupLatch)).sum(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        // 等待所有 SOP 分组处理完成 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        sopGroupLatch.await(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        long endTime = System.currentTimeMillis(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        log.info("============= 处理结束ipad发送逻辑:{}ms   报错记录:{}条=============", endTime - startTimeFun, sum); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    @Async("sopIpadTaskExecutor") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    @Retryable( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            value = {Exception.class}, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            maxAttempts = 3, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            backoff = @Backoff(delay = 2000) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public int processSopGroupFun(List<QwUser> qwUserList, Map<String, QwUser> qwUserMap, int delay, LocalDateTime now, LocalDateTime dateTime, CountDownLatch latch) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            return processSopGroupAsync(qwUserList, qwUserMap, delay, dateTime, now); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } catch (Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            log.error("处理IPAD代发送错误", e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } finally { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            latch.countDown(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        return 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public int processSopGroupAsync(List<QwUser> qwUserList, Map<String, QwUser> qwUserMap, int delay, LocalDateTime now, LocalDateTime dateTime) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//        log.info("开始分批处理企微:{}", qwUserList.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        List<Long> qwUserIds = PubFun.listToNewList(qwUserList, QwUser::getId); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        List<QwSopLogs> qwSopLogs = qwSopLogsMapper.selectByQwUserIdIn(qwUserIds, DateUtil.formatLocalDateTime(dateTime)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//        log.info("获取到企微:{}代发送记录条数:{}", PubFun.listToNewList(qwUserList, QwUser::getQwUserName), qwSopLogs.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<String, List<QwSopLogs>> qwSopLogMap = PubFun.listToMapByGroupList(qwSopLogs, QwSopLogs::getQwUserid); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        int num = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        qwSopLogMap.forEach((k, v) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            AtomicLong i = new AtomicLong(1); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            AtomicLong lastTime = new AtomicLong(0); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            v.stream().filter(e -> qwUserMap.containsKey(e.getQwUserid() + e.getCorpId())).sorted(Comparator.comparing(e -> DateUtil.toInstant(DateUtil.stringToLocalDateTime(e.getSendTime())) + e.getSort())).forEach(e -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                long darkTime = delay * i.getAndIncrement(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                LocalDateTime startTime = DateUtil.stringToLocalDateTime(e.getSendTime()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                long between = ChronoUnit.MILLIS.between(now, startTime); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                if(between < 0) between = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                if(lastTime.get() + darkTime + delay < between){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                    darkTime = between; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                }else{ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    darkTime += between; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                e.setSend(true); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                rocketMQTemplate.syncSendDelayTimeMills("ipad-send-new", JSON.toJSONString(QwSopLogMqVo.builder().id(e.getId()).build()), darkTime); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                lastTime.set(darkTime); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                e.setMs(darkTime); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        List<String> saveBath = qwSopLogs.stream().filter(QwSopLogs::isSend).map(QwSopLogs::getId).collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if(!saveBath.isEmpty()){ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            log.info("企微:{}保存发送记录条数:{}", PubFun.listToNewList(qwUserList, QwUser::getQwUserName), saveBath.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-//                qwSopLogsMapper.batchUpdateQwSopLogsIpadSendStatus(saveBath); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            qwSopLogsService.updateBatch(qwSopLogs.stream().map(e -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                QwSopLogs l = new QwSopLogs(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                l.setId(e.getId()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                l.setSendStatus(6L); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                l.setMs(e.getMs()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                return l; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            }).collect(Collectors.toList())); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            return qwSopLogs.size(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        return 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-} 
			 |