SessionManager.java 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. package com.telerobot.fs.wshandle;
  2. import com.telerobot.fs.config.AppContextProvider;
  3. import com.telerobot.fs.config.SystemConfig;
  4. import com.telerobot.fs.config.UuidGenerator;
  5. import com.telerobot.fs.entity.po.AgentEntity;
  6. import com.telerobot.fs.entity.pojo.AgentStatus;
  7. import com.telerobot.fs.mybatis.dao.SysDao;
  8. import com.telerobot.fs.service.SysService;
  9. import com.telerobot.fs.utils.CommonUtils;
  10. import com.telerobot.fs.utils.StringUtils;
  11. import com.telerobot.fs.utils.ThreadUtil;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import java.util.ArrayList;
  15. import java.util.Iterator;
  16. import java.util.List;
  17. import java.util.Map;
  18. import java.util.concurrent.ConcurrentHashMap;
  19. public class SessionManager {
  20. private static SessionManager instance;
  21. private static final Object syncRoot = new Object();
  22. private static final Logger logger = LoggerFactory.getLogger(SessionManager.class);
  23. /**
  24. * 保存所有客户端的Session会话信息的容器
  25. */
  26. private Map<String, SessionEntity> sessionContainer = new ConcurrentHashMap<String, SessionEntity>(1000);
  27. public List<String> getAllUserIpList(){
  28. List<String> allIpAddress = new ArrayList<String>(200);
  29. Iterator<Map.Entry<String, SessionEntity>> it = sessionContainer.entrySet().iterator();
  30. while (it.hasNext()) {
  31. Map.Entry<String, SessionEntity> entry = it.next();
  32. String ip = entry.getValue().getClientIp();
  33. if (!allIpAddress.contains(ip)) {
  34. allIpAddress.add(ip);
  35. }
  36. }
  37. return allIpAddress;
  38. }
  39. private SessionManager() {
  40. new Thread(new Runnable() {
  41. @Override
  42. public void run() {
  43. resetAgentBusyLockTime();
  44. }
  45. }).start();
  46. new Thread(new Runnable() {
  47. @Override
  48. public void run() {
  49. logger.info("websocket sessionManager thread has been started ...");
  50. while (true) {
  51. deleteAndGetInvalidSession();
  52. int timeout = Integer.parseInt(AppContextProvider.getEnvConfig("ws-server.ws-session-timeout").trim());
  53. ThreadUtil.sleep(timeout * 1000);
  54. }
  55. }
  56. }, "deleteInvalidSession").start();
  57. }
  58. private int lastSessionCount = 0;
  59. /**
  60. * 删除过期无效的会话
  61. *
  62. * @return
  63. */
  64. private List<String> deleteAndGetInvalidSession() {
  65. List<String> invalidOptList = new ArrayList<String>(10);
  66. Iterator<Map.Entry<String, SessionEntity>> it = sessionContainer.entrySet().iterator();
  67. int sessionCount = 0;
  68. while(it.hasNext()){
  69. Map.Entry<String, SessionEntity> entry = it.next();
  70. SessionEntity tmpObj = entry.getValue();
  71. if (!tmpObj.IsValid()) {
  72. invalidOptList.add(tmpObj.getOpNum());
  73. MessageHandlerEngineList.getInstance().delete(tmpObj.getSessionId(), false);
  74. it.remove();
  75. }
  76. sessionCount ++;
  77. }
  78. if(lastSessionCount != sessionCount) {
  79. logger.info("current session count: {}", sessionCount);
  80. lastSessionCount = sessionCount;
  81. }
  82. if (invalidOptList.size() != 0) {
  83. logger.info("Session {} has been Expired, delete from SessionContainer.", CommonUtils.ListToString(invalidOptList, true));
  84. }
  85. removeOnlineUser(invalidOptList);
  86. return invalidOptList;
  87. }
  88. /**
  89. * 根据工号删除在线用户;
  90. * @param optList
  91. * @return
  92. */
  93. private int removeOnlineUser(List<String> optList){
  94. if(optList.size() == 0){
  95. return 0;
  96. }
  97. int affectRow = 0;
  98. try {
  99. affectRow = AppContextProvider.getBean(SysDao.class).removeOnlineUser(optList);
  100. logger.info("delete expired user from database: {}", affectRow);
  101. }
  102. catch (Exception e){
  103. logger.error("error occurs while deleting expired user from database: {} , {}", e.toString(),
  104. CommonUtils.getStackTraceString(e.getStackTrace())
  105. );
  106. }
  107. return affectRow;
  108. }
  109. /**
  110. * 根据工号删除在线用户;
  111. * @param opnum 工号
  112. * @return
  113. */
  114. private int removeOnlineUser(String opnum){
  115. ArrayList<String> optList = new ArrayList<>();
  116. optList.add(opnum);
  117. int affectRow = 0;
  118. try {
  119. affectRow = AppContextProvider.getBean(SysDao.class).removeOnlineUser(optList);
  120. logger.info("Delete the number of expired session users from the database: {}", affectRow);
  121. }
  122. catch (Exception e){
  123. logger.error("database error: {}", e.toString());
  124. }
  125. return affectRow;
  126. }
  127. private int addOnlineUser(SessionEntity session){
  128. AgentEntity entity = new AgentEntity();
  129. entity.setId(UuidGenerator.GetOneUuid());
  130. entity.setClientIp(session.getClientIp());
  131. entity.setExtnum(session.getExtNum());
  132. entity.setOpnum(session.getOpNum());
  133. entity.setGroupId(session.getGroupId());
  134. entity.setLoginTime(session.getLoginTime());
  135. entity.setSessionId(session.getSessionId());
  136. entity.setSkillLevel(session.getSkillLevel());
  137. entity.setAgentStatus(AgentStatus.justLogin);
  138. entity.setSessionId(session.getSessionId());
  139. int affectRow = 0;
  140. try {
  141. affectRow = AppContextProvider.getBean(SysDao.class).addOnlineUser(entity);
  142. logger.info("addOnlineUser affectRow: {}", affectRow);
  143. }
  144. catch (Exception e){
  145. logger.error("addOnlineUser failed: {}", e.toString());
  146. }
  147. return affectRow;
  148. }
  149. /**
  150. * 通过单体模式实现,返回一个SessionManager的实例
  151. *
  152. * @return
  153. */
  154. public static SessionManager getInstance() {
  155. if (instance == null) {
  156. synchronized (syncRoot) {
  157. if (instance == null) {
  158. instance = new SessionManager();
  159. }
  160. }
  161. }
  162. return instance;
  163. }
  164. private static int transferAgentTimeOut = Integer.parseInt(
  165. SystemConfig.getValue("inbound-transfer-agent-timeout", "30")
  166. );
  167. /**
  168. * 定时批量重置 cc_online_user表 座席锁定状态 busy_lock_time;
  169. * 如果座席的锁定状态超过 transferAgentTimeOut 仍然没有解除,说明电话转接可能出现异常或者其他原因导致,
  170. * 此时如果不解除锁定状态,会导致座席永远处于忙碌状态而无法接到电话。
  171. */
  172. private void resetAgentBusyLockTime() {
  173. logger.info("start resetAgentBusyLockTime thread.");
  174. int maxLooper = 6000;
  175. int counter = 0;
  176. while (true) {
  177. long timeout = System.currentTimeMillis() - (transferAgentTimeOut + 5) * 1000;
  178. List<SessionEntity> sessionList = AppContextProvider.getBean(SysService.class).
  179. selectAgentBusyLockTimeout(timeout);
  180. if(null != sessionList && sessionList.size() > 0) {
  181. for (SessionEntity session : sessionList) {
  182. int affectRow = AppContextProvider.getBean(SysService.class).resetAgentBusyLockTimeEx(
  183. session.getOpNum(), timeout
  184. );
  185. if (affectRow > 0) {
  186. logger.info(" resetAgentBusyLockTimeEx affectRow:{}, opnum={}",
  187. affectRow, session.getOpNum());
  188. }
  189. MessageHandlerEngine engine = MessageHandlerEngineList.getInstance().
  190. getMsgHandlerEngineByOpNum(session.getOpNum());
  191. if (null != engine) {
  192. if (engine.getSessionInfo() != null) {
  193. logger.info("unLock acd agent extNum={}, opNum={}.", session.getExtNum(), session.getOpNum());
  194. engine.getSessionInfo().unLock();
  195. }
  196. }
  197. }
  198. }
  199. counter++;
  200. if(counter > maxLooper){
  201. counter = 0;
  202. logger.info("定时批量重置座席锁定状态 的线程运行中...");
  203. }
  204. ThreadUtil.sleep(1500);
  205. }
  206. }
  207. /**
  208. * 根据SessionId更新会话的活跃时间
  209. * @param sessionId
  210. * @return
  211. */
  212. public boolean updateSessionActiveTime(String sessionId) {
  213. SessionEntity sessionEntity = this.sessionContainer.get(sessionId);
  214. if(sessionEntity != null){
  215. sessionEntity.setLastActiveTime(System.currentTimeMillis());
  216. return true;
  217. }
  218. return false;
  219. }
  220. /**
  221. * 保存当前会话信息;(用户身份认证通过后才进行保存会话状态) 把当前客户端会话信息存入系统
  222. *
  223. * @param clientSession
  224. * @return 如果会话保存成功则返回空字符串,否则返回错误原因
  225. */
  226. public boolean add(SessionEntity clientSession) {
  227. if (clientSession == null) {
  228. return false;
  229. }
  230. boolean found = false;
  231. String deleteClientId = "";
  232. Iterator<Map.Entry<String, SessionEntity>> it = sessionContainer.entrySet().iterator();
  233. while(it.hasNext()){
  234. Map.Entry<String, SessionEntity> entry = it.next();
  235. SessionEntity tmpObj = entry.getValue();
  236. if (tmpObj.getOpNum().equalsIgnoreCase(clientSession.getOpNum())) {
  237. found = true;
  238. deleteClientId = tmpObj.getSessionId();
  239. it.remove();
  240. break;
  241. }
  242. }
  243. //先从数据库中删除可能存在的指定工号的登录信息;
  244. removeOnlineUser(clientSession.getOpNum());
  245. this.sessionContainer.put(clientSession.getSessionId(), clientSession);
  246. addOnlineUser(clientSession);
  247. logger.info("Current Session count is {}, successfully add a session object to SessionContainer. Details: {}",
  248. sessionContainer.size(),
  249. clientSession.toString());
  250. if (found) {
  251. MessageHandlerEngine engine = MessageHandlerEngineList.getInstance().getMsgHandlerEngine(deleteClientId);
  252. if (engine != null && !engine.getDisposeStatus()) {
  253. try {
  254. MessageResponse response = new MessageResponse();
  255. response.setMsg("user_logined_on_other_device");
  256. response.setStatus(201);
  257. engine.sendReplyToAgent(response);
  258. } catch (Exception e) {
  259. logger.info("send websocket msg error: {}", e.toString());
  260. }
  261. MessageHandlerEngineList.getInstance().delete(deleteClientId, false);
  262. }
  263. }
  264. return true;
  265. }
  266. /**
  267. * 把当前客户端会话信息从系统中删除
  268. *
  269. * @param sessionId
  270. * 客户端会话的SessionId
  271. * @return
  272. */
  273. public boolean delete(String sessionId) {
  274. if (StringUtils.isNullOrEmpty(sessionId)) {
  275. return true;
  276. }
  277. boolean found = false;
  278. SessionEntity tmpObj = sessionContainer.get(sessionId);
  279. if (tmpObj != null) {
  280. removeOnlineUser(tmpObj.getOpNum());
  281. this.sessionContainer.remove(sessionId);
  282. found = true;
  283. }
  284. return found;
  285. }
  286. }