|
|
@@ -6,31 +6,41 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
import com.fs.app.enums.CmdType;
|
|
|
import com.fs.app.websocket.bean.ResultMsgVo;
|
|
|
import com.fs.app.websocket.bean.SendMsgVo;
|
|
|
-import com.fs.company.domain.CompanyWxClient;
|
|
|
-import com.fs.company.mapper.CompanyWxClientMapper;
|
|
|
-import com.fs.company.service.CompanyWorkflowEngine;
|
|
|
-import com.fs.company.service.impl.CompanyWxServiceImpl;
|
|
|
-import com.fs.wxcid.domain.CidIpadServer;
|
|
|
-import com.fs.wxcid.mapper.CidIpadServerMapper;
|
|
|
-import com.fs.wxcid.vo.wxvo.*;
|
|
|
+import com.fs.common.config.RedisTenantContext;
|
|
|
+import com.fs.common.core.domain.model.TenantPrincipal;
|
|
|
import com.fs.common.core.redis.RedisCache;
|
|
|
import com.fs.common.utils.StringUtils;
|
|
|
import com.fs.common.utils.spring.SpringUtils;
|
|
|
import com.fs.company.domain.CompanyWxAccount;
|
|
|
+import com.fs.company.domain.CompanyWxClient;
|
|
|
import com.fs.company.mapper.CompanyWxAccountMapper;
|
|
|
+import com.fs.company.mapper.CompanyWxClientMapper;
|
|
|
+import com.fs.company.service.CompanyWorkflowEngine;
|
|
|
+import com.fs.company.service.impl.CompanyWxServiceImpl;
|
|
|
+import com.fs.core.config.TenantConfigContext;
|
|
|
+import com.fs.tenant.domain.TenantInfo;
|
|
|
+import com.fs.tenant.mapper.TenantInfoMapper;
|
|
|
import com.fs.wxcid.domain.WxContact;
|
|
|
+import com.fs.wxcid.mapper.CidIpadServerMapper;
|
|
|
import com.fs.wxcid.mapper.WxContactMapper;
|
|
|
import com.fs.wxcid.service.IWxMsgLogService;
|
|
|
+import com.fs.wxcid.utils.TenantHelper;
|
|
|
+import com.fs.wxcid.vo.wxvo.ContactInfoVo;
|
|
|
+import com.fs.wxcid.vo.wxvo.SyncInfoVo;
|
|
|
+import com.fs.wxcid.vo.wxvo.WxSendResultMsgVo;
|
|
|
import com.hc.openapi.tool.fastjson.JSON;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
|
|
|
+import org.springframework.security.core.context.SecurityContextHolder;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.websocket.*;
|
|
|
import javax.websocket.server.PathParam;
|
|
|
import javax.websocket.server.ServerEndpoint;
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.Method;
|
|
|
import java.time.LocalDateTime;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
@@ -51,6 +61,7 @@ public class WebSocketServer {
|
|
|
CompanyWxServiceImpl companyWxService = SpringUtils.getBean(CompanyWxServiceImpl.class);
|
|
|
CidIpadServerMapper cidIpadServerMapper = SpringUtils.getBean(CidIpadServerMapper.class);
|
|
|
CompanyWorkflowEngine companyWorkflowEngine = SpringUtils.getBean(CompanyWorkflowEngine.class);
|
|
|
+ TenantInfoMapper tenantInfoMapper = SpringUtils.getBean(TenantInfoMapper.class);
|
|
|
|
|
|
//发送消息
|
|
|
public <T> void sendMessage(Session session, ResultMsgVo<T> data) {
|
|
|
@@ -65,133 +76,164 @@ public class WebSocketServer {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
//建立连接成功调用
|
|
|
@OnOpen
|
|
|
- public void onOpen(Session session, @PathParam(value = "wxId") String wxId) {
|
|
|
- CompanyWxAccount companyWxAccount = accountMapper.selectOne(new QueryWrapper<CompanyWxAccount>().eq("wx_no", wxId));
|
|
|
- if(companyWxAccount == null){
|
|
|
- sendMessage(session, ResultMsgVo.error("未找到对应微信数据"));
|
|
|
- return;
|
|
|
+ public void onOpen(Session session, @PathParam(value = "wxId") String wxId, @PathParam(value = "tenantCode") String tenantCode) {
|
|
|
+ Boolean switchBool = switchDataBaseByTenantCode(tenantCode);
|
|
|
+ try {
|
|
|
+ CompanyWxAccount companyWxAccount = accountMapper.selectOne(new QueryWrapper<CompanyWxAccount>().eq("wx_no", wxId));
|
|
|
+ if (companyWxAccount == null) {
|
|
|
+ sendMessage(session, ResultMsgVo.error("未找到对应微信数据"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ sessionPools.put(wxId, session);
|
|
|
+ companyWxAccount.setLoginStatus(1);
|
|
|
+ companyWxAccount.setLoginTime(LocalDateTime.now());
|
|
|
+ accountMapper.updateById(companyWxAccount);
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.put("remark", companyWxAccount.getRemark());
|
|
|
+ sendMessage(session, ResultMsgVo.<JSONObject>builder().cmd(CmdType.INIT_REMARK).data(jsonObject).build());
|
|
|
+ log.info("{}加入webSocket!当前人数为{}", wxId, sessionPools.size());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("onOpenErr:{}", e.getMessage());
|
|
|
+ } finally {
|
|
|
+ if (switchBool) {
|
|
|
+ finalHandle();
|
|
|
+ }
|
|
|
}
|
|
|
- sessionPools.put(wxId, session);
|
|
|
- companyWxAccount.setLoginStatus(1);
|
|
|
- companyWxAccount.setLoginTime(LocalDateTime.now());
|
|
|
- accountMapper.updateById(companyWxAccount);
|
|
|
- JSONObject jsonObject = new JSONObject();
|
|
|
- jsonObject.put("remark", companyWxAccount.getRemark());
|
|
|
- sendMessage(session, ResultMsgVo.<JSONObject>builder().cmd(CmdType.INIT_REMARK).data(jsonObject).build());
|
|
|
- log.info("{}加入webSocket!当前人数为{}", wxId, sessionPools.size());
|
|
|
+
|
|
|
}
|
|
|
|
|
|
//关闭连接时调用
|
|
|
@OnClose
|
|
|
- public void onClose(@PathParam(value = "wxId") String wxId) {
|
|
|
- sessionPools.remove(wxId);
|
|
|
- CompanyWxAccount companyWxAccount = accountMapper.selectOne(new QueryWrapper<CompanyWxAccount>().eq("wx_no", wxId));
|
|
|
- if(companyWxAccount != null){
|
|
|
- companyWxAccount.setLoginStatus(0);
|
|
|
- companyWxAccount.setOutTime(LocalDateTime.now());
|
|
|
- companyWxAccount.setOutRemark("连接断开");
|
|
|
- accountMapper.updateById(companyWxAccount);
|
|
|
+ public void onClose(@PathParam(value = "wxId") String wxId, @PathParam(value = "tenantCode") String tenantCode) {
|
|
|
+ Boolean switchBool = switchDataBaseByTenantCode(tenantCode);
|
|
|
+ try {
|
|
|
+ sessionPools.remove(wxId);
|
|
|
+ CompanyWxAccount companyWxAccount = accountMapper.selectOne(new QueryWrapper<CompanyWxAccount>().eq("wx_no", wxId));
|
|
|
+ if (companyWxAccount != null) {
|
|
|
+ companyWxAccount.setLoginStatus(0);
|
|
|
+ companyWxAccount.setOutTime(LocalDateTime.now());
|
|
|
+ companyWxAccount.setOutRemark("连接断开");
|
|
|
+ accountMapper.updateById(companyWxAccount);
|
|
|
+ }
|
|
|
+ log.info("{}断开webSocket连接!当前人数为{}", wxId, sessionPools.size());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("onCloseErr:{}", e.getMessage());
|
|
|
+ } finally {
|
|
|
+ if (switchBool) {
|
|
|
+ finalHandle();
|
|
|
+ }
|
|
|
}
|
|
|
- log.info("{}断开webSocket连接!当前人数为{}", wxId, sessionPools.size());
|
|
|
}
|
|
|
|
|
|
//收到客户端信息
|
|
|
@OnMessage
|
|
|
- public void onMessage(String message, @PathParam(value = "wxId") String wxId) {
|
|
|
- SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
|
|
|
- if(msg.getType() == 0){
|
|
|
- return;
|
|
|
- }
|
|
|
- Session session = sessionPools.get(wxId);
|
|
|
- if(session == null){
|
|
|
- log.error("参数异常:{}", wxId);
|
|
|
- return;
|
|
|
- }
|
|
|
- log.info("收到数据:{}", msg.getCmd());
|
|
|
- CompanyWxAccount companyWxAccount = accountMapper.selectOne(new QueryWrapper<CompanyWxAccount>().eq("wx_no", wxId));
|
|
|
- if(companyWxAccount == null){
|
|
|
- log.error("未找到对应账号:{}", wxId);
|
|
|
- return;
|
|
|
- }
|
|
|
+ public void onMessage(String message, @PathParam(value = "wxId") String wxId, @PathParam(value = "tenantCode") String tenantCode) {
|
|
|
+ Boolean switchBool = switchDataBaseByTenantCode(tenantCode);
|
|
|
try {
|
|
|
- switch (msg.getCmd()) {
|
|
|
- case HEARTBEAT:
|
|
|
- log.info("接收心跳:{}", wxId);
|
|
|
- break;
|
|
|
- case SYNC_CONTACT_PERSON:
|
|
|
- ContactInfoVo contactInfoVo = JSON.parseObject(msg.getDataJson(), ContactInfoVo.class);
|
|
|
- if(contactInfoVo == null || StringUtils.isEmpty(contactInfoVo.getRemark())){
|
|
|
- log.error("{}同步数据失败,数据缺失:{}", wxId, contactInfoVo);
|
|
|
- return;
|
|
|
- }
|
|
|
- WxContact contact = wxContactMapper.selectOne(new QueryWrapper<WxContact>().eq("remark", contactInfoVo.getRemark()));
|
|
|
- if(contact != null){
|
|
|
- contact.setNickName(contactInfoVo.getNickName());
|
|
|
- contact.setCity(contactInfoVo.getAddress());
|
|
|
- contact.setUserName(contactInfoVo.getWxNo());
|
|
|
- contact.setUpdateTime(new Date());
|
|
|
- wxContactMapper.updateById(contact);
|
|
|
- }else{
|
|
|
- WxContact contact1 = new WxContact();
|
|
|
- contact1.setUserName(contactInfoVo.getWxNo());
|
|
|
- contact1.setNickName(contactInfoVo.getNickName());
|
|
|
- contact1.setCity(contactInfoVo.getAddress());
|
|
|
- contact1.setAccountId(companyWxAccount.getId());
|
|
|
- contact1.setCompanyId(companyWxAccount.getCompanyId());
|
|
|
- contact1.setCompanyUserId(companyWxAccount.getCompanyUserId());
|
|
|
- contact1.setRemark(contactInfoVo.getRemark());
|
|
|
- contact1.setCreateTime(new Date());
|
|
|
- contact1.setUpdateTime(new Date());
|
|
|
- wxContactMapper.insert(contact1);
|
|
|
- }
|
|
|
- break;
|
|
|
- case SEND_MSG:
|
|
|
- log.info("发送返回:{}", msg);
|
|
|
- wxMsgLogService.insertLog(JSON.parseObject(msg.getDataJson(), WxSendResultMsgVo.class), companyWxAccount, 0);
|
|
|
- break;
|
|
|
- case SEND_RESULT:
|
|
|
- log.info("接收消息:{}", msg);
|
|
|
- wxMsgLogService.insertLog(JSON.parseObject(msg.getDataJson(), WxSendResultMsgVo.class), companyWxAccount, 0);
|
|
|
- break;
|
|
|
- case SYNC_INFO:
|
|
|
- SyncInfoVo syncInfoVo = JSON.parseObject(msg.getDataJson(), SyncInfoVo.class);
|
|
|
- companyWxAccount.setHeadImgUrl(syncInfoVo.getImg());
|
|
|
- companyWxAccount.setPhone(syncInfoVo.getPhone());
|
|
|
- accountMapper.updateById(companyWxAccount);
|
|
|
- break;
|
|
|
- case ADD_WX_RESULT:
|
|
|
- com.fs.wxcid.vo.wxvo.AddResultWxVo addResultWxVo = JSON.parseObject(msg.getDataJson(), com.fs.wxcid.vo.wxvo.AddResultWxVo.class);
|
|
|
- log.info("接收到加好友回调:{}", addResultWxVo);
|
|
|
- WxContact wxContact = wxContactMapper.selectOne(new QueryWrapper<WxContact>().eq("remark", addResultWxVo.getRemark()).eq("friends", 0));
|
|
|
- log.info("更新联系人:{}", wxContact);
|
|
|
- wxContact.setFriends(1);
|
|
|
- wxContact.setAlias(addResultWxVo.getWxid());
|
|
|
- wxContactMapper.updateById(wxContact);
|
|
|
- List<CompanyWxClient> clients = companyWxClientMapper.selectWxV2(companyWxAccount.getId(), wxContact.getPhone());
|
|
|
- log.info("更新联系人2:{}", clients);
|
|
|
- if(clients != null){
|
|
|
- clients.parallelStream().forEach(e -> {
|
|
|
- e.setIsAdd(1);
|
|
|
- e.setRemark(addResultWxVo.getRemark());
|
|
|
- e.setWxName(addResultWxVo.getUserName());
|
|
|
- e.setSuccessAddTime(LocalDateTime.now());
|
|
|
- companyWxClientMapper.updateById(e);
|
|
|
- companyWxService.triggerWorkflowOnAddWxSuccess(e.getId());
|
|
|
- });
|
|
|
- }
|
|
|
+ SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
|
|
|
+ if (msg.getType() == 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Session session = sessionPools.get(wxId);
|
|
|
+ if (session == null) {
|
|
|
+ log.error("参数异常:{}", wxId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("收到数据:{}", msg.getCmd());
|
|
|
+ CompanyWxAccount companyWxAccount = accountMapper.selectOne(new QueryWrapper<CompanyWxAccount>().eq("wx_no", wxId));
|
|
|
+ if (companyWxAccount == null) {
|
|
|
+ log.error("未找到对应账号:{}", wxId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ switch (msg.getCmd()) {
|
|
|
+ case HEARTBEAT:
|
|
|
+ log.info("接收心跳:{}", wxId);
|
|
|
+ break;
|
|
|
+ case SYNC_CONTACT_PERSON:
|
|
|
+ ContactInfoVo contactInfoVo = JSON.parseObject(msg.getDataJson(), ContactInfoVo.class);
|
|
|
+ if (contactInfoVo == null || StringUtils.isEmpty(contactInfoVo.getRemark())) {
|
|
|
+ log.error("{}同步数据失败,数据缺失:{}", wxId, contactInfoVo);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ WxContact contact = wxContactMapper.selectOne(new QueryWrapper<WxContact>().eq("remark", contactInfoVo.getRemark()));
|
|
|
+ if (contact != null) {
|
|
|
+ contact.setNickName(contactInfoVo.getNickName());
|
|
|
+ contact.setCity(contactInfoVo.getAddress());
|
|
|
+ contact.setUserName(contactInfoVo.getWxNo());
|
|
|
+ contact.setUpdateTime(new Date());
|
|
|
+ wxContactMapper.updateById(contact);
|
|
|
+ } else {
|
|
|
+ WxContact contact1 = new WxContact();
|
|
|
+ contact1.setUserName(contactInfoVo.getWxNo());
|
|
|
+ contact1.setNickName(contactInfoVo.getNickName());
|
|
|
+ contact1.setCity(contactInfoVo.getAddress());
|
|
|
+ contact1.setAccountId(companyWxAccount.getId());
|
|
|
+ contact1.setCompanyId(companyWxAccount.getCompanyId());
|
|
|
+ contact1.setCompanyUserId(companyWxAccount.getCompanyUserId());
|
|
|
+ contact1.setRemark(contactInfoVo.getRemark());
|
|
|
+ contact1.setCreateTime(new Date());
|
|
|
+ contact1.setUpdateTime(new Date());
|
|
|
+ wxContactMapper.insert(contact1);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case SEND_MSG:
|
|
|
+ log.info("发送返回:{}", msg);
|
|
|
+ wxMsgLogService.insertLog(JSON.parseObject(msg.getDataJson(), WxSendResultMsgVo.class), companyWxAccount, 0);
|
|
|
+ break;
|
|
|
+ case SEND_RESULT:
|
|
|
+ log.info("接收消息:{}", msg);
|
|
|
+ wxMsgLogService.insertLog(JSON.parseObject(msg.getDataJson(), WxSendResultMsgVo.class), companyWxAccount, 0);
|
|
|
+ break;
|
|
|
+ case SYNC_INFO:
|
|
|
+ SyncInfoVo syncInfoVo = JSON.parseObject(msg.getDataJson(), SyncInfoVo.class);
|
|
|
+ companyWxAccount.setHeadImgUrl(syncInfoVo.getImg());
|
|
|
+ companyWxAccount.setPhone(syncInfoVo.getPhone());
|
|
|
+ accountMapper.updateById(companyWxAccount);
|
|
|
+ break;
|
|
|
+ case ADD_WX_RESULT:
|
|
|
+ com.fs.wxcid.vo.wxvo.AddResultWxVo addResultWxVo = JSON.parseObject(msg.getDataJson(), com.fs.wxcid.vo.wxvo.AddResultWxVo.class);
|
|
|
+ log.info("接收到加好友回调:{}", addResultWxVo);
|
|
|
+ WxContact wxContact = wxContactMapper.selectOne(new QueryWrapper<WxContact>().eq("remark", addResultWxVo.getRemark()).eq("friends", 0));
|
|
|
+ log.info("更新联系人:{}", wxContact);
|
|
|
+ wxContact.setFriends(1);
|
|
|
+ wxContact.setAlias(addResultWxVo.getWxid());
|
|
|
+ wxContactMapper.updateById(wxContact);
|
|
|
+ List<CompanyWxClient> clients = companyWxClientMapper.selectWxV2(companyWxAccount.getId(), wxContact.getPhone());
|
|
|
+ log.info("更新联系人2:{}", clients);
|
|
|
+ if (clients != null) {
|
|
|
+ clients.parallelStream().forEach(e -> {
|
|
|
+ e.setIsAdd(1);
|
|
|
+ e.setRemark(addResultWxVo.getRemark());
|
|
|
+ e.setWxName(addResultWxVo.getUserName());
|
|
|
+ e.setSuccessAddTime(LocalDateTime.now());
|
|
|
+ companyWxClientMapper.updateById(e);
|
|
|
+ companyWxService.triggerWorkflowOnAddWxSuccess(e.getId());
|
|
|
+ });
|
|
|
+ }
|
|
|
// if(null != addResultWxVo && StringUtils.isNotBlank(addResultWxVo.getBizJson())){
|
|
|
// JSONObject jsonObject = JSONObject.parseObject(addResultWxVo.getBizJson());
|
|
|
// jsonObject.put("remark",addResultWxVo.getRemark());
|
|
|
// companyWorkflowEngine.addWxSuccess(jsonObject);
|
|
|
// }
|
|
|
- break;
|
|
|
+ break;
|
|
|
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("发生错误;{}", e.getMessage());
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- log.error("发生错误;{}", e.getMessage());
|
|
|
+ log.error("onMessageErr:{}", e.getMessage());
|
|
|
}
|
|
|
+ finally {
|
|
|
+ if (switchBool) {
|
|
|
+ finalHandle();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
}
|
|
|
|
|
|
@@ -201,4 +243,52 @@ public class WebSocketServer {
|
|
|
log.error("发生错误;{}", throwable.getMessage());
|
|
|
throwable.printStackTrace();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据租户编码切换数据源
|
|
|
+ *
|
|
|
+ * @param tenantCode
|
|
|
+ */
|
|
|
+ public Boolean switchDataBaseByTenantCode(String tenantCode) {
|
|
|
+ if (StringUtils.isBlank(tenantCode)) {
|
|
|
+ log.error("未找到对应租户:{}", tenantCode);
|
|
|
+ return Boolean.FALSE;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ TenantInfo tenantInfo = tenantInfoMapper.getTenByCode(tenantCode);
|
|
|
+ Object manager = SpringUtils.getBean("tenantDataSourceManager");
|
|
|
+ Method method = manager.getClass().getMethod("ensureSwitchByTenantId", Long.class);
|
|
|
+ method.invoke(manager, tenantInfo.getId());
|
|
|
+ // 设置租户到 SecurityContext,供 TenantKeyRedisSerializer 自动为 Redis Key 加 tenantid 前缀
|
|
|
+ SecurityContextHolder.getContext().setAuthentication(
|
|
|
+ new UsernamePasswordAuthenticationToken(
|
|
|
+ new TenantPrincipal(TenantHelper.getTenantId()),
|
|
|
+ null,
|
|
|
+ Collections.emptyList()
|
|
|
+ )
|
|
|
+ );
|
|
|
+ // 切换 Redis 租户上下文
|
|
|
+ RedisTenantContext.setTenantId(TenantHelper.getTenantId());
|
|
|
+ return Boolean.TRUE;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("callerResult4EasyCall 切换租户数据源失败: tenantId={}", TenantHelper.getTenantId(), e);
|
|
|
+ return Boolean.FALSE;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void finalHandle() {
|
|
|
+ try {
|
|
|
+ TenantConfigContext.clear();
|
|
|
+ SecurityContextHolder.clearContext();
|
|
|
+ Object manager = SpringUtils.getBean("tenantDataSourceManager");
|
|
|
+ Method method = manager.getClass().getMethod("clear");
|
|
|
+ method.invoke(manager);
|
|
|
+ TenantHelper.clearTenantId();
|
|
|
+ RedisTenantContext.clear();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("SOP异步任务清理租户数据源失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|