Просмотр исходного кода

1、同步员工用户业务逻辑调整

yys 2 месяцев назад
Родитель
Сommit
ccb53b70f1

+ 164 - 29
fs-qw-api/src/main/java/com/fs/app/service/impl/OpenQwApiServiceImpl.java

@@ -19,7 +19,9 @@ import com.fs.qw.param.QwExternalContactAddTagParam;
 import com.fs.qw.param.QwExternalContactUpdateNoteParam;
 import com.fs.qw.service.IQwCompanyService;
 import com.fs.qw.vo.QwSopRuleTimeVO;
+import com.fs.qwApi.Result.DeptUserResult;
 import com.fs.qwApi.Result.QwOpenidResult;
+import com.fs.qwApi.Result.UserResult;
 import com.fs.qwApi.domain.QwDeptResult;
 import com.fs.qwApi.domain.QwExternalContactRemarkResult;
 import com.fs.qwApi.domain.QwResult;
@@ -39,11 +41,13 @@ import com.fs.sop.params.SopUserLogsInfoDelParam;
 import com.fs.sop.params.SopUserLogsParamByDate;
 import com.fs.sop.service.ISopUserLogsService;
 import com.fs.voice.utils.StringUtil;
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.MDC;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDate;
@@ -54,7 +58,9 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @Slf4j
 @Service
