|
@@ -1,140 +0,0 @@
|
|
|
-package com.fs.app.task;
|
|
|
-
|
|
|
-
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
-import com.fs.common.utils.PubFun;
|
|
|
-import com.fs.common.utils.date.DateUtil;
|
|
|
-import com.fs.course.config.CourseConfig;
|
|
|
-import com.fs.qw.domain.QwUser;
|
|
|
-import com.fs.qw.mapper.QwUserMapper;
|
|
|
-import com.fs.qw.vo.QwSopLogMqVo;
|
|
|
-import com.fs.sop.domain.QwSopLogs;
|
|
|
-import com.fs.sop.mapper.QwSopLogsMapper;
|
|
|
-import com.fs.sop.service.IQwSopLogsService;
|
|
|
-import com.fs.system.domain.SysConfig;
|
|
|
-import com.fs.system.mapper.SysConfigMapper;
|
|
|
-import lombok.AllArgsConstructor;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
|
-import org.springframework.retry.annotation.Backoff;
|
|
|
-import org.springframework.retry.annotation.Retryable;
|
|
|
-import org.springframework.scheduling.annotation.Async;
|
|
|
-import org.springframework.scheduling.annotation.Scheduled;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import java.time.LocalDateTime;
|
|
|
-import java.time.temporal.ChronoUnit;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Comparator;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
-import java.util.concurrent.atomic.AtomicLong;
|
|
|
-import java.util.stream.Collectors;
|
|
|
-
|
|
|
-@Component
|
|
|
-@Slf4j
|
|
|
-@AllArgsConstructor
|
|
|
-public class QwMsgTask {
|
|
|
-
|
|
|
- private final QwUserMapper qwUserMapper;
|
|
|
- private final QwSopLogsMapper qwSopLogsMapper;
|
|
|
- private final IQwSopLogsService qwSopLogsService;
|
|
|
- private final SysConfigMapper sysConfigMapper;
|
|
|
- private final RocketMQTemplate rocketMQTemplate;
|
|
|
-
|
|
|
-
|
|
|
-// @Scheduled(fixedDelay = 1000)
|
|
|
- public void sendMsg() throws InterruptedException {
|
|
|
- long startTimeFun = System.currentTimeMillis();
|
|
|
- log.info("============= 开始处理ipad发送逻辑 =============");
|
|
|
- LocalDateTime now = LocalDateTime.now();
|
|
|
- LocalDateTime dateTime = LocalDateTime.now().plusMinutes(15);
|
|
|
-
|
|
|
- SysConfig courseConfig = sysConfigMapper.selectConfigByConfigKey("course.config");
|
|
|
- CourseConfig config = JSON.parseObject(courseConfig.getConfigValue(), CourseConfig.class);
|
|
|
- // 消息发送延迟
|
|
|
- int delay;
|
|
|
- if(config.getDelayStart() == null){
|
|
|
- log.info("消息发送延迟为空手动设置1000ms");
|
|
|
- delay = 1000;
|
|
|
- }else {
|
|
|
- delay = config.getDelayStart();
|
|
|
- }
|
|
|
- log.info("消息发送延迟:{}ms", delay);
|
|
|
- List<QwUser> qwUsers = qwUserMapper.selectList(new QueryWrapper<QwUser>().eq("send_msg_type", 1).eq("server_status", 1).eq("ipad_status", 1).isNotNull("server_id"));
|
|
|
- Map<String, QwUser> qwUserMap = PubFun.listToMapByGroupObject(qwUsers, e -> e.getQwUserId() + e.getCorpId());
|
|
|
- log.info("获取到需要发送ipad企微数量:{}", qwUsers.size());
|
|
|
- List<List<QwUser>> groupList = new ArrayList<>();
|
|
|
- PubFun.batchProcessing(10, qwUsers, groupList::add);
|
|
|
-
|
|
|
- CountDownLatch sopGroupLatch = new CountDownLatch(groupList.size());
|
|
|
- int sum = groupList.parallelStream().mapToInt(e -> processSopGroupFun(e, qwUserMap, delay, now, dateTime, sopGroupLatch)).sum();
|
|
|
- // 等待所有 SOP 分组处理完成
|
|
|
- sopGroupLatch.await();
|
|
|
- long endTime = System.currentTimeMillis();
|
|
|
- log.info("============= 处理结束ipad发送逻辑:{}ms 报错记录:{}条=============", endTime - startTimeFun, sum);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @Async("sopIpadTaskExecutor")
|
|
|
- @Retryable(
|
|
|
- value = {Exception.class},
|
|
|
- maxAttempts = 3,
|
|
|
- backoff = @Backoff(delay = 2000)
|
|
|
- )
|
|
|
- public int processSopGroupFun(List<QwUser> qwUserList, Map<String, QwUser> qwUserMap, int delay, LocalDateTime now, LocalDateTime dateTime, CountDownLatch latch) {
|
|
|
- try {
|
|
|
- return processSopGroupAsync(qwUserList, qwUserMap, delay, dateTime, now);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("处理IPAD代发送错误", e);
|
|
|
- } finally {
|
|
|
- latch.countDown();
|
|
|
- }
|
|
|
- return 0;
|
|
|
- }
|
|
|
- public int processSopGroupAsync(List<QwUser> qwUserList, Map<String, QwUser> qwUserMap, int delay, LocalDateTime now, LocalDateTime dateTime) {
|
|
|
-// log.info("开始分批处理企微:{}", qwUserList.size());
|
|
|
- List<Long> qwUserIds = PubFun.listToNewList(qwUserList, QwUser::getId);
|
|
|
- List<QwSopLogs> qwSopLogs = qwSopLogsMapper.selectByQwUserIdIn(qwUserIds, DateUtil.formatLocalDateTime(dateTime));
|
|
|
-// log.info("获取到企微:{}代发送记录条数:{}", PubFun.listToNewList(qwUserList, QwUser::getQwUserName), qwSopLogs.size());
|
|
|
- Map<String, List<QwSopLogs>> qwSopLogMap = PubFun.listToMapByGroupList(qwSopLogs, QwSopLogs::getQwUserid);
|
|
|
- int num = 0;
|
|
|
- qwSopLogMap.forEach((k, v) -> {
|
|
|
- AtomicLong i = new AtomicLong(1);
|
|
|
- AtomicLong lastTime = new AtomicLong(0);
|
|
|
- v.stream().filter(e -> qwUserMap.containsKey(e.getQwUserid() + e.getCorpId())).sorted(Comparator.comparing(e -> DateUtil.toInstant(DateUtil.stringToLocalDateTime(e.getSendTime())) + e.getSort())).forEach(e -> {
|
|
|
- long darkTime = delay * i.getAndIncrement();
|
|
|
- LocalDateTime startTime = DateUtil.stringToLocalDateTime(e.getSendTime());
|
|
|
- long between = ChronoUnit.MILLIS.between(now, startTime);
|
|
|
- if(between < 0) between = 0;
|
|
|
-// if(lastTime.get() + darkTime + delay < between){
|
|
|
-// darkTime = between;
|
|
|
-// }else{
|
|
|
- darkTime += between;
|
|
|
-// }
|
|
|
- e.setSend(true);
|
|
|
-
|
|
|
- rocketMQTemplate.syncSendDelayTimeMills("ipad-send-new", JSON.toJSONString(QwSopLogMqVo.builder().id(e.getId()).build()), darkTime);
|
|
|
- lastTime.set(darkTime);
|
|
|
- e.setMs(darkTime);
|
|
|
- });
|
|
|
- });
|
|
|
- List<String> saveBath = qwSopLogs.stream().filter(QwSopLogs::isSend).map(QwSopLogs::getId).collect(Collectors.toList());
|
|
|
- if(!saveBath.isEmpty()){
|
|
|
- log.info("企微:{}保存发送记录条数:{}", PubFun.listToNewList(qwUserList, QwUser::getQwUserName), saveBath.size());
|
|
|
-// qwSopLogsMapper.batchUpdateQwSopLogsIpadSendStatus(saveBath);
|
|
|
-
|
|
|
- qwSopLogsService.updateBatch(qwSopLogs.stream().map(e -> {
|
|
|
- QwSopLogs l = new QwSopLogs();
|
|
|
- l.setId(e.getId());
|
|
|
- l.setSendStatus(6L);
|
|
|
- l.setMs(e.getMs());
|
|
|
- return l;
|
|
|
- }).collect(Collectors.toList()));
|
|
|
- return qwSopLogs.size();
|
|
|
- }
|
|
|
- return 0;
|
|
|
- }
|
|
|
-}
|