Coze.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package com.telerobot.fs.robot.impl;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.coze.openapi.client.auth.OAuthToken;
  4. import com.coze.openapi.client.chat.CreateChatReq;
  5. import com.coze.openapi.client.chat.model.ChatEvent;
  6. import com.coze.openapi.client.chat.model.ChatEventType;
  7. import com.coze.openapi.client.connversations.message.model.Message;
  8. import com.coze.openapi.service.auth.JWTOAuthClient;
  9. import com.coze.openapi.service.auth.TokenAuth;
  10. import com.coze.openapi.service.service.CozeAPI;
  11. import com.telerobot.fs.entity.dao.CustmInfoEntity;
  12. import com.telerobot.fs.entity.dto.LlmAiphoneRes;
  13. import com.telerobot.fs.entity.dto.llm.CozeAccount;
  14. import com.telerobot.fs.robot.AbstractChatRobot;
  15. import com.telerobot.fs.utils.CommonUtils;
  16. import io.reactivex.Flowable;
  17. import io.reactivex.schedulers.Schedulers;
  18. import link.thingscloud.freeswitch.esl.util.CurrentTimeMillisClock;
  19. import org.apache.commons.lang3.StringUtils;
  20. import javax.websocket.OnError;
  21. import java.util.Collections;
  22. import java.util.HashMap;
  23. import java.util.Map;
  24. import java.util.concurrent.atomic.AtomicBoolean;
  25. public class Coze extends AbstractChatRobot {
  26. private String conversationId = "";
  27. private static final String COZE_TOKEN_TYPE_PAT = "pat";
  28. private static final String COZE_TOKEN_TYPE_OAUTH = "oauth";
  29. private String token = "";
  30. private int expireTime = 0;
  31. private String getToken(){
  32. String cozeTokenType = ((CozeAccount)getAccount()).getTokenType();
  33. if(COZE_TOKEN_TYPE_PAT.equalsIgnoreCase(cozeTokenType)){
  34. return ((CozeAccount)getAccount()).getPatToken();
  35. }
  36. String cozeAPIBase = "https://api.coze.cn";
  37. String jwtOauthClientID = ((CozeAccount)getAccount()).getOauthClientId();
  38. String jwtOauthPrivateKey = ((CozeAccount)getAccount()).getOauthPrivateKey();
  39. String jwtOauthPublicKeyID = ((CozeAccount)getAccount()).getOauthPublicKeyId();
  40. JWTOAuthClient oauth = null;
  41. try {
  42. oauth = new JWTOAuthClient.JWTOAuthBuilder()
  43. .clientID(jwtOauthClientID)
  44. .privateKey(jwtOauthPrivateKey)
  45. .publicKey(jwtOauthPublicKeyID)
  46. .baseURL(cozeAPIBase).ttl(24 * 3600)
  47. .build();
  48. } catch (Throwable e) {
  49. logger.error("{} coze getToken error: {} {} ", uuid, e.toString(), CommonUtils.getStackTraceString(e.getStackTrace()));
  50. return "";
  51. }
  52. Long expiredMillsLeft = expireTime - (CurrentTimeMillisClock.now()/1000);
  53. //过期前1小时
  54. if (StringUtils.isEmpty(token) || expiredMillsLeft < 3600 || expireTime == 0) {
  55. synchronized (jwtOauthClientID.intern()) {
  56. if (StringUtils.isEmpty(token) || expiredMillsLeft < 3600 || expireTime == 0) {
  57. OAuthToken aAuthToken = null;
  58. try {
  59. aAuthToken = oauth.getAccessToken();
  60. expireTime = aAuthToken.getExpiresIn();
  61. token = aAuthToken.getAccessToken();
  62. logger.info("{} successfully getAccessToken={} , expireTime={} ",
  63. uuid, aAuthToken.getAccessToken().substring(0, 30) + "***", aAuthToken.getExpiresIn());
  64. } catch (Throwable e) {
  65. logger.error("{} coze getAccessToken error: {} ", uuid, CommonUtils.getStackTraceString(e.getStackTrace()));
  66. return "";
  67. }
  68. }
  69. }
  70. }
  71. return token;
  72. }
  73. private final Object waitHandle = new Object();
  74. private void release(){
  75. synchronized (waitHandle) {
  76. waitHandle.notify();
  77. }
  78. }
  79. private void acquire(){
  80. try {
  81. synchronized (waitHandle) {
  82. waitHandle.wait();
  83. }
  84. }
  85. catch (Throwable throwable){
  86. logger.error("{} acquire error: {} {} ", uuid, throwable.toString() ,
  87. CommonUtils.getStackTraceString(throwable.getStackTrace()));
  88. }
  89. }
  90. @Override
  91. public LlmAiphoneRes talkWithAiAgent(String question, Boolean... withKbResponse){
  92. LlmAiphoneRes aiphoneRes = new LlmAiphoneRes();
  93. aiphoneRes.setStatus_code(1);
  94. aiphoneRes.setClose_phone(0);
  95. aiphoneRes.setIfcan_interrupt(0);
  96. logger.info("talkWithAiAgent, Coze Server-url={}, tokenType={}, botId={} ",
  97. getAccount().serverUrl,
  98. ((CozeAccount)getAccount()).getTokenType(),
  99. ((CozeAccount)getAccount()).getBotId()
  100. );
  101. if(firstRound) {
  102. firstRound = false;
  103. JSONObject bizJson = new JSONObject();
  104. if (null != callDetail && null != callDetail.getOutboundPhoneInfo() && org.apache.commons.lang.StringUtils.isNotBlank( callDetail.getOutboundPhoneInfo().getBizJson())) {
  105. bizJson = JSONObject.parseObject(callDetail.getOutboundPhoneInfo().getBizJson());
  106. }
  107. String openingRemarks = replaceParams(llmAccountInfo.openingRemarks, bizJson);
  108. addDialogue(ROLE_ASSISTANT, openingRemarks);
  109. ttsTextCache.add(openingRemarks);
  110. sendToTts();
  111. closeTts();
  112. aiphoneRes.setBody(openingRemarks);
  113. return aiphoneRes;
  114. }else {
  115. if (!StringUtils.isEmpty(question)) {
  116. addDialogue(ROLE_USER, question);
  117. } else {
  118. addDialogue(ROLE_USER, "NO_VOICE");
  119. String noVoiceTips = llmAccountInfo.customerNoVoiceTips;
  120. addDialogue(ROLE_ASSISTANT, noVoiceTips);
  121. ttsTextCache.add(noVoiceTips);
  122. sendToTts();
  123. closeTts();
  124. aiphoneRes.setBody(noVoiceTips);
  125. return aiphoneRes;
  126. }
  127. }
  128. if(!firstRound && !StringUtils.isEmpty(question)) {
  129. try {
  130. JSONObject response = sendStreamingRequest(aiphoneRes, question, getToken());
  131. llmRoundMessages.add(response);
  132. } catch (Throwable throwable) {
  133. aiphoneRes.setStatus_code(0);
  134. logger.error("{} talkWithAiAgent error: {} {}", uuid, throwable.toString(), CommonUtils.getStackTraceString(throwable.getStackTrace()));
  135. return null;
  136. }
  137. }
  138. return aiphoneRes;
  139. }
  140. private JSONObject sendStreamingRequest(LlmAiphoneRes aiphoneRes, String question, String cozeToken){
  141. JSONObject finalResponse = new JSONObject();
  142. finalResponse.put("role", "assistant");
  143. Map<String, String> customVariables = new HashMap<>();
  144. CustmInfoEntity custmInfoEntity= callDetail.getOutboundPhoneInfo();
  145. if (null != custmInfoEntity) {
  146. if (org.apache.commons.lang.StringUtils.isNotBlank(custmInfoEntity.getBizJson())) {
  147. JSONObject bizJson = JSONObject.parseObject(custmInfoEntity.getBizJson());
  148. for (String k: bizJson.keySet()) {
  149. customVariables.put(k, bizJson.getString(k));
  150. }
  151. }
  152. }
  153. String url = getAccount().serverUrl;
  154. if(!url.endsWith("/")){
  155. url = url + "/";
  156. }
  157. TokenAuth authCli = new TokenAuth(cozeToken);
  158. // Init the Coze client through the access_token.
  159. CozeAPI coze =
  160. new CozeAPI.Builder()
  161. .baseURL(url)
  162. .auth(authCli)
  163. .readTimeout(10000)
  164. .build();
  165. /*
  166. * Step one, create chat
  167. * Call the coze.chat().stream() method to create a chat. The create method is a streaming
  168. * chat and will return a Flowable ChatEvent. Developers should iterate the iterator to get
  169. * chat event and handle them.
  170. * */
  171. String botID = ((CozeAccount)getAccount()).getBotId();
  172. CreateChatReq req =
  173. CreateChatReq.builder()
  174. .botID(botID)
  175. .userID(uuid)
  176. .customVariables(customVariables)
  177. .messages(Collections.singletonList(Message.buildUserQuestionText(question)))
  178. .build();
  179. if(!StringUtils.isEmpty(conversationId)) {
  180. req.setConversationID(conversationId);
  181. }
  182. Flowable<ChatEvent> resp = coze.chat().stream(req);
  183. StringBuilder responseBuilder = new StringBuilder();
  184. AtomicBoolean jsonFormat = new AtomicBoolean(false);
  185. AtomicBoolean recvData = new AtomicBoolean(false);
  186. long startTime = System.currentTimeMillis();
  187. resp.subscribeOn(Schedulers.io())
  188. .subscribe(
  189. event -> {
  190. if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) {
  191. System.out.print(event.getMessage().getContent());
  192. if(StringUtils.isEmpty(conversationId)){
  193. conversationId = event.getMessage().getConversationId();
  194. logger.info("{} coze chat conversation_id = {}", uuid, conversationId);
  195. }
  196. if (!recvData.get()) {
  197. recvData.set(true);
  198. long costTime = (System.currentTimeMillis() - startTime);
  199. logger.info("http request cost time : {} ms.", costTime);
  200. aiphoneRes.setCostTime(costTime);
  201. }
  202. String tmpText = event.getMessage().getContent().trim();
  203. if (tmpText.startsWith("{") && !jsonFormat.get()) {
  204. logger.info("{} json response detected.", getTraceId());
  205. jsonFormat.set(true);
  206. aiphoneRes.setJsonResponse(true);
  207. }
  208. if (!StringUtils.isEmpty(tmpText) && !jsonFormat.get()) {
  209. ttsTextCache.add(tmpText);
  210. ttsTextLength += tmpText.length();
  211. // 积攒足够的字数之后,才发送给tts,避免播放异常;
  212. if (ttsTextLength >= 10 && checkPauseFlag(tmpText)) {
  213. sendToTts();
  214. }
  215. }
  216. responseBuilder.append(tmpText);
  217. }
  218. if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) {
  219. logger.info("{} Token usage count = {}." , uuid, event.getChat().getUsage().getTokenCount());
  220. }
  221. },
  222. throwable -> {
  223. System.err.println(": " + throwable.getMessage());
  224. logger.error("{} coze error occurred {} {}", uuid, throwable.toString(),
  225. CommonUtils.getStackTraceString(throwable.getStackTrace()));
  226. aiphoneRes.setStatus_code(0);
  227. release();
  228. },
  229. () -> {
  230. logger.info("{} coze talk done.", uuid);
  231. release();
  232. });
  233. acquire();
  234. coze.shutdownExecutor();
  235. String answer = responseBuilder.toString();
  236. logger.info("{} coze answer={}", this.uuid, answer);
  237. if(ttsTextLength > 0 && !jsonFormat.get()){
  238. sendToTts();
  239. }
  240. closeTts();
  241. finalResponse.put("content", answer);
  242. finalResponse.put("content_type", "text");
  243. aiphoneRes.setBody(answer);
  244. return finalResponse;
  245. }
  246. }