|
|
@@ -8,6 +8,7 @@ import com.fs.app.service.IpadSendServer;
|
|
|
import com.fs.common.core.redis.RedisCacheT;
|
|
|
import com.fs.common.utils.DateUtils;
|
|
|
import com.fs.common.utils.PubFun;
|
|
|
+import com.fs.course.config.CourseConfig;
|
|
|
import com.fs.qw.domain.QwIpadServer;
|
|
|
import com.fs.qw.domain.QwUser;
|
|
|
import com.fs.qw.mapper.*;
|
|
|
@@ -17,6 +18,7 @@ import com.fs.qw.vo.QwSopTempSetting;
|
|
|
import com.fs.sop.domain.QwSopLogs;
|
|
|
import com.fs.sop.mapper.QwSopLogsMapper;
|
|
|
import com.fs.sop.service.IQwSopLogsService;
|
|
|
+import com.fs.system.domain.SysConfig;
|
|
|
import com.fs.system.mapper.SysConfigMapper;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
@@ -32,7 +34,9 @@ import java.text.SimpleDateFormat;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ThreadLocalRandom;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@Component
|
|
|
@@ -51,7 +55,31 @@ public class SendAppMsg {
|
|
|
@Value("${group-no}")
|
|
|
private String groupNo;
|
|
|
private final List<QwUser> qwUserList = Collections.synchronizedList(new ArrayList<>());
|
|
|
- private final Map<Long, Long> qwMap = new ConcurrentHashMap<>();
|
|
|
+ private final Map<Long, TaskContext> qwMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ private static final long TASK_TIMEOUT_MS = 3 * 60 * 1000L;
|
|
|
+
|
|
|
+ private static class TaskContext {
|
|
|
+ final long startTime;
|
|
|
+ final AtomicBoolean cancelled;
|
|
|
+
|
|
|
+ TaskContext() {
|
|
|
+ this.startTime = System.currentTimeMillis();
|
|
|
+ this.cancelled = new AtomicBoolean(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isTimeout() {
|
|
|
+ return System.currentTimeMillis() - startTime > TASK_TIMEOUT_MS;
|
|
|
+ }
|
|
|
+
|
|
|
+ void cancel() {
|
|
|
+ cancelled.set(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isCancelled() {
|
|
|
+ return cancelled.get();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
@Autowired
|
|
|
@Qualifier("customThreadPool")
|
|
|
@@ -99,24 +127,54 @@ public class SendAppMsg {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ SysConfig courseConfig = sysConfigMapper.selectConfigByConfigKey("course.config");
|
|
|
+ CourseConfig config = JSON.parseObject(courseConfig.getConfigValue(), CourseConfig.class);
|
|
|
+ int delayStart;
|
|
|
+ int delayEnd;
|
|
|
+ if (config.getDelayStart() == null || config.getDelayEnd() == null) {
|
|
|
+ log.debug("消息发送延迟为空手动设置1000ms - 2000ms");
|
|
|
+ delayStart = 1000;
|
|
|
+ delayEnd = 2000;
|
|
|
+ } else {
|
|
|
+ delayStart = config.getDelayStart();
|
|
|
+ delayEnd = config.getDelayEnd();
|
|
|
+ }
|
|
|
+
|
|
|
// 获取 pad 发送的企微
|
|
|
getQwUserList().forEach(e -> {
|
|
|
- // 如果没有值就执行后面的方法 并且入值
|
|
|
- qwMap.computeIfAbsent(e.getId(), k -> {
|
|
|
- // 线程启动
|
|
|
- CompletableFuture.runAsync(() -> {
|
|
|
- try {
|
|
|
- log.info("SendAppMsg-开始任务:{}", e.getQwUserName());
|
|
|
- // 开始任务
|
|
|
- processUser(e);
|
|
|
- } catch (Exception exception) {
|
|
|
- log.error("发送错误:", exception);
|
|
|
- } finally {
|
|
|
- log.info("SendAppMsg-删除任务:{}", e.getQwUserName());
|
|
|
- qwMap.remove(e.getId());
|
|
|
- }
|
|
|
- }, customThreadPool);
|
|
|
- return System.currentTimeMillis(); // 占位值
|
|
|
+ TaskContext ctx = qwMap.get(e.getId());
|
|
|
+ if (ctx != null) {
|
|
|
+ if (ctx.isTimeout()) {
|
|
|
+ log.warn("任务超时,标记取消:{}, 已运行: {}ms", e.getQwUserName(), System.currentTimeMillis() - ctx.startTime);
|
|
|
+ ctx.cancel();
|
|
|
+ } else {
|
|
|
+ log.debug("任务正在执行中,跳过:{}", e.getQwUserName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (customThreadPool.getActiveCount() >= customThreadPool.getMaxPoolSize()) {
|
|
|
+ log.warn("线程池已满,跳过任务:{}, 活跃线程: {}/{}", e.getQwUserName(), customThreadPool.getActiveCount(), customThreadPool.getMaxPoolSize());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ TaskContext newCtx = new TaskContext();
|
|
|
+ qwMap.put(e.getId(), newCtx);
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ log.info("开始任务:{}", e.getQwUserName());
|
|
|
+ processUser(e, delayStart, delayEnd, newCtx);
|
|
|
+ } catch (Exception exception) {
|
|
|
+ exception.printStackTrace();
|
|
|
+ log.error("发送错误:", exception);
|
|
|
+ } finally {
|
|
|
+ log.info("删除任务:{}", e.getQwUserName());
|
|
|
+ qwMap.remove(e.getId());
|
|
|
+ }
|
|
|
+ }, customThreadPool).exceptionally(ex -> {
|
|
|
+ log.error("任务提交失败:{}, 错误: {}", e.getQwUserName(), ex.getMessage());
|
|
|
+ qwMap.remove(e.getId());
|
|
|
+ return null;
|
|
|
});
|
|
|
});
|
|
|
}
|
|
|
@@ -129,7 +187,7 @@ public class SendAppMsg {
|
|
|
* @param
|
|
|
* @param
|
|
|
*/
|
|
|
- private void processUser(QwUser qwUser) {
|
|
|
+ private void processUser(QwUser qwUser, int delayStart, int delayEnd, SendAppMsg.TaskContext ctx) {
|
|
|
long start1 = System.currentTimeMillis();
|
|
|
// 获取当前企微的app待发送记录
|
|
|
List<QwSopLogs> qwSopLogList = qwSopLogsMapper.selectAllAppEByQwUserId(qwUser.getId());
|
|
|
@@ -140,6 +198,10 @@ public class SendAppMsg {
|
|
|
// 获取企微用户
|
|
|
QwUser user = qwUserMapper.selectById(qwUser.getId());
|
|
|
long end1 = System.currentTimeMillis();
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ log.info("任务被取消,退出:{}", qwUser.getQwUserName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
log.info("SendAppMsg-销售:{}, 消息:{}, 耗时: {}, 时间:{}", user.getQwUserName(), qwSopLogList.size(), end1 - start1, qwMap.get(qwUser.getId()));
|
|
|
long start3 = System.currentTimeMillis();
|
|
|
Map<String, Integer> pushCountMap = new HashMap<>();
|
|
|
@@ -147,6 +209,10 @@ public class SendAppMsg {
|
|
|
pushCountMap.put(key, Math.toIntExact(e.getPushCount()));});
|
|
|
// 循环待发送消息
|
|
|
for (QwSopLogs qwSopLogs : qwSopLogList) {
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ log.info("任务被取消,中断发送:{}, 已发送部分消息", qwUser.getQwUserName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
long start2 = System.currentTimeMillis();
|
|
|
QwSopCourseFinishTempSetting setting = JSON.parseObject(qwSopLogs.getContentJson(), QwSopCourseFinishTempSetting.class);
|
|
|
// 判断消息状态是否满足发送条件
|
|
|
@@ -154,10 +220,10 @@ public class SendAppMsg {
|
|
|
continue;
|
|
|
}
|
|
|
String key = "qw:logs:pad:send:app:id:" + qwSopLogs.getId();
|
|
|
- Long time = redisCache.getCacheObject(key);
|
|
|
+// Long time = redisCache.getCacheObject(key);
|
|
|
// 判断这个消息有没有进入过发送,如果进了就不要再发了,防止重复发送,,,,, TODO 千万不能动!!!!!
|
|
|
if (redisCache.getCacheObject(key) != null) {
|
|
|
- log.error("{}已有发送:{}, :{}", qwUser.getQwUserName(), qwSopLogs.getId(), time);
|
|
|
+ log.error("{}已有发送:{}, :{}", qwUser.getQwUserName(), qwSopLogs.getId());
|
|
|
continue;
|
|
|
}
|
|
|
List<QwSopTempSetting.Content.Setting> settings = JSON.parseArray(JSON.toJSONString(setting.getSetting()), QwSopTempSetting.Content.Setting.class);
|
|
|
@@ -171,20 +237,24 @@ public class SendAppMsg {
|
|
|
boolean txtSendStatus = true;
|
|
|
boolean mp3SendStatus = true;
|
|
|
boolean courseSendStatus = true;
|
|
|
-// for (QwSopCourseFinishTempSetting.Setting content : allContent) {
|
|
|
-// String contentType = content.getContentType();
|
|
|
-// if (!typeList.contains(contentType)) {
|
|
|
-// continue;
|
|
|
-// }
|
|
|
-//
|
|
|
-// Integer pushCount = pushCountMap.containsKey(String.valueOf(companyId))
|
|
|
-// ? pushCountMap.get(String.valueOf(companyId))
|
|
|
-// : pushCountMap.getOrDefault(contentType, -99);
|
|
|
-//
|
|
|
-// if (pushCount == -99) {
|
|
|
-// continue; // 没有限制
|
|
|
-// }
|
|
|
-//
|
|
|
+ for (QwSopCourseFinishTempSetting.Setting content : allContent) {
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ log.info("任务被取消,中断发送:{}", qwUser.getQwUserName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String contentType = content.getContentType();
|
|
|
+ if (!typeList.contains(contentType)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Integer pushCount = pushCountMap.containsKey(String.valueOf(companyId))
|
|
|
+ ? pushCountMap.get(String.valueOf(companyId))
|
|
|
+ : pushCountMap.getOrDefault(contentType, -99);
|
|
|
+
|
|
|
+ if (pushCount == -99) {
|
|
|
+ continue; // 没有限制
|
|
|
+ }
|
|
|
+
|
|
|
// int sentCount = qwRestrictionPushRecordImMapper
|
|
|
// .selectQwRestrictionPushRecordIm(qwUserId, customerId, Integer.valueOf(contentType), DateUtils.toStartTime(), DateUtils.toEndTime());
|
|
|
//
|
|
|
@@ -196,57 +266,60 @@ public class SendAppMsg {
|
|
|
// log.warn("SendAppMsg-销售{} 客户{} 类型{} 已达发送上限({})",
|
|
|
// user.getQwUserName(), customerId, contentType, pushCount);
|
|
|
// if ("9".equals(contentType)) courseSendStatus = false;
|
|
|
-// if ("15".equals(contentType)) txtSendStatus = false;
|
|
|
-// if ("16".equals(contentType)) mp3SendStatus = false;
|
|
|
+// if ("11".equals(contentType)) txtSendStatus = false;
|
|
|
+// if ("12".equals(contentType)) mp3SendStatus = false;
|
|
|
+// if ("20".equals(contentType)) liveSendStatus = false;
|
|
|
+// if ("21".equals(contentType)) luckyBagSendStatus = false;
|
|
|
// continue;
|
|
|
// }
|
|
|
-// }
|
|
|
+ }
|
|
|
|
|
|
redisCache.setCacheObject(key, System.currentTimeMillis(), 3, TimeUnit.HOURS);
|
|
|
|
|
|
// 推送 APP
|
|
|
if (!setting.getSetting().isEmpty()) {
|
|
|
try {
|
|
|
-// settings = JSON.parseArray(JSON.toJSONString(setting.getSetting()), QwSopTempSetting.Content.Setting.class).stream().filter(e -> "9".equals(e.getContentType())).collect(Collectors.toList());
|
|
|
- List<QwSopCourseFinishTempSetting.Setting> linkList = allContent.stream().filter(e -> "9".equals(e.getContentType())).collect(Collectors.toList());
|
|
|
+ List<String> sendOrder = Arrays.asList("9", "15", "16");
|
|
|
+ for (String contentType : sendOrder) {
|
|
|
+ List<QwSopCourseFinishTempSetting.Setting> currentList = allContent.stream()
|
|
|
+ .filter(e -> contentType.equals(e.getContentType()) && !Integer.valueOf(2).equals(e.getSendStatus()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ if (currentList.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ switch (contentType) {
|
|
|
+ case "9":
|
|
|
+ // app课程
|
|
|
+ courseSendStatus = asyncSopTestService.asyncSendMsgBySopAppLinkNormalIM(currentList, qwSopLogs.getCorpId(), user.getCompanyUserId(), qwSopLogs.getFsUserId(), qwSopLogs.getId());
|
|
|
+ break;
|
|
|
+ case "15":
|
|
|
+ // app文本消息
|
|
|
+ log.info("开始发送app文本消息消息开始,消息{},用户{}", com.alibaba.fastjson.JSONObject.toJSONString(allContent), user.getQwUserName());
|
|
|
+ txtSendStatus = asyncSopTestService.asyncSendMsgBySopAppTxtNormalIM(currentList, qwSopLogs.getCorpId(), qwUser.getCompanyUserId(), qwSopLogs.getFsUserId(), qwSopLogs.getId());
|
|
|
+ break;
|
|
|
+ case "16":
|
|
|
+ // app语音消息
|
|
|
+ log.info("开始发送app语音消息消息开始,消息{},用户{}", com.alibaba.fastjson.JSONObject.toJSONString(allContent), user.getQwUserName());
|
|
|
+ mp3SendStatus = asyncSopTestService.asyncSendMsgBySopAppMP3NormalIM(currentList, qwSopLogs.getCorpId(), qwUser.getCompanyUserId(), qwSopLogs.getFsUserId(), qwSopLogs.getId());
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
|
|
|
- if (!linkList.isEmpty()) {
|
|
|
- courseSendStatus = asyncSopTestService.asyncSendMsgBySopAppLinkNormalIM(linkList, qwSopLogs.getCorpId(), user.getCompanyUserId(), qwSopLogs.getFsUserId(), qwSopLogs.getId());
|
|
|
- }
|
|
|
- //app文本消息
|
|
|
- log.info("开始发送app文本消息消息开始,消息{},用户{}", com.alibaba.fastjson.JSONObject.toJSONString(allContent), user.getQwUserName());
|
|
|
- List<QwSopCourseFinishTempSetting.Setting> txtList = allContent.stream().filter(e -> "15".equals(e.getContentType())).collect(Collectors.toList());
|
|
|
+ }
|
|
|
|
|
|
- if (!txtList.isEmpty()) {
|
|
|
- txtSendStatus = asyncSopTestService.asyncSendMsgBySopAppTxtNormalIM(txtList, qwSopLogs.getCorpId(), qwUser.getCompanyUserId(), qwSopLogs.getFsUserId(), qwSopLogs.getId());
|
|
|
}
|
|
|
- //app语音消息
|
|
|
- log.info("开始发送app语音消息消息开始,消息{},用户{}", com.alibaba.fastjson.JSONObject.toJSONString(allContent), user.getQwUserName());
|
|
|
- List<QwSopCourseFinishTempSetting.Setting> voiceList = allContent.stream().filter(e -> "16".equals(e.getContentType())).collect(Collectors.toList());
|
|
|
- if (!voiceList.isEmpty()) {
|
|
|
- mp3SendStatus = asyncSopTestService.asyncSendMsgBySopAppMP3NormalIM(voiceList, qwSopLogs.getCorpId(), qwUser.getCompanyUserId(), qwSopLogs.getFsUserId(), qwSopLogs.getId());
|
|
|
+ try {
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int delay = ThreadLocalRandom.current().nextInt(300, 1000);
|
|
|
+ log.debug("pad发送消息等待:{}ms", delay);
|
|
|
+// Thread.sleep(delay);
|
|
|
+ } catch (/*InterruptedException e*/ Exception e) {
|
|
|
+ log.error("线程等待错误!");
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ return;
|
|
|
}
|
|
|
- // 发送成功后记录次数
|
|
|
- // 发送成功后记录次数(只记录真正发送的 content)
|
|
|
-// for (QwSopCourseFinishTempSetting.Setting content : allContent) {
|
|
|
-// if (!typeList.contains(content.getContentType())) {
|
|
|
-// continue;
|
|
|
-// }
|
|
|
-// if (content.getSendStatus() != null && content.getSendStatus() == 2) {
|
|
|
-// // 达到上限的,不记录发送次数
|
|
|
-// continue;
|
|
|
-// }
|
|
|
-// QwRestrictionPushRecordIm record = new QwRestrictionPushRecordIm();
|
|
|
-// record.setType(Integer.valueOf(content.getContentType()));
|
|
|
-// record.setQwUserId(qwUserId);
|
|
|
-// record.setQwExternalId(customerId);
|
|
|
-// record.setCompanyId(companyId);
|
|
|
-// record.setStatus(1);
|
|
|
-// record.setCreateTime(DateUtils.getTime());
|
|
|
-// record.setTime(System.currentTimeMillis());
|
|
|
-// qwRestrictionPushRecordImMapper.insert(record);
|
|
|
-// }
|
|
|
-
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
log.error("推送APP失败", e);
|
|
|
@@ -284,14 +357,26 @@ public class SendAppMsg {
|
|
|
// long end2 = System.currentTimeMillis();
|
|
|
int i = qwSopLogsService.updateQwSopLogsSendType(updateQwSop);
|
|
|
// log.info("SendAppMsg-销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", user.getQwUserName(), i, qwSopLogs.getId(), end2 - start2);
|
|
|
+ try {
|
|
|
+ if (ctx.isCancelled()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int delay = ThreadLocalRandom.current().nextInt(delayStart, delayEnd);
|
|
|
+ log.debug("企微发送消息等待:{}ms", delay);
|
|
|
+// Thread.sleep(delay);
|
|
|
+ } catch (/*InterruptedException e*/ Exception e) {
|
|
|
+ log.error("线程等待错误!");
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- try {
|
|
|
- Thread.sleep(200);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- Thread.currentThread().interrupt();
|
|
|
- log.warn("SendAppMsg-休眠被中断:{}", qwUser.getQwUserName());
|
|
|
- }
|
|
|
+// try {
|
|
|
+// Thread.sleep(200);
|
|
|
+// } catch (InterruptedException e) {
|
|
|
+// Thread.currentThread().interrupt();
|
|
|
+// log.warn("SendAppMsg-休眠被中断:{}", qwUser.getQwUserName());
|
|
|
+// }
|
|
|
long end3 = System.currentTimeMillis();
|
|
|
log.info("SendAppMsg-销售执行完成:{}, 耗时:{}", user.getQwUserName(), end3 - start3);
|
|
|
}
|