CdrPush.java 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package com.telerobot.fs.global;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.telerobot.fs.config.SystemConfig;
  5. import com.telerobot.fs.entity.po.CdrDetail;
  6. import com.telerobot.fs.entity.po.CdrEntity;
  7. import com.telerobot.fs.mybatis.dao.SysDao;
  8. import com.telerobot.fs.utils.CommonUtils;
  9. import com.telerobot.fs.utils.DateUtils;
  10. import com.telerobot.fs.utils.OkHttpClientUtil;
  11. import com.telerobot.fs.utils.StringUtils;
  12. import lombok.SneakyThrows;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.boot.context.event.ApplicationReadyEvent;
  17. import org.springframework.context.ApplicationListener;
  18. import org.springframework.context.annotation.DependsOn;
  19. import org.springframework.stereotype.Component;
  20. import java.util.concurrent.ArrayBlockingQueue;
  21. import java.util.concurrent.Semaphore;
  22. @Component
  23. @DependsOn("appContextProvider")
  24. public class CdrPush implements ApplicationListener<ApplicationReadyEvent> {
  25. @Autowired
  26. SysDao sysDao;
  27. private static Logger logger = LoggerFactory.getLogger(CdrPush.class);
  28. private static Semaphore semaphore = new Semaphore(9999);
  29. private static ArrayBlockingQueue<CdrDetail> cdrQueue = new ArrayBlockingQueue<>(9999);
  30. private static boolean checkPostCdrEnabled(){
  31. return Boolean.parseBoolean(SystemConfig.getValue("post_cdr_enabled", "false"));
  32. }
  33. public static boolean addCdrToQueue(CdrDetail cdr){
  34. if(!checkPostCdrEnabled()){
  35. logger.info("{} cdr push is not enabled.", cdr.getUuid());
  36. return false;
  37. }
  38. if(cdrQueue.add(cdr)){
  39. semaphore.release(1);
  40. return true;
  41. }else{
  42. logger.error("{} cdr-push queue is full. Cant not process new requests. cdr json={}",
  43. cdr.getUuid(), JSON.toJSONString(cdr)
  44. );
  45. }
  46. return false;
  47. }
  48. private boolean postCdr(CdrDetail cdr){
  49. try {
  50. String url = SystemConfig.getValue("post_cdr_url");
  51. //如果有自定义回调地址 替换回调地址为自定义回调地址 否则默认回调
  52. String callBackUrl = getCallBackUrlByUuid(cdr.getUuid());
  53. logger.info("测试日志cdrUUid:{} ,callBackUrlByUuid: {}", cdr.getUuid(), callBackUrl);
  54. if(org.apache.commons.lang3.StringUtils.isNotBlank(callBackUrl)){
  55. url = callBackUrl;
  56. }
  57. logger.info("测试日志最后请求的url:{}",url);
  58. if (StringUtils.isNullOrEmpty(url)) {
  59. logger.error("post_cdr_url has not been configured yet.");
  60. return false;
  61. }
  62. String cdrData = JSON.toJSONString(cdr);
  63. String response = OkHttpClientUtil.postCdr(url, cdrData);
  64. logger.info("{} postCdr: {}, request url {} , response: {}", cdr.getUuid(), cdrData, url, response);
  65. if ("success".equalsIgnoreCase(response)) {
  66. logger.info("{} post cdr succeed.", cdr.getUuid());
  67. return true;
  68. } else {
  69. logger.error("{} post cdr failed: cdr data={}", cdr.getUuid(), cdrData);
  70. }
  71. }catch (Throwable e){
  72. logger.error("postCdr failed: {} {}", e.toString(), CommonUtils.getStackTraceString(e.getStackTrace()));
  73. }
  74. return false;
  75. }
  76. @Override
  77. public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
  78. new Thread(new Runnable() {
  79. @SneakyThrows
  80. @Override
  81. public void run() {
  82. try {
  83. logger.info("CdrPush thread is now running...");
  84. while (true) {
  85. semaphore.acquire();
  86. CdrDetail cdrDetail = cdrQueue.poll();
  87. if (null != cdrDetail) {
  88. if(!postCdr(cdrDetail)){
  89. addCdrToQueue(cdrDetail);
  90. }
  91. }
  92. Thread.sleep(10);
  93. }
  94. }catch (Throwable e){
  95. logger.error("postCdr main thread error: {} {}", e.toString(), CommonUtils.getStackTraceString(e.getStackTrace()));
  96. }
  97. }
  98. }).start();
  99. }
  100. private String getCallBackUrlByUuid(String uuid){
  101. String userBizJson = sysDao.getUserBizJson(uuid);
  102. logger.info("测试日志userBizJson:{},查询uuid:{}", userBizJson,uuid);
  103. if(org.apache.commons.lang3.StringUtils.isNotBlank(userBizJson)){
  104. JSONObject obj = JSONObject.parseObject(userBizJson);
  105. if(null != obj && !obj.isEmpty() && obj.containsKey("callbackUrl")){
  106. return obj.getString("callbackUrl");
  107. }
  108. }
  109. return null;
  110. }
  111. }