|
@@ -59,6 +59,7 @@ import java.time.ZoneId;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collector;
|
|
|
import java.util.stream.Collectors;
|
|
@@ -571,135 +572,375 @@ public class QwSopLogsServiceImpl extends ServiceImpl<QwSopLogsMapper, QwSopLogs
|
|
|
@Override
|
|
|
public void qwSopLogsResultNew() {
|
|
|
|
|
|
+
|
|
|
logger.info("开始执行企业微信群发消息结果查询任务");
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
|
|
|
List<QwSopLogs> qwSopLogsList = qwSopLogsMapper.selectSopLogsByCreateCorpMassSendResult();
|
|
|
if (qwSopLogsList.isEmpty()) {
|
|
|
- logger.info("没有需要查询结果的群发消息记录");
|
|
|
- return;
|
|
|
+ return ;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+ // 分组处理
|
|
|
Map<String, List<QwSopLogs>> grouped = qwSopLogsList.stream().collect(
|
|
|
Collectors.groupingBy(log -> log.getQwUserid() + "|" + log.getCorpId() + "|" + log.getMsgId())
|
|
|
);
|
|
|
- for (Map.Entry<String, List<QwSopLogs>> entry : grouped.entrySet()) {
|
|
|
- String key = entry.getKey();
|
|
|
- List<QwSopLogs> corpLogs = entry.getValue();
|
|
|
|
|
|
- String[] keys = key.split("\\|");
|
|
|
- String qwUserid = keys[0];
|
|
|
- String corpId = keys[1];
|
|
|
- String msgID = keys[2];
|
|
|
+ // 线程安全的数据结构
|
|
|
+ List<QwSopLogs> allUpdates = Collections.synchronizedList(new ArrayList<>());
|
|
|
+ Queue<Map.Entry<String, List<QwSopLogs>>> taskQueue = new ConcurrentLinkedQueue<>(grouped.entrySet());
|
|
|
+ Queue<Map.Entry<String, List<QwSopLogs>>> apiFailedTasks = new ConcurrentLinkedQueue<>();
|
|
|
+ ExecutorService executor = Executors.newFixedThreadPool(10);
|
|
|
+
|
|
|
+ AtomicInteger totalGroupsProcessed = new AtomicInteger(0); // 处理的分组数
|
|
|
+ AtomicInteger totalRecordsUpdated = new AtomicInteger(0); // 更新的记录数
|
|
|
+
|
|
|
+ int batchSize = 300; // 每300个组批量更新一次
|
|
|
+ int maxRetries = 3; // 最大重试次数
|
|
|
+
|
|
|
+ // 处理所有任务(包括重试)
|
|
|
+ for (int retryCount = 0; retryCount <= maxRetries; retryCount++) {
|
|
|
+ int currentBatchCount = 0;
|
|
|
+
|
|
|
+ while (!taskQueue.isEmpty()) {
|
|
|
+ int currentBatchSize = Math.min(batchSize - currentBatchCount, taskQueue.size());
|
|
|
+ CountDownLatch batchLatch = new CountDownLatch(currentBatchSize);
|
|
|
+
|
|
|
+ // 处理当前批次任务
|
|
|
+ for (int i = 0; i < currentBatchSize; i++) {
|
|
|
+ Map.Entry<String, List<QwSopLogs>> entry = taskQueue.poll();
|
|
|
+ executor.submit(() -> {
|
|
|
+ try {
|
|
|
+ // 处理单个分组(传入apiFailedTasks)
|
|
|
+ List<QwSopLogs> updates = processSingleGroup(entry, apiFailedTasks);
|
|
|
+ if (updates != null) {
|
|
|
+ synchronized (allUpdates) {
|
|
|
+ allUpdates.addAll(updates);
|
|
|
+ }
|
|
|
+ // 统计更新记录数
|
|
|
+ totalRecordsUpdated.addAndGet(updates.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 统计处理的分组数
|
|
|
+ totalGroupsProcessed.incrementAndGet();
|
|
|
+ } finally {
|
|
|
+ batchLatch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ // 等待当前批次完成
|
|
|
+ try {
|
|
|
+ batchLatch.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+
|
|
|
+ currentBatchCount += currentBatchSize;
|
|
|
|
|
|
- QwGetGroupmsgSendParam param = new QwGetGroupmsgSendParam();
|
|
|
- param.setMsgid(msgID);
|
|
|
- param.setUserid(qwUserid);
|
|
|
- param.setLimit(1000);
|
|
|
+ // 每处理1000个组或任务队列空时更新数据库
|
|
|
+ if (currentBatchCount >= batchSize || taskQueue.isEmpty()) {
|
|
|
+ synchronized (allUpdates) {
|
|
|
+ if (!allUpdates.isEmpty()) {
|
|
|
+ batchUpdateDatabase(new ArrayList<>(allUpdates));
|
|
|
+ logger.info("每处理300个组或任务队列空时更新数据库:"+new ArrayList<>(allUpdates).size());
|
|
|
+ allUpdates.clear();
|
|
|
+ }
|
|
|
|
|
|
- fetchAndProcessAllPages(param, corpId, corpLogs, msgID);
|
|
|
+ }
|
|
|
+ currentBatchCount = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ // 准备下一轮重试
|
|
|
+ if (retryCount < maxRetries) {
|
|
|
+ taskQueue.addAll(apiFailedTasks);
|
|
|
+ apiFailedTasks.clear();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- long endTime = System.currentTimeMillis();
|
|
|
- logger.info("企业微信群发消息结果查询任务完成,处理记录总数: {},总耗时: {}ms",
|
|
|
- qwSopLogsList.size(), (endTime - startTime));
|
|
|
+ // 最终更新剩余数据
|
|
|
+ synchronized (allUpdates) {
|
|
|
+ if (!allUpdates.isEmpty()) {
|
|
|
+ logger.info("最终更新剩余数据:"+new ArrayList<>(allUpdates).size());
|
|
|
+ batchUpdateDatabase(new ArrayList<>(allUpdates));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ executor.shutdown();
|
|
|
+ try {
|
|
|
+ if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
+ executor.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
|
|
|
+ long endTime = System.currentTimeMillis();
|
|
|
+ logger.info("成功处理分组数: " + totalGroupsProcessed.get());
|
|
|
+ logger.info("企业微信群发消息结果查询任务完成-更新记录总数:{},总耗时:{} " , totalRecordsUpdated.get(),(endTime - startTime));
|
|
|
}
|
|
|
|
|
|
- private void fetchAndProcessAllPages(QwGetGroupmsgSendParam param, String corpId, List<QwSopLogs> logs, String msgId) {
|
|
|
+ private List<QwSopLogs> processSingleGroup(Map.Entry<String, List<QwSopLogs>> entry, Queue<Map.Entry<String, List<QwSopLogs>>> apiFailedTasks) {
|
|
|
+
|
|
|
+ String key = entry.getKey();
|
|
|
+ List<QwSopLogs> corpLogs = entry.getValue();
|
|
|
+ String[] keys = key.split("\\|");
|
|
|
+ String qwUserid = keys[0];
|
|
|
+ String corpId = keys[1];
|
|
|
+ String msgID = keys[2];
|
|
|
+
|
|
|
+ QwGetGroupmsgSendParam param = new QwGetGroupmsgSendParam();
|
|
|
+ param.setMsgid(msgID);
|
|
|
+ param.setUserid(qwUserid);
|
|
|
+ param.setLimit(1000);
|
|
|
+
|
|
|
+ List<QwSopLogs> groupUpdates = new ArrayList<>();
|
|
|
String nextCursor = null;
|
|
|
+ boolean apiSuccess = true;
|
|
|
|
|
|
do {
|
|
|
param.setCursor(nextCursor);
|
|
|
+ QwGroupmsgSendResult result = fetchWithRetry(param, corpId); // 重试3次
|
|
|
+
|
|
|
+ // API调用失败处理
|
|
|
+ if (result == null || result.getErrCode() != 0) {
|
|
|
+ apiSuccess = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 处理当前页结果
|
|
|
+ List<QwSopLogs> pageUpdates = processPageResult(result, corpLogs, corpId, msgID);
|
|
|
+ groupUpdates.addAll(pageUpdates);
|
|
|
+ nextCursor = result.getNextCursor();
|
|
|
+
|
|
|
+ } while (nextCursor != null && !nextCursor.isEmpty());
|
|
|
+
|
|
|
+ // API调用失败时返回null,将任务加入失败队列
|
|
|
+ if (!apiSuccess) {
|
|
|
+ apiFailedTasks.offer(entry);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ return groupUpdates;
|
|
|
+ }
|
|
|
+
|
|
|
+ private QwGroupmsgSendResult fetchWithRetry(QwGetGroupmsgSendParam param, String corpId) {
|
|
|
+ int retryCount = 0;
|
|
|
+ while (retryCount <= 3) {
|
|
|
QwGroupmsgSendResult result = qwApiService.getGroupmsgSendResult(param, corpId);
|
|
|
|
|
|
+ // 请求失败情况
|
|
|
if (result == null) {
|
|
|
- logger.error("接口调用失败: {}", param);
|
|
|
- return;
|
|
|
+ retryCount++;
|
|
|
+ sleepWithJitter(2000);
|
|
|
+ continue;
|
|
|
}
|
|
|
|
|
|
- if (result.getErrCode() == 45033) {
|
|
|
- try {
|
|
|
- Thread.sleep(2000 + new Random().nextInt(1000));
|
|
|
- result = qwApiService.getGroupmsgSendResult(param, corpId);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- logger.error("线程中断", e);
|
|
|
- return;
|
|
|
- }
|
|
|
+ // 成功情况
|
|
|
+ if (result.getErrCode() == 0) {
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
- if (result.getErrCode() != 0) {
|
|
|
- logger.error("查询失败: {}, errCode: {}, errMsg: {}", param, result.getErrCode(), result.getErrMsg());
|
|
|
- return;
|
|
|
+ // 需要重试的错误码
|
|
|
+ if (result.getErrCode() == 45033 || result.getErrCode() == -1) {
|
|
|
+ retryCount++;
|
|
|
+ sleepWithJitter(2000 + new Random().nextInt(1000));
|
|
|
+ continue;
|
|
|
}
|
|
|
|
|
|
- processPageResult(result, logs, corpId, msgId);
|
|
|
- nextCursor = result.getNextCursor();
|
|
|
-
|
|
|
- } while (nextCursor != null && !nextCursor.isEmpty());
|
|
|
+ // 其他错误码不重试
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ return null; // 超过重试次数
|
|
|
}
|
|
|
|
|
|
+ private void sleepWithJitter(int baseMillis) {
|
|
|
+ try {
|
|
|
+ int sleepTime = baseMillis + (new Random().nextInt(1000));
|
|
|
+ Thread.sleep(sleepTime);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- private void processPageResult(QwGroupmsgSendResult result, List<QwSopLogs> logs, String corpId, String msgId) {
|
|
|
+ private List<QwSopLogs> processPageResult(QwGroupmsgSendResult result, List<QwSopLogs> logs, String corpId, String msgId) {
|
|
|
Map<String, SendItemResult> sendMap = result.getSendList().stream()
|
|
|
.collect(Collectors.toMap(
|
|
|
r -> r.getUserId() + "_" + r.getExternalUserId() + "_" + corpId + "_" + msgId,
|
|
|
Function.identity(),
|
|
|
- (a, b) -> a // 如果重复,保留第一个
|
|
|
+ (a, b) -> a
|
|
|
));
|
|
|
|
|
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
String now = LocalDateTime.now().format(formatter);
|
|
|
-
|
|
|
- // 只处理匹配得上的记录
|
|
|
List<QwSopLogs> matchedLogs = new ArrayList<>();
|
|
|
|
|
|
for (QwSopLogs log : logs) {
|
|
|
String logKey = log.getQwUserid() + "_" + log.getExternalUserId() + "_" + log.getCorpId() + "_" + msgId;
|
|
|
SendItemResult matched = sendMap.get(logKey);
|
|
|
-
|
|
|
- if (matched != null) {
|
|
|
-
|
|
|
- switch (matched.getStatus()) {
|
|
|
- case 0:
|
|
|
- log.setSendStatus(5L);
|
|
|
- log.setRemark("员工未在规定时间发送");
|
|
|
- break;
|
|
|
- case 1:
|
|
|
- log.setSendStatus(1L);
|
|
|
- log.setReceivingStatus(1L);
|
|
|
- break;
|
|
|
- case 2:
|
|
|
- log.setSendType(2);
|
|
|
- log.setSendStatus(3L);
|
|
|
- log.setRemark("因客户不是好友导致发送失败,补发");
|
|
|
- log.setReceivingStatus(0L);
|
|
|
- log.setSendTime(now);
|
|
|
- log.setSort(30000001);
|
|
|
- case 3:
|
|
|
- log.setSendType(2);
|
|
|
- log.setSendStatus(3L);
|
|
|
- log.setRemark("客户已经收到其他群发消息,补发");
|
|
|
- log.setReceivingStatus(0L);
|
|
|
- log.setSendTime(now);
|
|
|
- log.setSort(30000001);
|
|
|
- break;
|
|
|
- default:
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- matchedLogs.add(log);
|
|
|
+ if (matched == null) continue;
|
|
|
+
|
|
|
+ switch (matched.getStatus()) {
|
|
|
+ case 0:
|
|
|
+ log.setSendStatus(5L);
|
|
|
+ log.setRemark("员工未在规定时间发送");
|
|
|
+ break;
|
|
|
+ case 1:
|
|
|
+ log.setSendStatus(1L);
|
|
|
+ log.setReceivingStatus(1L);
|
|
|
+ break;
|
|
|
+ case 2:
|
|
|
+ log.setSendType(2);
|
|
|
+ log.setSendStatus(3L);
|
|
|
+ log.setRemark("因客户不是好友导致发送失败,补发");
|
|
|
+ log.setReceivingStatus(0L);
|
|
|
+ log.setSendTime(now);
|
|
|
+ log.setSort(30000001);
|
|
|
+ break;
|
|
|
+ case 3:
|
|
|
+ log.setSendType(2);
|
|
|
+ log.setSendStatus(3L);
|
|
|
+ log.setRemark("客户已经收到其他群发消息,补发");
|
|
|
+ log.setReceivingStatus(0L);
|
|
|
+ log.setSendTime(now);
|
|
|
+ log.setSort(30000001);
|
|
|
+ break;
|
|
|
}
|
|
|
+ matchedLogs.add(log);
|
|
|
}
|
|
|
-
|
|
|
- if (!matchedLogs.isEmpty()) {
|
|
|
- batchUpdateDatabase(matchedLogs);
|
|
|
- }
|
|
|
+ return matchedLogs;
|
|
|
}
|
|
|
|
|
|
+// @Override
|
|
|
+// public void qwSopLogsResultNew() {
|
|
|
+//
|
|
|
+// logger.info("开始执行企业微信群发消息结果查询任务");
|
|
|
+// long startTime = System.currentTimeMillis();
|
|
|
+//
|
|
|
+// List<QwSopLogs> qwSopLogsList = qwSopLogsMapper.selectSopLogsByCreateCorpMassSendResult();
|
|
|
+// if (qwSopLogsList.isEmpty()) {
|
|
|
+// logger.info("没有需要查询结果的群发消息记录");
|
|
|
+// return;
|
|
|
+// }
|
|
|
+//
|
|
|
+//
|
|
|
+// Map<String, List<QwSopLogs>> grouped = qwSopLogsList.stream().collect(
|
|
|
+// Collectors.groupingBy(log -> log.getQwUserid() + "|" + log.getCorpId() + "|" + log.getMsgId())
|
|
|
+// );
|
|
|
+// for (Map.Entry<String, List<QwSopLogs>> entry : grouped.entrySet()) {
|
|
|
+// String key = entry.getKey();
|
|
|
+// List<QwSopLogs> corpLogs = entry.getValue();
|
|
|
+//
|
|
|
+// String[] keys = key.split("\\|");
|
|
|
+// String qwUserid = keys[0];
|
|
|
+// String corpId = keys[1];
|
|
|
+// String msgID = keys[2];
|
|
|
+//
|
|
|
+// QwGetGroupmsgSendParam param = new QwGetGroupmsgSendParam();
|
|
|
+// param.setMsgid(msgID);
|
|
|
+// param.setUserid(qwUserid);
|
|
|
+// param.setLimit(1000);
|
|
|
+//
|
|
|
+// fetchAndProcessAllPages(param, corpId, corpLogs, msgID);
|
|
|
+//
|
|
|
+// }
|
|
|
+//
|
|
|
+// long endTime = System.currentTimeMillis();
|
|
|
+// logger.info("企业微信群发消息结果查询任务完成,处理记录总数: {},总耗时: {}ms",
|
|
|
+// qwSopLogsList.size(), (endTime - startTime));
|
|
|
+//
|
|
|
+// }
|
|
|
+// private void fetchAndProcessAllPages(QwGetGroupmsgSendParam param, String corpId, List<QwSopLogs> logs, String msgId) {
|
|
|
+// String nextCursor = null;
|
|
|
+//
|
|
|
+// do {
|
|
|
+// param.setCursor(nextCursor);
|
|
|
+// QwGroupmsgSendResult result = qwApiService.getGroupmsgSendResult(param, corpId);
|
|
|
+//
|
|
|
+// if (result == null) {
|
|
|
+// logger.error("接口调用失败: {}", param);
|
|
|
+// return;
|
|
|
+// }
|
|
|
+//
|
|
|
+// if (result.getErrCode() == 45033) {
|
|
|
+// try {
|
|
|
+// Thread.sleep(2000 + new Random().nextInt(1000));
|
|
|
+// result = qwApiService.getGroupmsgSendResult(param, corpId);
|
|
|
+// } catch (InterruptedException e) {
|
|
|
+// Thread.currentThread().interrupt();
|
|
|
+// logger.error("线程中断", e);
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// if (result.getErrCode() != 0) {
|
|
|
+// logger.error("查询失败: {}, errCode: {}, errMsg: {}", param, result.getErrCode(), result.getErrMsg());
|
|
|
+// return;
|
|
|
+// }
|
|
|
+//
|
|
|
+// processPageResult(result, logs, corpId, msgId);
|
|
|
+// nextCursor = result.getNextCursor();
|
|
|
+//
|
|
|
+// } while (nextCursor != null && !nextCursor.isEmpty());
|
|
|
+// }
|
|
|
+// private void processPageResult(QwGroupmsgSendResult result, List<QwSopLogs> logs, String corpId, String msgId) {
|
|
|
+// Map<String, SendItemResult> sendMap = result.getSendList().stream()
|
|
|
+// .collect(Collectors.toMap(
|
|
|
+// r -> r.getUserId() + "_" + r.getExternalUserId() + "_" + corpId + "_" + msgId,
|
|
|
+// Function.identity(),
|
|
|
+// (a, b) -> a // 如果重复,保留第一个
|
|
|
+// ));
|
|
|
+//
|
|
|
+// DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+// String now = LocalDateTime.now().format(formatter);
|
|
|
+//
|
|
|
+// // 只处理匹配得上的记录
|
|
|
+// List<QwSopLogs> matchedLogs = new ArrayList<>();
|
|
|
+//
|
|
|
+// for (QwSopLogs log : logs) {
|
|
|
+// String logKey = log.getQwUserid() + "_" + log.getExternalUserId() + "_" + log.getCorpId() + "_" + msgId;
|
|
|
+// SendItemResult matched = sendMap.get(logKey);
|
|
|
+//
|
|
|
+// if (matched != null) {
|
|
|
+//
|
|
|
+// switch (matched.getStatus()) {
|
|
|
+// case 0:
|
|
|
+// log.setSendStatus(5L);
|
|
|
+// log.setRemark("员工未在规定时间发送");
|
|
|
+// break;
|
|
|
+// case 1:
|
|
|
+// log.setSendStatus(1L);
|
|
|
+// log.setReceivingStatus(1L);
|
|
|
+// break;
|
|
|
+// case 2:
|
|
|
+// log.setSendType(2);
|
|
|
+// log.setSendStatus(3L);
|
|
|
+// log.setRemark("因客户不是好友导致发送失败,补发");
|
|
|
+// log.setReceivingStatus(0L);
|
|
|
+// log.setSendTime(now);
|
|
|
+// log.setSort(30000001);
|
|
|
+// case 3:
|
|
|
+// log.setSendType(2);
|
|
|
+// log.setSendStatus(3L);
|
|
|
+// log.setRemark("客户已经收到其他群发消息,补发");
|
|
|
+// log.setReceivingStatus(0L);
|
|
|
+// log.setSendTime(now);
|
|
|
+// log.setSort(30000001);
|
|
|
+// break;
|
|
|
+// default:
|
|
|
+// break;
|
|
|
+// }
|
|
|
+//
|
|
|
+// matchedLogs.add(log);
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// if (!matchedLogs.isEmpty()) {
|
|
|
+// batchUpdateDatabase(matchedLogs);
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* 批量更新数据库
|