|
|
@@ -0,0 +1,206 @@
|
|
|
+package com.fs.app.task;
|
|
|
+
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
+import com.fs.app.service.IpadSendServer;
|
|
|
+import com.fs.common.core.redis.RedisCacheT;
|
|
|
+import com.fs.common.utils.PubFun;
|
|
|
+import com.fs.qw.domain.QwIpadServer;
|
|
|
+import com.fs.qw.domain.QwUser;
|
|
|
+import com.fs.qw.mapper.QwIpadServerMapper;
|
|
|
+import com.fs.qw.mapper.QwPushCountMapper;
|
|
|
+import com.fs.qw.mapper.QwRestrictionPushRecordMapper;
|
|
|
+import com.fs.qw.mapper.QwUserMapper;
|
|
|
+import com.fs.qw.service.impl.AsyncSopTestService;
|
|
|
+import com.fs.qw.vo.QwSopCourseFinishTempSetting;
|
|
|
+import com.fs.qw.vo.QwSopTempSetting;
|
|
|
+import com.fs.sop.domain.QwSopLogs;
|
|
|
+import com.fs.sop.mapper.QwSopLogsMapper;
|
|
|
+import com.fs.sop.service.IQwSopLogsService;
|
|
|
+import com.fs.system.mapper.SysConfigMapper;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class SendAppMsg {
|
|
|
+
|
|
|
+ private final QwUserMapper qwUserMapper;
|
|
|
+ private final QwSopLogsMapper qwSopLogsMapper;
|
|
|
+ private final IpadSendServer sendServer;
|
|
|
+ private final SysConfigMapper sysConfigMapper;
|
|
|
+ private final IQwSopLogsService qwSopLogsService;
|
|
|
+ private final AsyncSopTestService asyncSopTestService;
|
|
|
+ private final QwIpadServerMapper qwIpadServerMapper;
|
|
|
+ private final RedisCacheT<Long> redisCache;
|
|
|
+ private final QwPushCountMapper qwPushCountMapper;
|
|
|
+ private final QwRestrictionPushRecordMapper qwRestrictionPushRecordMapper;
|
|
|
+ @Value("${group-no}")
|
|
|
+ private String groupNo;
|
|
|
+ private final List<QwUser> qwUserList = Collections.synchronizedList(new ArrayList<>());
|
|
|
+ private final Map<Long, Long> qwMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @Qualifier("customThreadPool")
|
|
|
+ private ThreadPoolTaskExecutor customThreadPool;
|
|
|
+
|
|
|
+
|
|
|
+ public SendAppMsg(QwUserMapper qwUserMapper, QwSopLogsMapper qwSopLogsMapper, IpadSendServer sendServer, SysConfigMapper sysConfigMapper, IQwSopLogsService qwSopLogsService, AsyncSopTestService asyncSopTestService, QwIpadServerMapper qwIpadServerMapper, RedisCacheT<Long> redisCache, QwPushCountMapper qwPushCountMapper, QwRestrictionPushRecordMapper qwRestrictionPushRecordMapper) {
|
|
|
+ this.qwUserMapper = qwUserMapper;
|
|
|
+ this.qwSopLogsMapper = qwSopLogsMapper;
|
|
|
+ this.sendServer = sendServer;
|
|
|
+ this.sysConfigMapper = sysConfigMapper;
|
|
|
+ this.qwSopLogsService = qwSopLogsService;
|
|
|
+ this.asyncSopTestService = asyncSopTestService;
|
|
|
+ this.qwIpadServerMapper = qwIpadServerMapper;
|
|
|
+ this.redisCache = redisCache;
|
|
|
+ this.qwPushCountMapper = qwPushCountMapper;
|
|
|
+ this.qwRestrictionPushRecordMapper = qwRestrictionPushRecordMapper;
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<QwUser> getQwUserList() {
|
|
|
+ if (qwUserList.isEmpty()) {
|
|
|
+ List<QwIpadServer> serverList = qwIpadServerMapper.selectList(new QueryWrapper<QwIpadServer>().eq("group_no", groupNo));
|
|
|
+ if (serverList.isEmpty()) {
|
|
|
+ return new ArrayList<>();
|
|
|
+ }
|
|
|
+ List<Long> serverIds = PubFun.listToNewList(serverList, QwIpadServer::getId);
|
|
|
+ List<QwUser> qwUsers = qwUserMapper.selectList(new QueryWrapper<QwUser>().eq("send_msg_type", 1).eq("server_status", 1).eq("ipad_status", 0).in("server_id", serverIds));
|
|
|
+ qwUserList.addAll(qwUsers);
|
|
|
+ }
|
|
|
+ return qwUserList;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Scheduled(fixedRate = 50000) // 每50秒执行一次
|
|
|
+ public void refulsQwUserList() {
|
|
|
+ qwUserList.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Scheduled(fixedDelay = 20000) // 每20秒执行一次
|
|
|
+ public void sendMsg2() {
|
|
|
+ if (StringUtils.isEmpty(groupNo)) {
|
|
|
+ log.error("corpId为空不执行");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取 pad 发送的企微
|
|
|
+ getQwUserList().forEach(e -> {
|
|
|
+ // 如果没有值就执行后面的方法 并且入值
|
|
|
+ qwMap.computeIfAbsent(e.getId(), k -> {
|
|
|
+ // 线程启动
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ log.info("SendAppMsg-开始任务:{}", e.getQwUserName());
|
|
|
+ // 开始任务
|
|
|
+ processUser(e);
|
|
|
+ } catch (Exception exception) {
|
|
|
+ log.error("发送错误:", exception);
|
|
|
+ } finally {
|
|
|
+ log.info("SendAppMsg-删除任务:{}", e.getQwUserName());
|
|
|
+ qwMap.remove(e.getId());
|
|
|
+ }
|
|
|
+ }, customThreadPool);
|
|
|
+ return System.currentTimeMillis(); // 占位值
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送任务执行
|
|
|
+ *
|
|
|
+ * @param qwUser 发送企微
|
|
|
+ * @param
|
|
|
+ * @param
|
|
|
+ * @param
|
|
|
+ */
|
|
|
+ private void processUser(QwUser qwUser) {
|
|
|
+ long start1 = System.currentTimeMillis();
|
|
|
+ // 获取当前企微待发送记录
|
|
|
+ List<QwSopLogs> qwSopLogList = qwSopLogsMapper.selectByQwUserId(qwUser.getId());
|
|
|
+ if (qwSopLogList.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 获取企微用户
|
|
|
+ QwUser user = qwUserMapper.selectById(qwUser.getId());
|
|
|
+ long end1 = System.currentTimeMillis();
|
|
|
+ log.info("SendAppMsg-销售:{}, 消息:{}, 耗时: {}, 时间:{}", user.getQwUserName(), qwSopLogList.size(), end1 - start1, qwMap.get(qwUser.getId()));
|
|
|
+ long start3 = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 循环代发送消息
|
|
|
+ for (QwSopLogs qwSopLogs : qwSopLogList) {
|
|
|
+ long start2 = System.currentTimeMillis();
|
|
|
+ QwSopCourseFinishTempSetting setting = JSON.parseObject(qwSopLogs.getContentJson(), QwSopCourseFinishTempSetting.class);
|
|
|
+ // 判断消息状态是否满足发送条件
|
|
|
+ if (!sendServer.isSendLogs(qwSopLogs, setting, user)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String key = "qw:logs:pad:send:id:" + qwSopLogs.getId();
|
|
|
+ Long time = redisCache.getCacheObject(key);
|
|
|
+ // 判断这个消息有没有进入过发送,如果进了就不要再发了,防止重复发送,,,,, TODO 千万不能动!!!!!
|
|
|
+ if (redisCache.getCacheObject(key) != null) {
|
|
|
+ log.error("{}已有发送:{}, :{}", qwUser.getQwUserName(), qwSopLogs.getId(), time);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ List<QwSopTempSetting.Content.Setting> settings = JSON.parseArray(JSON.toJSONString(setting.getSetting()), QwSopTempSetting.Content.Setting.class);
|
|
|
+ List<String> typeList = Arrays.asList("9", "11", "12");
|
|
|
+ if (setting.getType() != 4 && !CollectionUtils.isEmpty(settings) && settings.stream().anyMatch(e -> typeList.contains(e.getContentType()))) {
|
|
|
+ redisCache.setCacheObject(key, System.currentTimeMillis(), 3, TimeUnit.HOURS);
|
|
|
+ // 推送 APP
|
|
|
+ if (!setting.getSetting().isEmpty()) {
|
|
|
+ try {
|
|
|
+ settings = JSON.parseArray(JSON.toJSONString(setting.getSetting()), QwSopTempSetting.Content.Setting.class).stream().filter(e -> "9".equals(e.getContentType())).collect(Collectors.toList());
|
|
|
+ if (!settings.isEmpty()) {
|
|
|
+ asyncSopTestService.asyncSendMsgBySopAppLinkNormalIM(settings, qwSopLogs.getCorpId(), user.getCompanyUserId(), qwSopLogs.getFsUserId());
|
|
|
+ }
|
|
|
+ //app文本消息
|
|
|
+ log.info("开始发送app文本消息消息开始,消息{},用户{}", JSONObject.toJSONString(settings), user.getQwUserName());
|
|
|
+ settings = JSON.parseArray(JSON.toJSONString(setting.getSetting()), QwSopTempSetting.Content.Setting.class).stream().filter(e -> "11".equals(e.getContentType())).collect(Collectors.toList());
|
|
|
+
|
|
|
+ if (!settings.isEmpty()) {
|
|
|
+ asyncSopTestService.asyncSendMsgBySopAppTxtNormalIM(settings, qwSopLogs.getCorpId(), qwUser.getCompanyUserId(), qwSopLogs.getFsUserId());
|
|
|
+ }
|
|
|
+ //app语音消息
|
|
|
+ log.info("开始发送app语音消息消息开始,消息{},用户{}", JSONObject.toJSONString(settings), user.getQwUserName());
|
|
|
+ settings = JSON.parseArray(JSON.toJSONString(setting.getSetting()), QwSopTempSetting.Content.Setting.class).stream().filter(e -> "12".equals(e.getContentType())).collect(Collectors.toList());
|
|
|
+ if (!settings.isEmpty()) {
|
|
|
+ asyncSopTestService.asyncSendMsgBySopAppMP3NormalIM(settings, qwSopLogs.getCorpId(), qwUser.getCompanyUserId(), qwSopLogs.getFsUserId());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("推送APP失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ qwSopLogs.setSend(true);
|
|
|
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
+ QwSopLogs updateQwSop = new QwSopLogs();
|
|
|
+ updateQwSop.setId(qwSopLogs.getId());
|
|
|
+ updateQwSop.setReceivingStatus(1L);
|
|
|
+ updateQwSop.setSendStatus(1L);
|
|
|
+ updateQwSop.setRemark("APP发送成功");
|
|
|
+ updateQwSop.setRealSendTime(sdf.format(new Date()));
|
|
|
+ updateQwSop.setContentJson(JSON.toJSONString(setting));
|
|
|
+ long end2 = System.currentTimeMillis();
|
|
|
+ int i = qwSopLogsService.updateQwSopLogsSendType(updateQwSop);
|
|
|
+ log.info("SendAppMsg-销售:{}, 修改条数{}, 发送方消息完成:{}, 耗时: {}", user.getQwUserName(), i, qwSopLogs.getId(), end2 - start2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ long end3 = System.currentTimeMillis();
|
|
|
+ log.info("SendAppMsg-销售执行完成:{}, 耗时:{}", user.getQwUserName(), end3 - start3);
|
|
|
+ }
|
|
|
+}
|