|
|
@@ -19,7 +19,9 @@ import com.fs.qw.param.QwExternalContactAddTagParam;
|
|
|
import com.fs.qw.param.QwExternalContactUpdateNoteParam;
|
|
|
import com.fs.qw.service.IQwCompanyService;
|
|
|
import com.fs.qw.vo.QwSopRuleTimeVO;
|
|
|
+import com.fs.qwApi.Result.DeptUserResult;
|
|
|
import com.fs.qwApi.Result.QwOpenidResult;
|
|
|
+import com.fs.qwApi.Result.UserResult;
|
|
|
import com.fs.qwApi.domain.QwDeptResult;
|
|
|
import com.fs.qwApi.domain.QwExternalContactRemarkResult;
|
|
|
import com.fs.qwApi.domain.QwResult;
|
|
|
@@ -39,11 +41,13 @@ import com.fs.sop.params.SopUserLogsInfoDelParam;
|
|
|
import com.fs.sop.params.SopUserLogsParamByDate;
|
|
|
import com.fs.sop.service.ISopUserLogsService;
|
|
|
import com.fs.voice.utils.StringUtil;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.MDC;
|
|
|
import org.springframework.beans.BeanUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.time.LocalDate;
|
|
|
@@ -54,7 +58,9 @@ import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
@Slf4j
|
|
|
@Service
|
|
|
@@ -81,42 +87,229 @@ public class OpenQwApiServiceImpl implements OpenQwApiService {
|
|
|
@Autowired
|
|
|
private TenantDataSourceUtil tenantDataSourceUtil;
|
|
|
|
|
|
+ /** 同步用户线程池 */
|
|
|
+ private static final ExecutorService SYNC_USER_EXECUTOR = new ThreadPoolExecutor(
|
|
|
+ 8, 16, 60L, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(500),
|
|
|
+ new ThreadPoolExecutor.CallerRunsPolicy()
|
|
|
+ );
|
|
|
+
|
|
|
@Override
|
|
|
public R getSyncQwUser(Long tenantId, String corpId) {
|
|
|
- return tenantDataSourceUtil.executeWithResult(tenantId, () -> {
|
|
|
- QwUserIdResult userList = qwApiService.getUserList(corpId);
|
|
|
- List<DeptUser> deptUser = userList.getDept_user();
|
|
|
- log.info("返回数据:{}", JSON.toJSONString(userList));
|
|
|
- log.info("同步用户数量:{}", deptUser.size());
|
|
|
- QwCompany qwCompany = iQwCompanyService.selectQwCompanyByCorpId(corpId);
|
|
|
- String accessToken = qwApiService.getToken(corpId, qwCompany.getPermanentCode());
|
|
|
- for (DeptUser user : deptUser) {
|
|
|
- QwUser qw = qwUserMapper.selectQwUserByCorpIdAndUserId(corpId, user.getUserid());
|
|
|
- String serverQwUserName = qwApiService.getServerQwUserName(corpId, qwCompany.getOpenSecret(), user.getUserid(), qwCompany.getPermanentCode());
|
|
|
- log.info("同步用户名称:{}", serverQwUserName);
|
|
|
- QwUser qwUser = new QwUser();
|
|
|
- qwUser.setQwUserId(user.getUserid());
|
|
|
- qwUser.setDepartment(user.getDepartment().toString());
|
|
|
- qwUser.setQwUserName(serverQwUserName);
|
|
|
- qwUser.setCorpId(corpId);
|
|
|
- QwOpenidByUserParams param = new QwOpenidByUserParams();
|
|
|
- param.setUserid(user.getUserid());
|
|
|
- QwOpenidResult qwOpenidResult = qwApiService.useridToOpenid(param, corpId);
|
|
|
- qwUser.setOpenid(qwOpenidResult.getOpenid());
|
|
|
-
|
|
|
- qwUser.setQwOpenUserId(qwApiService.getOpenUserid(accessToken, user.getUserid(), corpId));
|
|
|
- if (qw == null) {
|
|
|
- qwUserMapper.insertQwUser(qwUser);
|
|
|
- } else {
|
|
|
- qwUser.setId(qw.getId());
|
|
|
- qwUser.setIsDel(0);
|
|
|
- qwUserMapper.updateQwUser(qwUser);
|
|
|
+ String key = "qw:sync:" + corpId;
|
|
|
+
|
|
|
+ if (redisCache.hasKey(key)) {
|
|
|
+ return R.error("同步任务正在执行中,请稍后再试");
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean locked = redisCache.setIfAbsent(key, String.valueOf(System.currentTimeMillis()), 300, TimeUnit.SECONDS);
|
|
|
+ if (!locked) {
|
|
|
+ return R.error("同步任务正在执行中,请稍后再试");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 异步执行同步任务(使用线程池代替@Async自调用失效问题)
|
|
|
+ SYNC_USER_EXECUTOR.submit(() -> {
|
|
|
+ try {
|
|
|
+ executeSync(tenantId, corpId);
|
|
|
+ } finally {
|
|
|
+ redisCache.deleteObject(key);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ return R.ok("同步任务已启动,请稍后查看结果");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 实际的同步逻辑
|
|
|
+ */
|
|
|
+ private void executeSync(Long tenantId, String corpId) {
|
|
|
+ tenantDataSourceUtil.executeWithResult(tenantId, () -> {
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ log.info("========== 开始同步用户数据 ========== 租户ID: {}, 企业ID: {}", tenantId, corpId);
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 1. 获取部门列表
|
|
|
+ List<Department> departmentList = fetchDepartments(corpId);
|
|
|
+ if (departmentList == null) {
|
|
|
+ return R.ok("未获取到部门列表");
|
|
|
}
|
|
|
+
|
|
|
+ // 2. 获取access_token
|
|
|
+ QwCompany qwCompany = iQwCompanyService.selectQwCompanyByCorpId(corpId);
|
|
|
+ if (qwCompany == null) {
|
|
|
+ log.error("未找到企业信息, corpId: {}", corpId);
|
|
|
+ return R.error("未找到企业信息");
|
|
|
+ }
|
|
|
+ String accessToken = qwApiService.getToken(corpId, qwCompany.getPermanentCode());
|
|
|
+
|
|
|
+ // 3. 并发获取所有部门用户(去重)
|
|
|
+ Map<String, DeptUserResult> userMap = fetchDeptUsersConcurrently(corpId, accessToken, departmentList);
|
|
|
+ if (userMap.isEmpty()) {
|
|
|
+ return R.ok("无用户需要同步");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 4. 查询数据库中已存在的用户
|
|
|
+ Map<String, QwUser> existingUserMap = queryExistingUsers(corpId, userMap.keySet());
|
|
|
+
|
|
|
+ // 5. 并发转换openid并构建用户对象
|
|
|
+ List<QwUser> usersToProcess = convertUsersConcurrently(corpId, userMap, existingUserMap);
|
|
|
+
|
|
|
+ // 6. 批量数据库操作
|
|
|
+ int errorCount = (int) (userMap.size() - usersToProcess.size());
|
|
|
+ saveUsersToDatabase(usersToProcess, errorCount, startTime);
|
|
|
+
|
|
|
+ return R.ok("同步完成");
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("同步用户过程发生异常", e);
|
|
|
+ return R.error("同步失败:" + e.getMessage());
|
|
|
}
|
|
|
- return R.ok();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 获取部门列表
|
|
|
+ */
|
|
|
+ private List<Department> fetchDepartments(String corpId) {
|
|
|
+ QwDeptResult deptResult = qwApiService.getDepartmentList(corpId);
|
|
|
+ List<Department> departmentList = deptResult.getDepartment();
|
|
|
+ if (departmentList == null || departmentList.isEmpty()) {
|
|
|
+ log.warn("未获取到任何部门,同步结束");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ log.info("获取到部门数量: {}", departmentList.size());
|
|
|
+ return departmentList;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 并发获取所有部门用户并去重
|
|
|
+ */
|
|
|
+ private Map<String, DeptUserResult> fetchDeptUsersConcurrently(String corpId, String accessToken, List<Department> departmentList) {
|
|
|
+ Map<String, DeptUserResult> userMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ List<CompletableFuture<Void>> futures = departmentList.stream()
|
|
|
+ .map(dept -> CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ UserResult userResult = qwApiService.getUserSimpleList(corpId, accessToken, dept.getId());
|
|
|
+ List<DeptUserResult> deptUsers = userResult.getUserlist();
|
|
|
+ if (deptUsers != null && !deptUsers.isEmpty()) {
|
|
|
+ deptUsers.forEach(user -> userMap.putIfAbsent(user.getUserid(), user));
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("获取部门 [{}] {} 的用户列表失败", dept.getId(), dept.getName(), e);
|
|
|
+ }
|
|
|
+ }, SYNC_USER_EXECUTOR))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
|
|
+
|
|
|
+ log.info("部门用户去重后数量: {}", userMap.size());
|
|
|
+ return userMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 查询数据库中已存在的用户
|
|
|
+ */
|
|
|
+ private Map<String, QwUser> queryExistingUsers(String corpId, Set<String> userIds) {
|
|
|
+ List<QwUser> existingUsers = qwUserMapper.selectQwUsersByCorpIdAndUserIds(corpId, new ArrayList<>(userIds));
|
|
|
+ Map<String, QwUser> existingUserMap = existingUsers.stream()
|
|
|
+ .collect(Collectors.toMap(QwUser::getQwOpenUserId, Function.identity()));
|
|
|
+ log.info("数据库已存在用户数: {}, 新增用户数: {}", existingUserMap.size(), userIds.size() - existingUserMap.size());
|
|
|
+ return existingUserMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 并发转换openid并构建QwUser对象
|
|
|
+ */
|
|
|
+ private List<QwUser> convertUsersConcurrently(String corpId, Map<String, DeptUserResult> userMap, Map<String, QwUser> existingUserMap) {
|
|
|
+ AtomicInteger errorCount = new AtomicInteger(0);
|
|
|
+
|
|
|
+ List<CompletableFuture<QwUser>> futures = userMap.entrySet().stream()
|
|
|
+ .map(entry -> CompletableFuture.supplyAsync(() -> {
|
|
|
+ String userId = entry.getKey();
|
|
|
+ DeptUserResult apiUser = entry.getValue();
|
|
|
+ try {
|
|
|
+ // 调用API转换openid
|
|
|
+ QwOpenidByUserParams params = new QwOpenidByUserParams();
|
|
|
+ params.setUserid(userId);
|
|
|
+ QwOpenidResult openidResult = qwApiService.useridToOpenid(params, corpId);
|
|
|
+
|
|
|
+ return buildQwUser(apiUser, existingUserMap.get(userId), corpId, openidResult.getOpenid());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理用户失败,userId: {}", userId, e);
|
|
|
+ errorCount.incrementAndGet();
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }, SYNC_USER_EXECUTOR))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ List<QwUser> result = futures.stream()
|
|
|
+ .map(CompletableFuture::join)
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ log.info("用户处理统计: 成功={}, 失败={}", result.size(), errorCount.get());
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构建QwUser对象
|
|
|
+ */
|
|
|
+ private QwUser buildQwUser(DeptUserResult apiUser, QwUser existingUser, String corpId, String openid) {
|
|
|
+ QwUser qwUser = new QwUser();
|
|
|
+ qwUser.setQwUserName(apiUser.getName());
|
|
|
+ qwUser.setCorpId(corpId);
|
|
|
+ qwUser.setIsDel(0);
|
|
|
+ qwUser.setOpenid(openid);
|
|
|
+ qwUser.setQwOpenUserId(apiUser.getUserid());
|
|
|
+
|
|
|
+ // 设置部门(取第一个部门)
|
|
|
+ List<Integer> depts = apiUser.getDepartment();
|
|
|
+ qwUser.setDepartment(depts != null && !depts.isEmpty() ? String.valueOf(depts.get(0)) : "");
|
|
|
+
|
|
|
+ // 存在则设置id(更新),不存在则为null(新增)
|
|
|
+ if (existingUser != null) {
|
|
|
+ qwUser.setId(existingUser.getId());
|
|
|
+ }
|
|
|
+ return qwUser;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 批量保存用户到数据库
|
|
|
+ */
|
|
|
+ private void saveUsersToDatabase(List<QwUser> users, int errorCount, long startTime) {
|
|
|
+ if (users.isEmpty()) {
|
|
|
+ log.info("没有需要新增或更新的用户");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<QwUser> toInsert = users.stream().filter(u -> u.getId() == null).collect(Collectors.toList());
|
|
|
+ List<QwUser> toUpdate = users.stream().filter(u -> u.getId() != null).collect(Collectors.toList());
|
|
|
+
|
|
|
+ log.info("数据库操作: 待新增{}条, 待更新{}条", toInsert.size(), toUpdate.size());
|
|
|
+
|
|
|
+ int insertSuccess = 0, updateSuccess = 0;
|
|
|
+ if (!toInsert.isEmpty()) {
|
|
|
+ insertSuccess = qwUserMapper.batchUpdateQwUser(toInsert);
|
|
|
+ }
|
|
|
+ if (!toUpdate.isEmpty()) {
|
|
|
+ updateSuccess = qwUserMapper.batchUpdateQwUser(toUpdate);
|
|
|
+ }
|
|
|
+
|
|
|
+ long elapsed = System.currentTimeMillis() - startTime;
|
|
|
+ String resultMsg = String.format("同步完成!总耗时: %d ms | 新增: %d/%d | 更新: %d/%d | 失败: %d",
|
|
|
+ elapsed, insertSuccess, toInsert.size(), updateSuccess, toUpdate.size(), errorCount);
|
|
|
+ log.info(resultMsg);
|
|
|
+ log.info("========== 用户同步结束 ==========");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 判断是否需要更新用户信息
|
|
|
+ */
|
|
|
+ private boolean needUpdateUserInfo(QwUser existingUser) {
|
|
|
+ return existingUser.getUpdateTime() == null ||
|
|
|
+ System.currentTimeMillis() - existingUser.getUpdateTime().getTime() > 24 * 60 * 60 * 1000;
|
|
|
+ }
|
|
|
@Override
|
|
|
public R getSyncQwDept(Long tenantId, String corpId) {
|
|
|
return tenantDataSourceUtil.executeWithResult(tenantId, () -> {
|