Selaa lähdekoodia

app一键发送定时优化

wangxy 1 viikko sitten
vanhempi
commit
7bbcd877c8

+ 34 - 3
fs-admin/src/main/java/com/fs/his/task/Task.java

@@ -1654,13 +1654,44 @@ public class Task {
             return;
         }
         for (Map.Entry<String, BatchSendCourseAllDTO> entry : toSendMap) {
+            String key=entry.getKey();
             //执行发送消息任务
             BatchSendCourseAllDTO batchSendCourseAllDTO = entry.getValue();
-            openIMService.batchSendCourseTask(batchSendCourseAllDTO.getBatchSendCourseDTO(), batchSendCourseAllDTO.getOpenImBatchMsgDTO(), batchSendCourseAllDTO.getProject(), batchSendCourseAllDTO.getImMsgSendDetailList());
+            if (batchSendCourseAllDTO == null) {
+                logger.error("batchSendCourseAllDTO 为 null,key: {}", key);
+                redisTemplate.opsForHash().delete(redisKey, key);
+                continue; // 跳过当前循环
+            }
+            OpenImBatchMsgDTO openImBatchMsgDTO = batchSendCourseAllDTO.getOpenImBatchMsgDTO();
+            Integer nowCount=openImBatchMsgDTO.getCount();
+            if (nowCount == null) {
+                nowCount = 0;
+            }
+            OpenImResponseDTO responseDTO=new OpenImResponseDTO();
+            try {
+                 responseDTO=  openIMService.batchSendCourseTask(batchSendCourseAllDTO.getBatchSendCourseDTO(), batchSendCourseAllDTO.getOpenImBatchMsgDTO(), batchSendCourseAllDTO.getProject(), batchSendCourseAllDTO.getImMsgSendDetailList());
+            } catch (Exception e) {
+                responseDTO.setErrCode(500);
+                responseDTO.setErrMsg(e.getMessage());
+                e.printStackTrace();
+            }
 
-            // 执行结束,删除
-            this.redisTemplate.<String, BatchSendCourseAllDTO>opsForHash().delete(redisKey, entry.getKey());
 
+            if(nowCount>1){ // 重试一次后放弃
+                logger.error("im会员定时发课重试三次后放弃,key{}", key);
+                redisTemplate.opsForHash().delete(redisKey, key);
+                continue;
+            }
+
+            if(responseDTO!=null && responseDTO.getErrCode() == 0){
+                // 执行结束,删除
+                redisTemplate.opsForHash().delete(redisKey, key);
+            }else {
+                openImBatchMsgDTO.setCount(nowCount + 1);// 次数加一
+                batchSendCourseAllDTO.setOpenImBatchMsgDTO(openImBatchMsgDTO);
+                // 错误更新次数 重新放入redis中
+                redisTemplate.opsForHash().put(redisKey, key, batchSendCourseAllDTO);
+            }
         }
 
     }

+ 6 - 1
fs-company-app/src/main/java/com/fs/app/controller/FsUserCourseVideoController.java

@@ -315,7 +315,12 @@ public class FsUserCourseVideoController extends AppBaseController {
         if(batchSendCourseDTO.getIsUrgeCourse()==null){
             batchSendCourseDTO.setIsUrgeCourse(false);
         }
-        return openIMService.batchSendCourse(batchSendCourseDTO);
+        // 异步调用
+        openIMService.batchSendCourse(batchSendCourseDTO);
+        OpenImResponseDTO openImResponseDTO = new OpenImResponseDTO();
+        openImResponseDTO.setErrCode(0);
+        openImResponseDTO.setErrMsg("异步发送,详细请看明细");
+        return openImResponseDTO;
     }
 
     @ApiOperation("会员一键催课")

+ 3 - 0
fs-service/src/main/java/com/fs/im/dto/OpenImBatchMsgDTO.java

