Pārlūkot izejas kodu

发送短信加入处理执行中被中断的数据逻辑处理,加入redis锁防止重复消费

yjwang 3 dienas atpakaļ
vecāks
revīzija
be5de6159d

+ 150 - 17
fs-ipad-task/src/main/java/com/fs/app/task/SendSmsMsg.java

@@ -1,14 +1,13 @@
 package com.fs.app.task;
 
-import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCache;
 import com.fs.common.service.ISmsService;
-import com.fs.common.utils.date.DateUtil;
-import com.fs.course.domain.FsCourseWatchLog;
+import com.fs.company.domain.CompanySmsLogs;
+import com.fs.company.mapper.CompanySmsLogsMapper;
 import com.fs.course.service.IFsCourseWatchLogService;
 import com.fs.his.domain.FsUser;
 import com.fs.his.dto.SendResultDetailDTO;
@@ -16,14 +15,11 @@ import com.fs.his.service.IFsUserService;
 import com.fs.his.utils.PhoneUtil;
 import com.fs.qw.domain.QwIpadServer;
 import com.fs.qw.domain.QwSopSmsLogs;
-import com.fs.qw.domain.QwUser;
 import com.fs.qw.mapper.QwIpadServerMapper;
 import com.fs.qw.service.IQwSopSmsLogsService;
-import com.fs.qw.vo.QwSopCourseFinishTempSetting;
 import com.fs.sop.domain.QwSopLogs;
 import com.fs.sop.mapper.QwSopLogsMapper;
 import com.fs.sop.service.IQwSopLogsService;
