Selaa lähdekoodia

转发处理+并行(8个节点)处理企微上传图片

xw 5 päivää sitten
vanhempi
commit
a415a48dca

+ 14 - 0
fs-qw-company-api/src/main/java/com/fs/app/controller/QwController.java

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.fs.app.exception.FSException;
 import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.utils.StringUtils;
+import com.fs.course.service.IFsUserCourseService;
 import com.fs.qw.domain.QwCompany;
 import com.fs.qw.service.IQwCompanyService;
 import com.fs.qwApi.Params.QwApiParam;
@@ -33,6 +34,7 @@ import java.io.InputStream;
 import java.net.URI;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -43,6 +45,7 @@ public class QwController {
 
     private final IQwCompanyService qwCompanyService;
     private final RedisCacheT<String> redisCache;
+    private final IFsUserCourseService fsUserCourseService;
 
     @PostMapping("/post")
     public QwApiResult post(@RequestBody QwApiParam param) throws Exception {
@@ -98,6 +101,17 @@ public class QwController {
         return QwApiResult.ok(reJson);
     }
 
+
+    @PostMapping("/timer/processQwSopCourseMaterial")
+    public QwApiResult processQwSopCourseMaterialTimer(@RequestBody List<String> corpIds) {
+        if (corpIds == null || corpIds.isEmpty()) {
+            return QwApiResult.error("corpIds 为空");
+        }
+        log.info("定时任务-企微上传课程图片:本转发节点处理主体数 {}", corpIds.size());
+        fsUserCourseService.processQwSopCourseMaterialForCorpIds(corpIds);
+        return QwApiResult.ok("ok");
+    }
+
     @PostMapping("/uploadImg")
     public QwApiResult uploadImg(@RequestBody QwApiParam param) throws Exception {
         String uuid = UUID.randomUUID().toString();

+ 5 - 0
fs-service/src/main/java/com/fs/course/service/IFsUserCourseService.java

@@ -110,6 +110,11 @@ public interface IFsUserCourseService {
 
     void processQwSopCourseMaterialTimer();
 
+    /**
+     * 仅处理指定 corpId 列表的课程封面上传企微(供分布式定时任务节点调用)
+     */
+    void processQwSopCourseMaterialForCorpIds(List<String> corpIds);
+
     List<FsCourseListBySidebarVO> getFsCourseListBySidebar(FsCourseListBySidebarParam param);
 
     List<FsCourseListBySidebarVO> getFsCourseListBySidebarToday(FsCourseListBySidebarParam param);

+ 137 - 14
fs-service/src/main/java/com/fs/course/service/impl/FsUserCourseServiceImpl.java

@@ -9,6 +9,10 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.util.*;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import cn.hutool.json.JSONUtil;
@@ -60,7 +64,12 @@ import org.checkerframework.checker.units.qual.A;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.SimpleClientHttpRequestFactory;
 import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
 import com.fs.course.service.IFsUserCourseService;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -76,6 +85,9 @@ import javax.imageio.ImageIO;
 @Slf4j
 public class FsUserCourseServiceImpl implements IFsUserCourseService
 {
+    /** 单节点内按 corp 并行上传时的最大线程数(过大易触发企微频控) */
+    private static final int QW_COURSE_MATERIAL_CORP_PARALLELISM = 8;
+
     @Autowired
     private CompanyTagMapper companyTagMapper;
     @Autowired
@@ -495,34 +507,145 @@ public class FsUserCourseServiceImpl implements IFsUserCourseService
 
     /**
      * 定时任务 - 处理企业微信SOP课程素材
-     * 每2天执行一次,将课程封面图片上传到企业微信素材库并缓存mediaId
+     * 按 {@link QwCompany#getQwApiUrl()} 分组:无转发地址的在当前机器执行;有转发地址的并行 HTTP
      */
     @Override
     public void processQwSopCourseMaterialTimer() {
-        // 获取所有需要处理的课程列表
-        List<FsUserCourse> fsUserCourses = fsUserCourseMapper.selectFsUserCourseAllCourseByQw();
-        // 获取所有企业微信配置
         List<QwCompany> companies = iQwCompanyService.selectQwCompanyList(new QwCompany());
-
-        // 遍历每个企业微信配置
+        Map<String, LinkedHashSet<String>> bucket = new LinkedHashMap<>();
         for (QwCompany company : companies) {
             String corpId = company.getCorpId();
-            if (corpId == null) {
+            if (corpId == null || StringUtils.isEmpty(corpId)) {
                 continue;
             }
+            String forwardKey = StringUtils.isNotEmpty(company.getQwApiUrl())
+                    ? normalizeQwForwardBaseUrl(company.getQwApiUrl())
+                    : "";
+            bucket.computeIfAbsent(forwardKey, k -> new LinkedHashSet<>()).add(corpId);
+        }
+        if (bucket.isEmpty()) {
+            log.warn("processQwSopCourseMaterialTimer: 无有效企微主体 corpId,跳过");
+            return;
+        }
+        int parallelTasks = (int) bucket.entrySet().stream().filter(e -> !e.getValue().isEmpty()).count();
+        int poolSize = Math.min(Math.max(parallelTasks, 1), 32);
+        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        try {
+            for (Map.Entry<String, LinkedHashSet<String>> e : bucket.entrySet()) {
+                List<String> corpIds = new ArrayList<>(e.getValue());
+                if (corpIds.isEmpty()) {
+                    continue;
+                }
+                String forwardBase = e.getKey();
+                if (forwardBase.isEmpty()) {
+                    futures.add(CompletableFuture.runAsync(
+                            () -> processQwSopCourseMaterialForCorpIds(corpIds), executor));
+                } else {
+                    String remoteBase = forwardBase;
+                    futures.add(CompletableFuture.runAsync(
+                            () -> invokeRemoteProcessQwSopCourseMaterial(remoteBase, corpIds), executor));
+                }
+            }
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+        } finally {
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException ie) {
+                executor.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
 
-            // 遍历每个课程,上传图片到对应企业的素材库
-            for (FsUserCourse course : fsUserCourses) {
-                try {
-                    uploadCourseImage(course, corpId);
-                } catch (Exception e) {
-                    log.error("处理课程图片失败: courseId={}, corpId={}, error={}",
-                            course.getCourseId(), corpId, e.getMessage());
+    @Override
+    public void processQwSopCourseMaterialForCorpIds(List<String> corpIds) {
+        if (corpIds == null || corpIds.isEmpty()) {
+            return;
+        }
+        List<String> validCorpIds = corpIds.stream()
+                .filter(id -> !StringUtils.isEmpty(id))
+                .collect(Collectors.toList());
+        if (validCorpIds.isEmpty()) {
+            return;
+        }
+        List<FsUserCourse> fsUserCourses = fsUserCourseMapper.selectFsUserCourseAllCourseByQw();
+        if (fsUserCourses == null || fsUserCourses.isEmpty()) {
+            return;
+        }
+        int poolSize = Math.min(validCorpIds.size(), QW_COURSE_MATERIAL_CORP_PARALLELISM);
+        ExecutorService corpExecutor = Executors.newFixedThreadPool(poolSize);
+        List<CompletableFuture<Void>> corpFutures = new ArrayList<>();
+        try {
+            for (String corpId : validCorpIds) {
+                corpFutures.add(CompletableFuture.runAsync(
+                        () -> uploadAllCoursesForOneCorp(fsUserCourses, corpId), corpExecutor));
+            }
+            CompletableFuture.allOf(corpFutures.toArray(new CompletableFuture[0])).join();
+        } finally {
+            corpExecutor.shutdown();
+            try {
+                if (!corpExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+                    corpExecutor.shutdownNow();
                 }
+            } catch (InterruptedException ie) {
+                corpExecutor.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * 单个主体:按课程顺序上传(同一 corp 内保持串行,避免企微侧无序竞态)
+     */
+    private void uploadAllCoursesForOneCorp(List<FsUserCourse> fsUserCourses, String corpId) {
+        for (FsUserCourse course : fsUserCourses) {
+            try {
+                uploadCourseImage(course, corpId);
+            } catch (Exception e) {
+                log.error("处理课程图片失败: courseId={}, corpId={}, error={}",
+                        course.getCourseId(), corpId, e.getMessage());
             }
         }
     }
 
+    /**
+     * 正规化转发地址
+     * @param qwApiUrl
+     * @return
+     */
+    private static String normalizeQwForwardBaseUrl(String qwApiUrl) {
+        String s = qwApiUrl.trim();
+        while (s.endsWith("/")) {
+            s = s.substring(0, s.length() - 1);
+        }
+        return s;
+    }
+
+    /**
+     * 调用部署在转发地址上的 fs-qw-company-api:/timer/processQwSopCourseMaterial
+     */
+    private void invokeRemoteProcessQwSopCourseMaterial(String forwardBaseUrl, List<String> corpIds) {
+        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
+        factory.setConnectTimeout(60_000);
+        factory.setReadTimeout(6 * 60 * 60 * 1000);
+        RestTemplate restTemplate = new RestTemplate(factory);
+        String url = forwardBaseUrl + "/timer/processQwSopCourseMaterial";
+        HttpHeaders headers = new HttpHeaders();
+        headers.setContentType(MediaType.APPLICATION_JSON);
+        HttpEntity<String> entity = new HttpEntity<>(JSON.toJSONString(corpIds), headers);
+        try {
+            log.info("企微课程素材定时任务:HTTP 分发至 {},主体数 {}", url, corpIds.size());
+            restTemplate.postForEntity(url, entity, String.class);
+        } catch (Exception ex) {
+            log.error("企微课程素材定时任务:远程节点调用失败 url={}, corpCount={}, err={}",
+                    url, corpIds.size(), ex.getMessage(), ex);
+        }
+    }
+
     @Override
     public List<FsCourseListBySidebarVO> getFsCourseListBySidebar(FsCourseListBySidebarParam param) {
         return  fsUserCourseMapper.getFsCourseListBySidebar(param);

+ 2 - 1
fs-service/src/main/resources/mapper/qw/QwCompanyMapper.xml

@@ -95,7 +95,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             qc.share_app_id,
             qc.share_agent_id,
             qc.share_schema,
-            sc.`name` AS mini_name
+            sc.`name` AS mini_name,
+            qc.qw_api_url
         FROM
             qw_company qc
                 LEFT JOIN