@@ -29,6 +29,9 @@ public class OpenImBatchMsgDTO implements Serializable {
     private String ex;
     private Boolean isSendAll; //是否发送给全部人
 
+    // 发送失败次数 重试三次 失败后不再发送
+    private Integer count = 0;
+
 
     @Data
     public static class Content implements Serializable {

+ 115 - 45
fs-service/src/main/java/com/fs/im/service/impl/OpenIMServiceImpl.java

@@ -1098,12 +1098,19 @@ public class OpenIMServiceImpl implements OpenIMService {
                 .execute()
                 .body();
         log.info("批量发送消息返回内容:\n{}", result);
-        OpenImResponseDTO responseDTO= JSONUtil.toBean(result, OpenImResponseDTO.class);
+        OpenImResponseDTO responseDTO=new OpenImResponseDTO();
+        if (result != null && result.startsWith("<html")) {
+            log.error("网关超时返回HTML报文: {}", result);
+            responseDTO.setErrCode(502);
+            responseDTO.setErrMsg("网关超时");
+            return responseDTO;
+        }
+        responseDTO= JSONUtil.toBean(result, OpenImResponseDTO.class);
         return responseDTO;
     }
 
     @Override
-    @Transactional
+    @Async
     public OpenImResponseDTO batchSendCourse(BatchSendCourseDTO batchSendCourseDTO) throws JsonProcessingException {
         ObjectMapper objectMapper = new ObjectMapper();
         objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); // 忽略null字段
@@ -1128,7 +1135,6 @@ public class OpenIMServiceImpl implements OpenIMService {
         }
 
         String courseUrl = fsUserCourse != null ? fsUserCourse.getImgUrl() : null;
-        OpenImBatchMsgDTO openImBatchMsgDTO = makeOpenImBatchMsgDTO(batchSendCourseDTO, courseUrl, objectMapper, userIds, planSendTimeStamp, "发课");
 
         int sendType;
         if(ObjectUtils.isNotEmpty(batchSendCourseDTO.getSendType())&&batchSendCourseDTO.getSendType() == 1 && batchSendCourseDTO.getSendTime() != null && batchSendCourseDTO.getSendTime().compareTo(new Date()) > 0) {
@@ -1137,53 +1143,117 @@ public class OpenIMServiceImpl implements OpenIMService {
             sendType = 2; //实时
         }
 
-        // 创建消息发送记录
-        String sendUnionId = UUID.randomUUID().toString();
-        List<FsImMsgSendDetail> imMsgSendDetailList = createImMsgSendLog("发课",batchSendCourseDTO, planSendTimeStamp, sendType, userIds, sendUnionId);
+        // 批量发送,每次最多 100 人
+        int BATCH_SIZE = 100;
+        OpenImResponseDTO finalResponseDTO = new OpenImResponseDTO();
+        int totalSent = 0;
+        int successCount = 0;
+        int failCount = 0;
 
-        OpenImResponseDTO openImResponseDTO = new OpenImResponseDTO();
-        if(sendType == 1) {
-            // 定时发送
-            // 缓存定时发课消息
-            String redisKey = "openIm:batchSendMsg:sendCourse";
-            Map<String, Object> redisMap = new HashMap<>();
-            BatchSendCourseAllDTO batchSendCourseAllDTO = new BatchSendCourseAllDTO();
-            batchSendCourseAllDTO.setBatchSendCourseDTO(batchSendCourseDTO).setOpenImBatchMsgDTO(openImBatchMsgDTO).setProject(project)
-                    .setImMsgSendDetailList(imMsgSendDetailList);
-            redisMap.put(batchSendCourseDTO.getCourseId()+":"+batchSendCourseDTO.getVideoId()+":"+batchSendCourseDTO.getSendTime().getTime()
-                    , batchSendCourseAllDTO);
-            redisCache.setCacheMap(redisKey, redisMap);
-            openImResponseDTO.setErrCode(0);
-            openImResponseDTO.setErrMsg("计划发送创建成功,待消息发送");
-        } else {
-            // 实时发送
-            openImResponseDTO = this.batchSendCourseTask(batchSendCourseDTO, openImBatchMsgDTO, project, imMsgSendDetailList);
-            openImResponseDTO.setErrMsg("实时发送成功");
-        }
+        // 分批处理
+        for (int i = 0; i < userIds.size(); i += BATCH_SIZE) {
+            int end = Math.min(i + BATCH_SIZE, userIds.size());
+            List<String> batchUserIds = userIds.subList(i, end);
+
+            try {
+                // 为每个批次创建独立的消息和记录
+                OpenImBatchMsgDTO openImBatchMsgDTO = makeOpenImBatchMsgDTO(batchSendCourseDTO, courseUrl, objectMapper, batchUserIds, planSendTimeStamp, "发课");
+
+                // 创建消息发送记录(每个批次独立的 logId)
+                String sendUnionId = UUID.randomUUID().toString();
+                List<FsImMsgSendDetail> imMsgSendDetailList = createImMsgSendLog("发课", batchSendCourseDTO, planSendTimeStamp, sendType, batchUserIds, sendUnionId);
+
+                OpenImResponseDTO responseDTO;
+                if(sendType == 1) {
+                    // 定时发送 - 每个批次独立缓存
+                    String redisKey = "openIm:batchSendMsg:sendCourse";
+                    Map<String, Object> redisMap = redisCache.getCacheMap(redisKey);
+                    if (redisMap == null) {
+                        redisMap = new HashMap<>();
+                    }
 
-        //是否催课
-        if(batchSendCourseDTO.getIsUrgeCourse()){
-            // 组装催课消息数据
-            OpenImBatchMsgDTO openImBatchUrgeCourse = makeOpenImBatchMsgDTO(batchSendCourseDTO, courseUrl, objectMapper, userIds, planSendTimeStamp, "催课");
+                    BatchSendCourseAllDTO batchSendCourseAllDTO = new BatchSendCourseAllDTO();
+                    batchSendCourseAllDTO.setBatchSendCourseDTO(batchSendCourseDTO)
+                            .setOpenImBatchMsgDTO(openImBatchMsgDTO)
+                            .setProject(project)
+                            .setImMsgSendDetailList(imMsgSendDetailList);
+                    // 使用唯一的 key:课程 ID+ 视频 ID+ 时间戳  +logId
+                    String batchKey = batchSendCourseDTO.getCourseId() + ":" +
+                            batchSendCourseDTO.getVideoId() + ":" +
+                            batchSendCourseDTO.getSendTime().getTime() + ":" +
+                            imMsgSendDetailList.get(0).getLogId();
+
+                    redisMap.put(batchKey, batchSendCourseAllDTO);
+                    redisCache.setCacheMap(redisKey, redisMap);
+                    responseDTO = new OpenImResponseDTO();
+                    responseDTO.setErrCode(0);
+                    responseDTO.setErrMsg("计划发送创建成功,待消息发送");
+                    totalSent += batchUserIds.size();
+                    successCount += batchUserIds.size();
 
-            //缓存定时催课消息
-            int urgSendType;
-            if(batchSendCourseDTO.getUrgeTime() != null && batchSendCourseDTO.getUrgeTime().compareTo(new Date()) > 0) {
-                urgSendType = 1; //定时
-            } else {
-                urgSendType = 2; //实时
+                } else {
+                    // 实时发送 - 立即执行
+                    responseDTO = this.batchSendCourseTask(batchSendCourseDTO, openImBatchMsgDTO, project, imMsgSendDetailList);
+                    totalSent += batchUserIds.size();
+                    if (responseDTO != null && responseDTO.getErrCode() == 0) {
+                        successCount += batchUserIds.size();
+                    } else {
+                        failCount += batchUserIds.size();
+                        log.error("批次 {}/{} 发送失败:{}", i, end, responseDTO != null ? responseDTO.getErrMsg() : "unknown error");
+                    }
+
+                    // 每批之间休眠 100ms,避免请求过快
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                }
+
+                // 保存最后一个批次的响应
+                finalResponseDTO = responseDTO;
+
+                //是否催课
+                if(batchSendCourseDTO.getIsUrgeCourse()){
+                    // 组装催课消息数据
+                    OpenImBatchMsgDTO openImBatchUrgeCourse = makeOpenImBatchMsgDTO(batchSendCourseDTO, courseUrl, objectMapper, batchUserIds, planSendTimeStamp, "催课");
+
+                    // 催课使用与发课相同的批次(batchUserIds),不需要再次拆分
+                    List<FsImMsgSendDetail> imMsgSendDetailUrgeList = createImMsgSendLog("催课", batchSendCourseDTO, planSendTimeStamp, 1, batchUserIds, sendUnionId);
+
+                    // 定时催课 - 缓存到 Redis
+                    String redisKey = "openIm:batchSendMsg:urgeCourse";
+                    Map<String, Object> redisMap = redisCache.getCacheMap(redisKey);
+                    if (redisMap == null) {
+                        redisMap = new HashMap<>();
+                    }
+
+                    BatchSendCourseAllDTO batchSendCourseAllDTO = new BatchSendCourseAllDTO();
+                    batchSendCourseAllDTO.setOpenImBatchMsgDTO(openImBatchUrgeCourse)
+                            .setImMsgSendDetailList(imMsgSendDetailUrgeList);
+                    // 使用唯一的 key:课程 ID+ 视频 ID+ 时间戳  +logId
+                    String batchKey = batchSendCourseDTO.getCourseId() + ":" +
+                            batchSendCourseDTO.getVideoId() + ":" +
+                            batchSendCourseDTO.getSendTime().getTime() + ":" +
+                            imMsgSendDetailList.get(0).getLogId();
+
+                    redisMap.put(batchKey, batchSendCourseAllDTO);
+                    redisCache.setCacheMap(redisKey, redisMap);
+
+                }
+
+            } catch (Exception e) {
+                failCount += batchUserIds.size();
+                log.error("批次 {}/{} 发送异常:{}", i, end, e.getMessage(), e);
             }
-            List<FsImMsgSendDetail> imMsgSendDetailUrgeList = createImMsgSendLog("催课", batchSendCourseDTO, planSendTimeStamp, urgSendType, userIds, sendUnionId);
-            String redisKey = "openIm:batchSendMsg:urgeCourse";
-            Map<String, Object> redisMap = new HashMap<>();
-            BatchSendCourseAllDTO batchSendCourseAllDTO = new BatchSendCourseAllDTO();
-            batchSendCourseAllDTO.setOpenImBatchMsgDTO(openImBatchUrgeCourse)
-                    .setImMsgSendDetailList(imMsgSendDetailUrgeList);
-            redisMap.put(batchSendCourseDTO.getCourseId()+":"+batchSendCourseDTO.getVideoId()+":"+batchSendCourseDTO.getUrgeTime().getTime()
-                    , batchSendCourseAllDTO);
-            redisCache.setCacheMap(redisKey, redisMap);
         }
-        return openImResponseDTO;
+        // 设置汇总结果
+        if (sendType == 2) { // 只有实时发送需要返回统计
+            finalResponseDTO.setErrMsg(String.format("发送完成:总数=%d, 成功=%d, 失败=%d", totalSent, successCount, failCount));
+        }
+
+        return finalResponseDTO;
     }
 
     private OpenImBatchMsgDTO makeOpenImBatchMsgDTO(BatchSendCourseDTO batchSendCourseDTO, String courseUrl, ObjectMapper objectMapper, List<String> userIds, long planSendTimeStamp, String logType) throws JsonProcessingException {