-import com.fs.sop.service.impl.QwSopLogsServiceImpl;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.RateLimiter;
@@ -33,7 +29,6 @@ import org.springframework.context.annotation.Lazy;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
-
 import javax.annotation.PreDestroy;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -66,16 +61,18 @@ public class SendSmsMsg {
 
     private final IQwSopLogsService qwSopLogsService;
 
+    private final CompanySmsLogsMapper companySmsLogsMapper;
+
     private final IFsCourseWatchLogService watchLogService;
 
     // 线程池配置
     private static final int CORE_POOL_SIZE = 50;
-    private static final int MAX_POOL_SIZE = 200;
-    private static final int QUEUE_CAPACITY = 1000;
+    private static final int MAX_POOL_SIZE = 150;
+    private static final int QUEUE_CAPACITY = 3000;
     private static final long KEEP_ALIVE_TIME = 60L;
 
     // 分页大小
-    private static final int PAGE_SIZE = 5000;
+    private static final int PAGE_SIZE = 3000;
 
     // 手机号缓存
     private final Cache<Long, String> phoneCache = CacheBuilder.newBuilder()
@@ -84,7 +81,7 @@ public class SendSmsMsg {
             .build();
 
     // 限流器:控制全局发送速率
-    private final RateLimiter rateLimiter = RateLimiter.create(1000); // 每秒1000条
+    private final RateLimiter rateLimiter = RateLimiter.create(500); // 每秒500条
 
     // 短信发送线程池
     private final ThreadPoolExecutor smsExecutor = new ThreadPoolExecutor(
@@ -99,7 +96,7 @@ public class SendSmsMsg {
     private final ExecutorService statusExecutor = Executors.newSingleThreadExecutor();
 
     // 批量失败状态更新阈值
-    private static final int BATCH_UPDATE_SIZE = 500;
+    private static final int BATCH_UPDATE_SIZE = 1000;
 
     public SendSmsMsg(QwIpadServerMapper qwIpadServerMapper,
                       IQwSopSmsLogsService qwSopSmsLogsService,
@@ -107,7 +104,8 @@ public class SendSmsMsg {
                       IFsUserService fsUserService, RedisCache redisCache,
                       QwSopLogsMapper qwSopLogsMapper,
                       IQwSopLogsService qwSopLogsService,
-                      IFsCourseWatchLogService watchLogService) {
+                      IFsCourseWatchLogService watchLogService,
+                      CompanySmsLogsMapper companySmsLogsMapper) {
         this.qwIpadServerMapper = qwIpadServerMapper;
         this.qwSopSmsLogsService = qwSopSmsLogsService;
         this.smsService = smsService;
@@ -116,10 +114,30 @@ public class SendSmsMsg {
         this.qwSopLogsMapper = qwSopLogsMapper;
         this.qwSopLogsService = qwSopLogsService;
         this.watchLogService = watchLogService;
+        this.companySmsLogsMapper = companySmsLogsMapper;
     }
 
     @Scheduled(cron = "0 0 * * * ?") // 每小时执行一次
     public synchronized void sendSms() {
+        // 添加错峰延迟
+        try {
+            if (!StringUtils.isEmpty(groupNo)) {
+                int groupNoInt = Integer.parseInt(groupNo.trim());
+                int delaySeconds = groupNoInt % 10;
+
+                log.info("【错峰执行】窗口 {} 将延迟 {} 秒后执行", groupNo, delaySeconds);
+
+                if (delaySeconds > 0) {
+                    Thread.sleep(delaySeconds * 1000L);
+                }
+            }
+        } catch (InterruptedException e) {
+            log.error("【错峰执行】窗口 {} 等待被中断", groupNo, e);
+            Thread.currentThread().interrupt();
+            return;
+        } catch (NumberFormatException e) {
+            log.warn("【错峰执行】groupNo 格式错误,不执行延迟:{}", groupNo, e);
+        }
         sendSms(null);
     }
 
@@ -258,8 +276,11 @@ public class SendSmsMsg {
             //数据走数据校验
             if(!sopLogsMap.containsKey(logRecord.getSopLogId())){
                 log.error("处理sopLogId {} 失败,sopLogsMap 中不存在执行记录", logRecord.getSopLogId());
+                failReasonsList.add(new SendResultDetailDTO(false, "sopLogsMap 中不存在执行记录", logRecord.getSopLogId()));
+                failedIds.add(logRecord.getId());
                 continue;
             }
+
             QwSopLogs qwSopLogs = sopLogsMap.get(logRecord.getSopLogId());
 
             // 判断消息状态是否满足发送条件
@@ -273,7 +294,13 @@ public class SendSmsMsg {
 
             String redisKey = groupNo + ":" + logRecord.getId();
             try {
-                redisCache.setCacheObject(redisKey, logRecord.getId(), 2, TimeUnit.HOURS);
+                Boolean locked = redisCache.setIfAbsent(redisKey, logRecord.getId(), 20, TimeUnit.MINUTES);
+
+                if (locked == null || !locked) {
+                    log.warn("记录 id={} 已有锁,跳过发送(可能正在发送或重复提交)", logRecord.getId());
+                    continue;
+                }
+
                 SendResultDetailDTO detail = sendSingleSms(logRecord, userPhoneMap, redisKey);
                 if (detail.isSuccess()) {
                     success++;
@@ -283,10 +310,10 @@ public class SendSmsMsg {
                     failedIds.add(logRecord.getId());
                 }
             } catch (Exception e) {
+                redisCache.deleteObject(redisKey); // 异常立即释放锁
                 log.error("发送异常 id={}", logRecord.getId(), e);
                 failReasonsList.add(new SendResultDetailDTO(false, e.getMessage(), logRecord.getSopLogId()));
                 failedIds.add(logRecord.getId());
-                redisCache.deleteObject(redisKey);
             }
 
             //批量阈值
@@ -362,7 +389,9 @@ public class SendSmsMsg {
                     logRecord.getSmsTemplateCode(),
                     logRecord.getSopLogId(),
                     logRecord.getSmsIndex(),
-                    redisKey
+                    redisKey,
+                    logRecord.getCompanyId(),
+                    logRecord.getCompanyUserId()
             );
 
             if (r != null && "200".equals(String.valueOf(r.get("code")))) {
@@ -482,8 +511,112 @@ public class SendSmsMsg {
     }
 
 
-    //处理特定场景,电脑被重启后,发送短信任务会丢失
+    /**
+     * 处理执行中被中断的数据(如服务重启导致的发送中状态残留)
+     * 每10分钟执行一次,加入Redis锁检查,避免重复发送
+     */
+    @Scheduled(cron = "0 0/10 * * * ?")
+    public synchronized void  processSendLogsTask() {
+        this.processSendLogs(null);
+    }
+
+    public void  processSendLogsTest(String num) {
+        this.processSendLogs(num);
+    }
+
+    public void processSendLogs(String num) {
+        if(!StringUtils.isEmpty(num)){
+            groupNo = num;
+        }
+        log.info("处理执行中被中断的数据---->: 开始执行,groupNo={}", groupNo);
+
+        if (StringUtils.isEmpty(groupNo)) {
+            log.warn("processSendLogs: groupNo 为空,跳过执行");
+            return;
+        }
+
+        long groupOn;
+        try {
+            groupOn = Long.parseLong(groupNo.trim());
+        } catch (NumberFormatException e) {
+            log.warn("processSendLogs: groupNo 无法转为数字, groupNo={}", groupNo);
+            return;
+        }
+
+        // 获取server_ids
+        List<Long> serverIds = getServerIds(groupOn);
+        if (serverIds.isEmpty()) {
+            log.info("processSendLogs: 分组 groupNo={} 无 server,跳过", groupNo);
+            return;
+        }
+
+        List<QwSopSmsLogs> batch = qwSopSmsLogsService.getQwSopSmsLogsStateList(serverIds);
+        if (batch.isEmpty()) {
+            log.info("processSendLogs: 无待处理记录,groupNo={}", groupNo);
+            return;
+        }
+
+        List<Long> uuIds = batch.stream()
+                .map(QwSopSmsLogs::getSopLogId)
+                .collect(Collectors.toList());
+        List<CompanySmsLogs> companySmsLogsList = companySmsLogsMapper.getCompanySmsLogsByUuIdList(uuIds);
+        Map<String, CompanySmsLogs> checkMap = companySmsLogsList.stream()
+                .collect(Collectors.toMap(
+                        c -> c.getSopSmsLogId() + "-" + c.getSmsIndex(),
+                        v -> v,
+                        (v1, v2) -> v1
+                ));
+
+        List<QwSopSmsLogs> needCheckList = batch.stream()
+                .filter(e -> !checkMap.containsKey(e.getSopLogId() + "-" + e.getSmsIndex()))
+                .collect(Collectors.toList());
+
+        if (needCheckList.isEmpty()) {
+            log.info("processSendLogs: 所有记录在企业日志中已存在,无需处理,groupNo={}", groupNo);
+            return;
+        }
 
+        List<Long> sopLogIds = needCheckList.stream()
+                .map(QwSopSmsLogs::getSopLogId)
+                .collect(Collectors.toList());
+        List<QwSopLogs> sopLogs = qwSopLogsMapper.getQwSopInfoByUid(sopLogIds);
+        Map<Long, QwSopLogs> sopLogsMap = sopLogs.stream()
+                .collect(Collectors.toMap(QwSopLogs::getSmsLogsId, s -> s, (v1, v2) -> v1));
+
+        AtomicLong totalProcessed = new AtomicLong(0);
+        AtomicLong totalFailed = new AtomicLong(0);
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        // 按server_id分组,提交异步任务
+        Map<Long, List<QwSopSmsLogs>> groupByServer = needCheckList.stream()
+                .collect(Collectors.groupingBy(QwSopSmsLogs::getServerId));
+
+        for (Map.Entry<Long, List<QwSopSmsLogs>> entry : groupByServer.entrySet()) {
+            List<QwSopSmsLogs> serverBatch = entry.getValue();
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                try {
+                    SendResult result = processServerBatch(serverBatch, sopLogsMap);
+                    totalProcessed.addAndGet(result.success);
+                    totalFailed.addAndGet(result.failed);
+                } catch (Exception e) {
+                    log.error("处理server {} 批次失败", entry.getKey(), e);
+                }
+            }, smsExecutor);
+            futures.add(future);
+        }
+
+        if (!futures.isEmpty()) {
+            try {
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+                        .get(1, TimeUnit.HOURS);
+            } catch (Exception e) {
+                log.error("processSendLogs: 等待任务完成时发生异常", e);
+            }
+        }
+
+        log.info("processSendLogs: 处理完成,groupNo={}, 成功={}, 失败={}",
+                groupNo, totalProcessed.get(), totalFailed.get());
+    }
 
 
     public SendResultDetailDTO isSendLogs(QwSopLogs qwSopLogs,QwSopSmsLogs logRecord) {

+ 1 - 1
fs-service/src/main/java/com/fs/common/service/ISmsService.java

@@ -24,6 +24,6 @@ public interface ISmsService
 
     R sendCaptcha(String phone, String captcha, String code);
 
-    R sendUrl(String phone, String content, String code,Long uuid,Integer smsIndex,String deleteKey);
+    R sendUrl(String phone, String content, String code,Long uuid,Integer smsIndex,String deleteKey,Long companyId,Long companyUserId);
 
 }

+ 3 - 1
fs-service/src/main/java/com/fs/common/service/impl/SmsServiceImpl.java

@@ -669,7 +669,7 @@ public class SmsServiceImpl implements ISmsService
 
 
     @Override
-    public R sendUrl(String phone, String content, String code,Long uuid,Integer smsIndex,String deleteKey) {
+    public R sendUrl(String phone, String content, String code,Long uuid,Integer smsIndex,String deleteKey,Long companyId,Long companyUserId) {
         log.info("发送短信:{},链接地址:{},短信模板:{}", phone, content, code);
         CompanySmsTemp temp = smsTempService.selectCompanySmsTempByCode(code);
         if (temp == null) {
@@ -702,6 +702,8 @@ public class SmsServiceImpl implements ISmsService
                         logs.setStatus(0);
                         logs.setType(sms.getType());
                         logs.setMid(itemVO.getMid());
+                        logs.setCompanyId(companyId);
+                        logs.setCompanyUserId(companyUserId);
                         if (uuid != null) {
                             logs.setSopSmsLogId(uuid);
                         }

+ 6 - 0
fs-service/src/main/java/com/fs/company/mapper/CompanySmsLogsMapper.java

@@ -194,4 +194,10 @@ public interface CompanySmsLogsMapper
     List<CompanySmsLogsStatisticsVO> selectSmsLogsStatisticsList(@Param("maps") CompanyStatisticsParam param);
     @Select("select * from company_sms_logs where phone=#{mobile} order by logs_id desc limit 1")
     CompanySmsLogs selectCompanySmsLogsByMobile(String mobile);
+
+    /**
+     * 获取相关短信记录
+     * @param uuIds
+     * **/
+    List<CompanySmsLogs> getCompanySmsLogsByUuIdList(@Param("uuIds") List<Long> uuIds);
 }

+ 8 - 0
fs-service/src/main/java/com/fs/qw/mapper/QwSopSmsLogsMapper.java

@@ -128,4 +128,12 @@ public interface QwSopSmsLogsMapper extends BaseMapper<QwSopSmsLogs>{
 
     @DataSource(DataSourceType.SOP)
     void batchInsertQwSopSmsLogsOneTouch(@Param("qwSopSmsLogs") List<QwSopSmsLogs> logsToInsert);
+
+    /**
+     * 获取状态数据
+     * @param serverIds 服务器ID
+     * @return lsit 服务数据
+     * **/
+    @DataSource(DataSourceType.SOP)
+    List<QwSopSmsLogs> getQwSopSmsLogsStateList(@Param("serverIds") List<Long> serverIds);
 }

+ 7 - 0
fs-service/src/main/java/com/fs/qw/service/IQwSopSmsLogsService.java

@@ -105,4 +105,11 @@ public interface IQwSopSmsLogsService extends IService<QwSopSmsLogs>{
 
 
     void batchInsertQwSopSmsLogsOneTouch(List<QwSopSmsLogs> logsToInsert);
+
+    /**
+     * 获取状态数据
+     * @param serverIds 服务器ID
+     * @return lsit 服务数据
+     * **/
+    List<QwSopSmsLogs> getQwSopSmsLogsStateList(List<Long> serverIds);
 }

+ 5 - 0
fs-service/src/main/java/com/fs/qw/service/impl/QwSopSmsLogsServiceImpl.java

@@ -140,4 +140,9 @@ public class QwSopSmsLogsServiceImpl extends ServiceImpl<QwSopSmsLogsMapper, QwS
     public void batchInsertQwSopSmsLogsOneTouch(List<QwSopSmsLogs> logsToInsert) {
          baseMapper.batchInsertQwSopSmsLogsOneTouch(logsToInsert);
     }
+
+    @Override
+    public List<QwSopSmsLogs> getQwSopSmsLogsStateList(List<Long> serverIds) {
+        return baseMapper.getQwSopSmsLogsStateList(serverIds);
+    }
 }

+ 13 - 0
fs-service/src/main/resources/mapper/company/CompanySmsLogsMapper.xml

@@ -129,4 +129,17 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         ) t
         GROUP BY t.type
     </select>
+
+    <select id="getCompanySmsLogsByUuIdList" resultType="com.fs.company.domain.CompanySmsLogs">
+        SELECT
+        logs_id,
+        sop_sms_log_id,
+        sms_index
+        FROM
+        company_sms_logs
+        WHERE sop_sms_log_id IN
+        <foreach collection="uuIds" item="item" index="index" open="(" separator="," close=")">
+            #{item}
+        </foreach>
+    </select>
 </mapper>