Просмотр исходного кода

每次发送100条数据im 更新

xgb 3 недель назад
Родитель
Сommit
1cae6a9f44

+ 1 - 1
fs-admin/src/main/java/com/fs/his/task/Task.java

@@ -1705,7 +1705,7 @@ public class Task {
             }
 
 
-            if(nowCount>3){ // 重试三次后放弃
+            if(nowCount>1){ // 重试一次后放弃
                 logger.error("im会员定时发课重试三次后放弃,key{}", key);
                 redisTemplate.opsForHash().delete(redisKey, key);
                 continue;

+ 1 - 1
fs-company-app/src/main/java/com/fs/app/controller/FsUserCourseVideoController.java

@@ -349,7 +349,7 @@ public class FsUserCourseVideoController extends AppBaseController {
         if(batchSendCourseDTO.getIsUrgeCourse()==null){
             batchSendCourseDTO.setIsUrgeCourse(false);
         }
-        return openIMService.batchSendCourse(batchSendCourseDTO);
+        return openIMService.batchSendCourseLimit(batchSendCourseDTO);
     }
 
     @ApiOperation("会员一键催课")

+ 0 - 2
fs-service/src/main/java/com/fs/course/dto/BatchSendCourseDTO.java

@@ -82,6 +82,4 @@ public class BatchSendCourseDTO implements Serializable {
     // 看课链接主键
     private Long linkId;
 
-    // im发送主表记录id
-    private Long logId;
 }

+ 9 - 0
fs-service/src/main/java/com/fs/im/service/OpenIMService.java

@@ -56,6 +56,15 @@ public interface OpenIMService {
      */
     OpenImResponseDTO batchSendCourse(BatchSendCourseDTO batchSendCourseDTO) throws JsonProcessingException;
 
+    /**
+     * 会员批量发课
+     * @param batchSendCourseDTO 每次发送100人信息
+     * @return
+     * @throws JsonProcessingException
+     */
+    OpenImResponseDTO batchSendCourseLimit(BatchSendCourseDTO batchSendCourseDTO) throws JsonProcessingException;
+
+
     /**
      * 会员批量发课-任务
      * @param batchSendCourseDTO

+ 188 - 1
fs-service/src/main/java/com/fs/im/service/impl/OpenIMServiceImpl.java

@@ -1173,6 +1173,193 @@ public class OpenIMServiceImpl implements OpenIMService {
         return openImResponseDTO;
     }
 
+    @Override
+    public OpenImResponseDTO batchSendCourseLimit(BatchSendCourseDTO batchSendCourseDTO) throws JsonProcessingException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); // 忽略 null 字段
+
+        //获取需要发送的人
+        List<String> userIds = this.getRecvIds(batchSendCourseDTO);
+
+        //注册和添加好友
+        for (String userId : userIds) {
+            String uId = userId.substring(1);
+            checkAndImportFriendByDianBo(batchSendCourseDTO.getCompanyUserId(), uId,null,false);
+        }
+
+        //组装发课消息数据
+        FsUserCourse fsUserCourse = fsUserCourseMapper.selectFsUserCourseByCourseId(batchSendCourseDTO.getCourseId());
+        Long project = fsUserCourse != null ? fsUserCourse.getProject() : null;
+        long planSendTimeStamp;
+        if(ObjectUtils.isNotEmpty(batchSendCourseDTO.getSendType())&&batchSendCourseDTO.getSendType() == 1 && batchSendCourseDTO.getSendTime() != null && batchSendCourseDTO.getSendTime().compareTo(new Date()) > 0){
+            planSendTimeStamp = batchSendCourseDTO.getSendTime().getTime();
+        } else {
+            planSendTimeStamp = System.currentTimeMillis();
+        }
+
+        String courseUrl = fsUserCourse != null ? fsUserCourse.getImgUrl() : null;
+
+        int sendType;
+        if(ObjectUtils.isNotEmpty(batchSendCourseDTO.getSendType())&&batchSendCourseDTO.getSendType() == 1 && batchSendCourseDTO.getSendTime() != null && batchSendCourseDTO.getSendTime().compareTo(new Date()) > 0) {
+            sendType = 1; //定时
+        } else {
+            sendType = 2; //实时
+        }
+
+        // 批量发送,每次最多 100 人
+        int BATCH_SIZE = 100;
+        OpenImResponseDTO finalResponseDTO = new OpenImResponseDTO();
+        int totalSent = 0;
+        int successCount = 0;
+        int failCount = 0;
+
+        // 分批处理
+        for (int i = 0; i < userIds.size(); i += BATCH_SIZE) {
+            int end = Math.min(i + BATCH_SIZE, userIds.size());
+            List<String> batchUserIds = userIds.subList(i, end);
+
+            try {
+                // 为每个批次创建独立的消息和记录
+                OpenImBatchMsgDTO openImBatchMsgDTO = makeOpenImBatchMsgDTO(batchSendCourseDTO, courseUrl, objectMapper, batchUserIds, planSendTimeStamp, "发课");
+
+                // 创建消息发送记录(每个批次独立的 logId)
+                String sendUnionId = UUID.randomUUID().toString();
+                List<FsImMsgSendDetail> imMsgSendDetailList = createImMsgSendLog("发课", batchSendCourseDTO, planSendTimeStamp, sendType, batchUserIds, sendUnionId);
+
+                OpenImResponseDTO responseDTO;
+                if(sendType == 1) {
+                    // 定时发送 - 每个批次独立缓存
+                    String redisKey = "openIm:batchSendMsg:sendCourse";
+                    Map<String, Object> redisMap = redisCache.getCacheMap(redisKey);
+                    if (redisMap == null) {
+                        redisMap = new HashMap<>();
+                    }
+
+                    BatchSendCourseAllDTO batchSendCourseAllDTO = new BatchSendCourseAllDTO();
+                    batchSendCourseAllDTO.setBatchSendCourseDTO(batchSendCourseDTO)
+                            .setOpenImBatchMsgDTO(openImBatchMsgDTO)
+                            .setProject(project)
+                            .setImMsgSendDetailList(imMsgSendDetailList);
+
+                    // 使用唯一的 key:课程 ID+ 视频 ID+ 时间戳 + 批次索引 +logId
+                    String batchKey = batchSendCourseDTO.getCourseId() + ":" +
+                            batchSendCourseDTO.getVideoId() + ":" +
+                            batchSendCourseDTO.getSendTime().getTime() + ":" +
+                            i + ":" +
+                            imMsgSendDetailList.get(0).getLogId();
+
+                    redisMap.put(batchKey, batchSendCourseAllDTO);
+                    redisCache.setCacheMap(redisKey, redisMap);
+
+                    responseDTO = new OpenImResponseDTO();
+                    responseDTO.setErrCode(0);
+                    responseDTO.setErrMsg("计划发送创建成功,待消息发送");
+                    totalSent += batchUserIds.size();
+                    successCount += batchUserIds.size();
+
+                } else {
+                    // 实时发送 - 立即执行
+                    responseDTO = this.batchSendCourseTask(batchSendCourseDTO, openImBatchMsgDTO, project, imMsgSendDetailList);
+                    totalSent += batchUserIds.size();
+                    if (responseDTO != null && responseDTO.getErrCode() == 0) {
+                        successCount += batchUserIds.size();
+                    } else {
+                        failCount += batchUserIds.size();
+                        log.error("批次 {}/{} 发送失败:{}", i, end, responseDTO != null ? responseDTO.getErrMsg() : "unknown error");
+                    }
+
+                    // 每批之间休眠 100ms,避免请求过快
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                }
+
+                // 保存最后一个批次的响应
+                finalResponseDTO = responseDTO;
+
+            } catch (Exception e) {
+                failCount += batchUserIds.size();
+                log.error("批次 {}/{} 发送异常:{}", i, end, e.getMessage(), e);
+            }
+        }
+
+        // 设置汇总结果
+        if (sendType == 2) { // 只有实时发送需要返回统计
+            finalResponseDTO.setErrMsg(String.format("发送完成:总数=%d, 成功=%d, 失败=%d", totalSent, successCount, failCount));
+        }
+
+        //是否催课
+        if(batchSendCourseDTO.getIsUrgeCourse()){
+            // 组装催课消息数据
+            OpenImBatchMsgDTO openImBatchUrgeCourse = makeOpenImBatchMsgDTO(batchSendCourseDTO, courseUrl, objectMapper, userIds, planSendTimeStamp, "催课");
+
+            //缓存定时催课消息
+            int urgSendType;
+            if(batchSendCourseDTO.getUrgeTime() != null && batchSendCourseDTO.getUrgeTime().compareTo(new Date()) > 0) {
+                urgSendType = 1; //定时
+            } else {
+                urgSendType = 2; //实时
+            }
+
+            // 催课也按批次拆分
+            if (urgSendType == 1 && userIds.size() > BATCH_SIZE) {
+                log.info("定时催课消息接收人{}人,超过{}人,拆分缓存", userIds.size(), BATCH_SIZE);
+
+                for (int i = 0; i < userIds.size(); i += BATCH_SIZE) {
+                    int end = Math.min(i + BATCH_SIZE, userIds.size());
+                    List<String> batchUserIds = userIds.subList(i, end);
+
+                    OpenImBatchMsgDTO batchUrgeMsgDTO = makeOpenImBatchMsgDTO(batchSendCourseDTO, courseUrl, objectMapper, batchUserIds, planSendTimeStamp, "催课");
+                    List<FsImMsgSendDetail> imMsgSendDetailUrgeList = createImMsgSendLog("催课", batchSendCourseDTO, planSendTimeStamp, urgSendType, batchUserIds, UUID.randomUUID().toString());
+
+                    String redisKey = "openIm:batchSendMsg:urgeCourse";
+                    Map<String, Object> redisMap = redisCache.getCacheMap(redisKey);
+                    if (redisMap == null) {
+                        redisMap = new HashMap<>();
+                    }
+
+                    BatchSendCourseAllDTO batchSendCourseAllDTO = new BatchSendCourseAllDTO();
+                    batchSendCourseAllDTO.setOpenImBatchMsgDTO(batchUrgeMsgDTO)
+                            .setImMsgSendDetailList(imMsgSendDetailUrgeList);
+
+                    // 使用唯一的 key
+                    String batchKey = batchSendCourseDTO.getCourseId() + ":" +
+                            batchSendCourseDTO.getVideoId() + ":" +
+                            batchSendCourseDTO.getUrgeTime().getTime() + ":" +
+                            i + ":" +
+                            imMsgSendDetailUrgeList.get(0).getLogId();
+
+                    redisMap.put(batchKey, batchSendCourseAllDTO);
+                    redisCache.setCacheMap(redisKey, redisMap);
+                }
+            } else {
+                // 不超过 100 人或实时发送,按原逻辑
+                List<FsImMsgSendDetail> imMsgSendDetailUrgeList = createImMsgSendLog("催课", batchSendCourseDTO, planSendTimeStamp, urgSendType, userIds, UUID.randomUUID().toString());
+                String redisKey = "openIm:batchSendMsg:urgeCourse";
+                Map<String, Object> redisMap = new HashMap<>();
+                BatchSendCourseAllDTO batchSendCourseAllDTO = new BatchSendCourseAllDTO();
+                batchSendCourseAllDTO.setOpenImBatchMsgDTO(openImBatchUrgeCourse)
+                        .setImMsgSendDetailList(imMsgSendDetailUrgeList);
+
+                String batchKey = batchSendCourseDTO.getCourseId()+":"+batchSendCourseDTO.getVideoId()+":"+batchSendCourseDTO.getUrgeTime().getTime();
+                if (urgSendType == 1) {
+                    batchKey += ":0:" + imMsgSendDetailUrgeList.get(0).getLogId();
+                }
+
+                redisMap.put(batchKey, batchSendCourseAllDTO);
+                redisCache.setCacheMap(redisKey, redisMap);
+            }
+        }
+
+        return finalResponseDTO;
+    }
+
+// ... existing code ...
+
+
     private OpenImBatchMsgDTO makeOpenImBatchMsgDTO(BatchSendCourseDTO batchSendCourseDTO, String courseUrl, ObjectMapper objectMapper, List<String> userIds, long planSendTimeStamp, String logType) throws JsonProcessingException {
          PayloadDTO.Extension extension = new PayloadDTO.Extension();
         OpenImBatchMsgDTO openImBatchMsgDTO = new OpenImBatchMsgDTO();
@@ -1444,7 +1631,7 @@ public class OpenIMServiceImpl implements OpenIMService {
             log.error("发送消息失败,结果:{}", openImResponseDTO);
             FsImMsgSendLog fsImMsgSendLog = new FsImMsgSendLog();
             fsImMsgSendLog.setLogId(imMsgSendDetailList.get(0).getLogId());
-            if(openImBatchMsgDTO.getCount()>3){
+            if(openImBatchMsgDTO.getCount()>1){
                 fsImMsgSendLog.setSendStatus(3);
             }
             fsImMsgSendLog.setUpdateTime(new Date());