|
@@ -16,6 +16,7 @@ import cn.hutool.json.JSONUtil;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.alibaba.fastjson.TypeReference;
|
|
import com.alibaba.fastjson.TypeReference;
|
|
|
import com.fs.common.core.domain.R;
|
|
import com.fs.common.core.domain.R;
|
|
|
|
|
+import com.fs.common.core.redis.RedisCache;
|
|
|
import com.fs.common.utils.DateUtils;
|
|
import com.fs.common.utils.DateUtils;
|
|
|
import com.fs.common.utils.IpUtil;
|
|
import com.fs.common.utils.IpUtil;
|
|
|
import com.fs.live.domain.LiveOrder;
|
|
import com.fs.live.domain.LiveOrder;
|
|
@@ -110,7 +111,8 @@ public class FsUserServiceImpl implements IFsUserService
|
|
|
@Autowired
|
|
@Autowired
|
|
|
LiveRewardCompensationMapper liveRewardCompensationMapper;
|
|
LiveRewardCompensationMapper liveRewardCompensationMapper;
|
|
|
|
|
|
|
|
-
|
|
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private RedisCache redisCache;
|
|
|
|
|
|
|
|
Logger logger = LoggerFactory.getLogger(FsUserServiceImpl.class);
|
|
Logger logger = LoggerFactory.getLogger(FsUserServiceImpl.class);
|
|
|
|
|
|
|
@@ -744,6 +746,8 @@ public class FsUserServiceImpl implements IFsUserService
|
|
|
// }
|
|
// }
|
|
|
|
|
|
|
|
private static final String APP_USER_SYNC_URL = "http://42.194.245.189:8010/app/common/syncAppUsers";
|
|
private static final String APP_USER_SYNC_URL = "http://42.194.245.189:8010/app/common/syncAppUsers";
|
|
|
|
|
+ private static final String APP_USER_SYNC_BY_USER_IDS_URL = "http://42.194.245.189:8010/app/common/syncAppUsersByUserIds";
|
|
|
|
|
+ private static final String LIVE_APP_USER_SYNC_PENDING_KEY_PREFIX = "live:app_user_sync:pending:";
|
|
|
private static final int APP_USER_SYNC_BATCH_SIZE = 50;
|
|
private static final int APP_USER_SYNC_BATCH_SIZE = 50;
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -828,6 +832,153 @@ public class FsUserServiceImpl implements IFsUserService
|
|
|
fsUserMapper.batchUpdateAppSyncFields(updateList);
|
|
fsUserMapper.batchUpdateAppSyncFields(updateList);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void syncPendingAppUsersFromRedis() {
|
|
|
|
|
+ Collection<String> keys = redisCache.keys(LIVE_APP_USER_SYNC_PENDING_KEY_PREFIX + "*");
|
|
|
|
|
+ if (keys == null || keys.isEmpty()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ List<String> pendingKeys = new ArrayList<>();
|
|
|
|
|
+ List<Long> userIds = new ArrayList<>();
|
|
|
|
|
+ for (String key : keys) {
|
|
|
|
|
+ String userIdStr = redisCache.getCacheObject(key);
|
|
|
|
|
+ if (StringUtils.isBlank(userIdStr)) {
|
|
|
|
|
+ redisCache.deleteObject(key);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ userIds.add(Long.parseLong(userIdStr.trim()));
|
|
|
|
|
+ pendingKeys.add(key);
|
|
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
|
|
+ redisCache.deleteObject(key);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if (userIds.isEmpty()) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ List<Long> distinctUserIds = userIds.stream().distinct().collect(Collectors.toList());
|
|
|
|
|
+ List<FsUser> usersToSync = fsUserMapper.selectUnsyncedAppUsersByUserIds(distinctUserIds);
|
|
|
|
|
+ Set<Long> unsyncedUserIds = new HashSet<>();
|
|
|
|
|
+ if (usersToSync != null) {
|
|
|
|
|
+ for (FsUser user : usersToSync) {
|
|
|
|
|
+ if (user != null && user.getUserId() != null) {
|
|
|
|
|
+ unsyncedUserIds.add(user.getUserId());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Set<Long> completedUserIds = distinctUserIds.stream()
|
|
|
|
|
+ .filter(id -> !unsyncedUserIds.contains(id))
|
|
|
|
|
+ .collect(Collectors.toSet());
|
|
|
|
|
+
|
|
|
|
|
+ if (usersToSync != null && !usersToSync.isEmpty()) {
|
|
|
|
|
+ Map<Long, FsUser> userByUserId = usersToSync.stream()
|
|
|
|
|
+ .filter(u -> u.getUserId() != null)
|
|
|
|
|
+ .collect(Collectors.toMap(FsUser::getUserId, Function.identity(), (a, b) -> a));
|
|
|
|
|
+
|
|
|
|
|
+ List<Long> syncUserIds = new ArrayList<>(userByUserId.keySet());
|
|
|
|
|
+ for (int i = 0; i < syncUserIds.size(); i += APP_USER_SYNC_BATCH_SIZE) {
|
|
|
|
|
+ int end = Math.min(i + APP_USER_SYNC_BATCH_SIZE, syncUserIds.size());
|
|
|
|
|
+ List<Long> batch = syncUserIds.subList(i, end);
|
|
|
|
|
+ try {
|
|
|
|
|
+ Map<String, AppUserSyncVo> syncMap = requestAppUserSyncByUserIds(batch);
|
|
|
|
|
+ if (syncMap == null || syncMap.isEmpty()) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ List<FsUser> updateList = new ArrayList<>();
|
|
|
|
|
+ for (Long userId : batch) {
|
|
|
|
|
+ FsUser localUser = userByUserId.get(userId);
|
|
|
|
|
+ if (localUser == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ AppUserSyncVo vo = findAppUserSyncVo(syncMap, localUser);
|
|
|
|
|
+ if (vo == null || vo.getAppUserId() == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ FsUser update = new FsUser();
|
|
|
|
|
+ update.setUserId(localUser.getUserId());
|
|
|
|
|
+ update.setAppUserId(vo.getAppUserId());
|
|
|
|
|
+ update.setAppDeptName(vo.getAppDeptName());
|
|
|
|
|
+ update.setAppCompanyUserId(vo.getAppCompanyUserId());
|
|
|
|
|
+ update.setAppCompanyUserName(vo.getAppCompanyUserName());
|
|
|
|
|
+ update.setAppSyncFlag(1);
|
|
|
|
|
+ updateList.add(update);
|
|
|
|
|
+ completedUserIds.add(userId);
|
|
|
|
|
+ }
|
|
|
|
|
+ batchUpdateAppSyncUsers(updateList);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ logger.error("Redis待同步APP用户批量同步失败, batchSize={}", batch.size(), e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ for (String key : pendingKeys) {
|
|
|
|
|
+ String userIdStr = redisCache.getCacheObject(key);
|
|
|
|
|
+ if (StringUtils.isBlank(userIdStr)) {
|
|
|
|
|
+ redisCache.deleteObject(key);
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ Long userId = Long.parseLong(userIdStr.trim());
|
|
|
|
|
+ if (completedUserIds.contains(userId)) {
|
|
|
|
|
+ redisCache.deleteObject(key);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
|
|
+ redisCache.deleteObject(key);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private AppUserSyncVo findAppUserSyncVo(Map<String, AppUserSyncVo> syncMap, FsUser localUser) {
|
|
|
|
|
+ if (localUser.getAppUserId() != null) {
|
|
|
|
|
+ AppUserSyncVo vo = syncMap.get(String.valueOf(localUser.getAppUserId()));
|
|
|
|
|
+ if (vo != null) {
|
|
|
|
|
+ return vo;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if (StringUtils.isNotBlank(localUser.getUnionId())) {
|
|
|
|
|
+ for (AppUserSyncVo vo : syncMap.values()) {
|
|
|
|
|
+ if (vo != null && localUser.getUnionId().equals(vo.getUnionId())) {
|
|
|
|
|
+ return vo;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private Map<String, AppUserSyncVo> requestAppUserSyncByUserIds(List<Long> userIds) {
|
|
|
|
|
+ String encryptedStr = CrossServiceRsaUtil.encryptForRequest("user" + System.currentTimeMillis());
|
|
|
|
|
+ Map<String, Object> paramMap = new HashMap<>();
|
|
|
|
|
+ paramMap.put("encryptedStr", encryptedStr);
|
|
|
|
|
+ paramMap.put("userIds", userIds);
|
|
|
|
|
+
|
|
|
|
|
+ RestTemplate restTemplate = new RestTemplate();
|
|
|
|
|
+ HttpHeaders headers = new HttpHeaders();
|
|
|
|
|
+ headers.setContentType(MediaType.APPLICATION_JSON);
|
|
|
|
|
+ HttpEntity<Map<String, Object>> requestEntity = new HttpEntity<>(paramMap, headers);
|
|
|
|
|
+ String responseBody = restTemplate.postForObject(APP_USER_SYNC_BY_USER_IDS_URL, requestEntity, String.class);
|
|
|
|
|
+ if (StringUtils.isBlank(responseBody)) {
|
|
|
|
|
+ logger.warn("按userId同步APP用户接口无响应");
|
|
|
|
|
+ return Collections.emptyMap();
|
|
|
|
|
+ }
|
|
|
|
|
+ JSONObject respObj = JSONObject.parseObject(responseBody);
|
|
|
|
|
+ if (respObj == null || !"200".equals(respObj.getString("code"))) {
|
|
|
|
|
+ logger.warn("按userId同步APP用户接口失败: {}", responseBody);
|
|
|
|
|
+ return Collections.emptyMap();
|
|
|
|
|
+ }
|
|
|
|
|
+ String encryptedData = respObj.getString("data");
|
|
|
|
|
+ if (StringUtils.isBlank(encryptedData)) {
|
|
|
|
|
+ return Collections.emptyMap();
|
|
|
|
|
+ }
|
|
|
|
|
+ String decryptedJson = CrossServiceMd5Util.decrypt(encryptedData);
|
|
|
|
|
+ if (StringUtils.isBlank(decryptedJson)) {
|
|
|
|
|
+ return Collections.emptyMap();
|
|
|
|
|
+ }
|
|
|
|
|
+ return JSONObject.parseObject(decryptedJson, new TypeReference<Map<String, AppUserSyncVo>>() {});
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
private Map<String, AppUserSyncVo> requestAppUserSync(List<String> maOpenIds) {
|
|
private Map<String, AppUserSyncVo> requestAppUserSync(List<String> maOpenIds) {
|
|
|
String encryptedStr = CrossServiceRsaUtil.encryptForRequest("user" + System.currentTimeMillis());
|
|
String encryptedStr = CrossServiceRsaUtil.encryptForRequest("user" + System.currentTimeMillis());
|
|
|
Map<String, Object> paramMap = new HashMap<>();
|
|
Map<String, Object> paramMap = new HashMap<>();
|