@@ -83,40 +89,169 @@ public class OpenQwApiServiceImpl implements OpenQwApiService {
 
     @Override
     public R getSyncQwUser(Long tenantId, String corpId) {
-        return tenantDataSourceUtil.executeWithResult(tenantId, () -> {
-            QwUserIdResult userList = qwApiService.getUserList(corpId);
-            List<DeptUser> deptUser = userList.getDept_user();
-            log.info("返回数据:{}", JSON.toJSONString(userList));
-            log.info("同步用户数量:{}", deptUser.size());
-            QwCompany qwCompany = iQwCompanyService.selectQwCompanyByCorpId(corpId);
-            String accessToken = qwApiService.getToken(corpId, qwCompany.getPermanentCode());
-            for (DeptUser user : deptUser) {
-                QwUser qw = qwUserMapper.selectQwUserByCorpIdAndUserId(corpId, user.getUserid());
-                String serverQwUserName = qwApiService.getServerQwUserName(corpId, qwCompany.getOpenSecret(), user.getUserid(), qwCompany.getPermanentCode());
-                log.info("同步用户名称:{}", serverQwUserName);
-                QwUser qwUser = new QwUser();
-                qwUser.setQwUserId(user.getUserid());
-                qwUser.setDepartment(user.getDepartment().toString());
-                qwUser.setQwUserName(serverQwUserName);
-                qwUser.setCorpId(corpId);
-                QwOpenidByUserParams param = new QwOpenidByUserParams();
-                param.setUserid(user.getUserid());
-                QwOpenidResult qwOpenidResult = qwApiService.useridToOpenid(param, corpId);
-                qwUser.setOpenid(qwOpenidResult.getOpenid());
-
-                qwUser.setQwOpenUserId(qwApiService.getOpenUserid(accessToken, user.getUserid(), corpId));
-                if (qw == null) {
-                    qwUserMapper.insertQwUser(qwUser);
-                } else {
-                    qwUser.setId(qw.getId());
-                    qwUser.setIsDel(0);
-                    qwUserMapper.updateQwUser(qwUser);
+        String key = "qw:sync:" + corpId;
+
+        // 检查是否正在同步
+        if (redisCache.hasKey(key)) {
+            return R.error("同步任务正在执行中,请稍后再试");
+        }
+
+        // 设置锁,防止重复(使用你的方法签名)
+        boolean locked = redisCache.setIfAbsent(key, String.valueOf(System.currentTimeMillis()), 300, TimeUnit.SECONDS);
+        if (!locked) {
+            return R.error("同步任务正在执行中,请稍后再试");
+        }
+
+        // 异步执行同步任务
+        asyncExecuteSync(tenantId, corpId, key);
+
+        // 立即返回
+        return R.ok("同步任务已启动,请稍后查看结果");
+    }
+
+    /**
+     * 异步执行同步逻辑
+     */
+    @Async
+    public void asyncExecuteSync(Long tenantId, String corpId, String lockKey) {
+        try {
+            executeSync(tenantId, corpId);
+        } finally {
+            // 同步完成后删除锁
+            redisCache.deleteObject(lockKey);
+        }
+    }
+
+    /**
+     * 实际的同步逻辑
+     */
+    private void executeSync(Long tenantId, String corpId) {
+        tenantDataSourceUtil.executeWithResult(tenantId, () -> {
+            try {
+                // 1. 获取部门列表
+                QwDeptResult userList = qwApiService.getDepartmentList(corpId);
+                List<Department> deptUser = userList.getDepartment();
+                log.info("同步部门数量:{}", deptUser.size());
+
+                // 2. 获取企业信息和token
+                QwCompany qwCompany = iQwCompanyService.selectQwCompanyByCorpId(corpId);
+                String accessToken = qwApiService.getToken(corpId, qwCompany.getPermanentCode());
+
+                // 3. 批量获取所有部门用户
+                Map<String, DeptUserResult> userMap = deptUser.parallelStream()
+                        .flatMap(department -> {
+                            try {
+                                log.info("正在获取部门 {} 的用户列表", department.getId());
+                                UserResult userResult = qwApiService.getUserSimpleList(corpId, accessToken, department.getId());
+                                List<DeptUserResult> deptUserResults = userResult.getUserlist();
+                                if (deptUserResults == null || deptUserResults.isEmpty()) {
+                                    log.warn("部门 {} 没有用户", department.getId());
+                                    return Stream.empty();
+                                }
+                                return deptUserResults.stream();
+                            } catch (Exception e) {
+                                log.error("获取部门 {} 用户失败", department.getId(), e);
+                                return Stream.empty();
+                            }
+                        })
+                        .collect(Collectors.toMap(
+                                DeptUserResult::getUserid,
+                                Function.identity(),
+                                (existing, replacement) -> existing
+                        ));
+
+                log.info("去重后用户总数:{}", userMap.size());
+                if (userMap.isEmpty()) {
+                    log.info("无用户需要同步");
+                    return R.ok("无用户需要同步");
                 }
+
+                // 4. 批量查询现有用户
+                List<String> userIds = new ArrayList<>(userMap.keySet());
+                List<QwUser> existingUsers = qwUserMapper.selectQwUsersByCorpIdAndUserIds(corpId, userIds);
+                Map<String, QwUser> existingUserMap = existingUsers.stream()
+                        .collect(Collectors.toMap(QwUser::getQwOpenUserId, Function.identity()));
+
+                // 5. 批量获取用户详细信息
+                List<QwUser> usersToProcess = new ArrayList<>();
+                List<List<String>> batches = Lists.partition(userIds, 100);
+
+                for (List<String> batch : batches) {
+                    List<QwUser> batchUsers = batch.parallelStream()
+                            .map(userId -> {
+                                try {
+                                    DeptUserResult user = userMap.get(userId);
+                                    QwUser existingQwUser = existingUserMap.get(userId);
+
+                                    // 判断是否需要更新
+                                    if (existingQwUser != null && !needUpdateUserInfo(existingQwUser)) {
+                                        return null;
+                                    }
+
+                                    // 转换openid
+                                    QwOpenidByUserParams param = new QwOpenidByUserParams();
+                                    param.setUserid(userId);
+                                    QwOpenidResult qwOpenidResult = qwApiService.useridToOpenid(param, corpId);
+
+                                    QwUser qwUser = new QwUser();
+                                    qwUser.setDepartment(user.getDepartment().toString());
+                                    qwUser.setQwUserName(user.getName());
+                                    qwUser.setCorpId(corpId);
+                                    qwUser.setOpenid(qwOpenidResult.getOpenid());
+                                    qwUser.setQwOpenUserId(userId);
+
+                                    if (existingQwUser != null) {
+                                        qwUser.setId(existingQwUser.getId());
+                                        qwUser.setIsDel(0);
+                                    }
+
+                                    return qwUser;
+                                } catch (Exception e) {
+                                    log.error("处理用户失败,userId: {}", userId, e);
+                                    return null;
+                                }
+                            })
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toList());
+
+                    usersToProcess.addAll(batchUsers);
+                }
+
+                // 6. 批量数据库操作
+                List<QwUser> toInsert = usersToProcess.stream()
+                        .filter(u -> u.getId() == null)
+                        .collect(Collectors.toList());
+                List<QwUser> toUpdate = usersToProcess.stream()
+                        .filter(u -> u.getId() != null)
+                        .collect(Collectors.toList());
+
+                int successCount = 0;
+                if (!toInsert.isEmpty()) {
+                    successCount += qwUserMapper.batchInsertQwUser(toInsert);
+                    log.info("批量新增用户:{}", toInsert.size());
+                }
+                if (!toUpdate.isEmpty()) {
+                    successCount += qwUserMapper.batchUpdateQwUser(toUpdate);
+                    log.info("批量更新用户:{}", toUpdate.size());
+                }
+
+                log.info("同步完成,成功:{},失败:{}", successCount, usersToProcess.size() - successCount);
+                return R.ok(String.format("同步完成,成功%d个,失败%d个", successCount, usersToProcess.size() - successCount));
+
+            } catch (Exception e) {
+                log.error("同步用户过程异常", e);
+                return R.error("同步失败:" + e.getMessage());
             }
-            return R.ok();
         });
     }
 
+    /**
+     * 判断是否需要更新用户信息
+     */
+    private boolean needUpdateUserInfo(QwUser existingUser) {
+        return existingUser.getUpdateTime() == null ||
+                System.currentTimeMillis() - existingUser.getUpdateTime().getTime() > 24 * 60 * 60 * 1000;
+    }
     @Override
     public R getSyncQwDept(Long tenantId, String corpId) {
         return tenantDataSourceUtil.executeWithResult(tenantId, () -> {

+ 11 - 0
fs-service/src/main/java/com/fs/qw/mapper/QwUserMapper.java

@@ -510,4 +510,15 @@ public interface QwUserMapper extends BaseMapper<QwUser>
 
     @Select("select * from qw_user where ipad_status = 1 and corp_id=#{corpId} and qw_user_id=#{qwUserId} limit 1 ")
     QwUser selectQwUserAppKeyAndIdByCorpIdAndUserIdAndIpad(@Param("corpId")String corpId,@Param("qwUserId") String qwUserId);
+
+    // 批量查询
+    List<QwUser> selectQwUsersByCorpIdAndUserIds(@Param("corpId") String corpId,
+                                                 @Param("userIds") List<String> userIds);
+
+    // 批量插入
+    int batchInsertQwUser(@Param("list") List<QwUser> users);
+
+    // 批量更新
+    int batchUpdateQwUser(@Param("list") List<QwUser> users);
+
 }

+ 16 - 0
fs-service/src/main/java/com/fs/qwApi/Result/DeptUserResult.java

@@ -0,0 +1,16 @@
+package com.fs.qwApi.Result;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class DeptUserResult {
+    private String userid;
+    private String name;
+    private List<Integer> department;
+
+    @JsonProperty("open_userid")
+    private String openUserid;
+}

+ 13 - 0
fs-service/src/main/java/com/fs/qwApi/Result/UserResult.java

@@ -0,0 +1,13 @@
+package com.fs.qwApi.Result;
+
+import lombok.Data;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+@Data
+public class UserResult {
+    private Integer errcode;
+    private String errmsg;
+    private List<DeptUserResult> userlist;
+}
+

+ 5 - 0
fs-service/src/main/java/com/fs/qwApi/config/QwApiConfig.java

@@ -341,4 +341,9 @@ public interface QwApiConfig {
     String useridToOpenUserid="https://qyapi.weixin.qq.com/cgi-bin/batch/userid_to_openuserid";
 
     String getNewExternalUserid="https://qyapi.weixin.qq.com/cgi-bin/externalcontact/get_new_external_userid";
+
+    /**
+     * 获取部门成员
+     */
+    String getUserSimpleList="https://qyapi.weixin.qq.com/cgi-bin/user/simplelist";
 }

+ 9 - 4
fs-service/src/main/java/com/fs/qwApi/service/QwApiService.java

@@ -358,8 +358,13 @@ public interface QwApiService {
 
     QwLinkCustomerResult linkCustomer(QwLinkCustomerParam param,String corpId);
 
-
-
-
-
+    /**
+     * 获取部门成员
+     *
+     * @param corpId
+     * @param accessToken
+     * @param id
+     * @return
+     */
+    UserResult getUserSimpleList(String corpId, String accessToken, Integer id);
 }

+ 42 - 0
fs-service/src/main/java/com/fs/qwApi/service/impl/QwApiServiceImpl.java

@@ -1679,6 +1679,48 @@ public class QwApiServiceImpl implements QwApiService {
         return null;
     }
 
+
+    /**
+     * 获取部门成员
+     * @param corpId
+     * @return
+     */
+    @Override
+    public UserResult getUserSimpleList(String corpId, String accessToken, Integer departmentId) {
+        HttpClient httpClient = HttpClients.createDefault();
+        try {
+            // 企业微信API:获取部门成员
+            URIBuilder builder = new URIBuilder(QwApiConfig.getUserSimpleList);
+            builder.setParameter("access_token", accessToken);
+            builder.setParameter("department_id", departmentId.toString());
+            builder.setParameter("fetch_child", "0"); // 0-不获取子部门,1-获取
+
+            URI uri = builder.build();
+            HttpGet httpGet = new HttpGet(uri);
+
+            // 设置请求头
+            httpGet.setHeader("Content-Type", "application/json");
+
+            HttpResponse response = httpClient.execute(httpGet);
+            String reJson = EntityUtils.toString(response.getEntity());
+            log.info("获取部门{}成员返回:{}", departmentId, reJson);
+
+            UserResult qwResult = JSON.parseObject(reJson, UserResult.class);
+
+            // 检查返回码
+            if (qwResult.getErrcode() != 0) {
+                log.error("获取部门成员失败,errcode: {}, errmsg: {}",
+                        qwResult.getErrcode(), qwResult.getErrmsg());
+            }
+
+            return qwResult;
+        } catch (Exception e) {
+            log.error("获取部门成员异常,departmentId: {}", departmentId, e);
+        }
+        return null;
+    }
+
+
     @Override
     public QwDeptResult getDepartmentList(String corpId) {
 ////        CompanyConfig companyConfig = companyConfigService.selectCompanyConfigByKey(corpId, "sys:qw:config");

+ 1 - 1
fs-service/src/main/resources/application-dev.yml

@@ -124,4 +124,4 @@ spring:
                         merge-sql: true
                     wall:
                         config:
-                            multi-statement-allow: true
+                            multi-statement-allow: true

+ 64 - 0
fs-service/src/main/resources/mapper/qw/QwUserMapper.xml

@@ -332,5 +332,69 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
              #{serverId}
          </foreach>
     </select>
+    <!-- QwUserMapper.xml -->
+    <!-- 批量查询 -->
+    <select id="selectQwUsersByCorpIdAndUserIds" resultType="QwUser">
+        SELECT * FROM qw_user
+        WHERE corp_id = #{corpId}
+        AND qw_open_user_id IN
+        <foreach collection="userIds" item="userId" open="(" separator="," close=")">
+            #{userId}
+        </foreach>
+    </select>
 
+    <!-- 批量插入 - 补充完整字段 -->
+    <insert id="batchInsertQwUser">
+        INSERT INTO qw_user (
+        department, qw_user_name, corp_id, openid, qw_open_user_id,
+        qw_user_id, company_id, company_user_id, status, welcome_text,
+        welcome_image, is_send_msg, app_key, contact_way, config_id,
+        qw_hook_id, fastGpt_role_id, login_status, tool_status,
+        login_code_url, version, vid, uid, ipad_status, server_id,
+        server_status, is_auto, video_get_status, ai_status,
+        is_del, create_time
+        ) VALUES
+        <foreach collection="list" item="user" separator=",">
+            (
+            #{user.department}, #{user.qwUserName}, #{user.corpId}, #{user.openid}, #{user.qwOpenUserId},
+            #{user.qwUserId}, #{user.companyId}, #{user.companyUserId}, #{user.status}, #{user.welcomeText},
+            #{user.welcomeImage}, #{user.isSendMsg}, #{user.appKey}, #{user.contactWay}, #{user.configId},
+            #{user.qwHookId}, #{user.fastGptRoleId}, #{user.loginStatus}, #{user.toolStatus},
+            #{user.loginCodeUrl}, #{user.version}, #{user.vid}, #{user.uid}, #{user.ipadStatus}, #{user.serverId},
+            #{user.serverStatus}, #{user.isAuto}, #{user.videoGetStatus}, #{user.aiStatus},
+            0, NOW()
+            )
+        </foreach>
+    </insert>
+
+    <!-- 批量更新 - 使用 CASE WHEN 方式,更高效 -->
+    <update id="batchUpdateQwUser">
+        UPDATE qw_user
+        SET
+        department = CASE id
+        <foreach collection="list" item="user">
+            WHEN #{user.id} THEN #{user.department}
+        </foreach>
+        END,
+        qw_user_name = CASE id
+        <foreach collection="list" item="user">
+            WHEN #{user.id} THEN #{user.qwUserName}
+        </foreach>
+        END,
+        openid = CASE id
+        <foreach collection="list" item="user">
+            WHEN #{user.id} THEN #{user.openid}
+        </foreach>
+        END,
+        is_del = CASE id
+        <foreach collection="list" item="user">
+            WHEN #{user.id} THEN #{user.isDel}
+        </foreach>
+        END,
+        update_time = NOW()
+        WHERE id IN
+        <foreach collection="list" item="user" open="(" close=")" separator=",">
+            #{user.id}
+        </foreach>
+    </update>
 </mapper>