|
|
@@ -0,0 +1,471 @@
|
|
|
+package com.fs.live.websocket.service;
|
|
|
+
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.fs.live.config.ProductionWordFilter;
|
|
|
+import com.fs.live.websocket.auth.WebSocketConfigurator;
|
|
|
+import com.fs.live.websocket.bean.SendMsgVo;
|
|
|
+import com.fs.common.core.domain.R;
|
|
|
+import com.fs.common.core.redis.RedisCache;
|
|
|
+import com.fs.common.exception.BaseException;
|
|
|
+import com.fs.common.utils.StringUtils;
|
|
|
+import com.fs.common.utils.spring.SpringUtils;
|
|
|
+import com.fs.live.domain.*;
|
|
|
+import com.fs.live.service.*;
|
|
|
+import com.fs.live.vo.LiveGoodsVo;
|
|
|
+import com.fs.store.domain.FsUser;
|
|
|
+import com.fs.store.service.IFsUserService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.time.DateUtils;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.websocket.*;
|
|
|
+import javax.websocket.server.ServerEndpoint;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+import static com.fs.common.constant.LiveKeysConstant.*;
|
|
|
+
|
|
|
+@ServerEndpoint(value = "/ws/app/webSocket",configurator = WebSocketConfigurator.class)
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class WebSocketServer {
|
|
|
+
|
|
|
+ // 直播间用户session
|
|
|
+ private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
|
|
|
+ // 管理端连接
|
|
|
+ private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
|
|
|
+ private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
|
|
|
+ private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
|
|
|
+ private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
|
|
|
+ private final ILiveWatchUserService liveWatchUserService = SpringUtils.getBean(ILiveWatchUserService.class);
|
|
|
+ private final IFsUserService fsUserService = SpringUtils.getBean(IFsUserService.class);
|
|
|
+ private final ILiveDataService liveDataService = SpringUtils.getBean(ILiveDataService.class);
|
|
|
+ private final ProductionWordFilter productionWordFilter = SpringUtils.getBean(ProductionWordFilter.class);
|
|
|
+ private final ILiveRedConfService liveRedConfService = SpringUtils.getBean(ILiveRedConfService.class);
|
|
|
+ private final ILiveLotteryConfService liveLotteryConfService = SpringUtils.getBean(ILiveLotteryConfService.class);
|
|
|
+ private final ILiveGoodsService liveGoodsService = SpringUtils.getBean(ILiveGoodsService.class);
|
|
|
+ private final ILiveUserFirstEntryService liveUserFirstEntryService = SpringUtils.getBean(ILiveUserFirstEntryService.class);
|
|
|
+ // 直播间在线用户缓存
|
|
|
+// private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+
|
|
|
+ //建立连接成功调用
|
|
|
+ @OnOpen
|
|
|
+ public void onOpen(Session session) {
|
|
|
+
|
|
|
+ Map<String, Object> userProperties = session.getUserProperties();
|
|
|
+ long liveId = (long) userProperties.get("liveId");
|
|
|
+ long userId = (long) userProperties.get("userId");
|
|
|
+ long userType = (long) userProperties.get("userType");
|
|
|
+ Live live = liveService.selectLiveByLiveId(liveId);
|
|
|
+ if (live == null) {
|
|
|
+ throw new BaseException("未找到直播间");
|
|
|
+ }
|
|
|
+ long companyId = live.getCompanyId() == null ? -1L : live.getCompanyId();
|
|
|
+ long companyUserId = -1L;
|
|
|
+ if (!Objects.isNull(userProperties.get("companyId"))) {
|
|
|
+ companyId = (long) userProperties.get("companyId");
|
|
|
+ }
|
|
|
+ if (!Objects.isNull(userProperties.get("companyUserId"))) {
|
|
|
+ companyUserId = (long) userProperties.get("companyUserId");
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
+
|
|
|
+ // 记录连接信息 管理员不记录
|
|
|
+ if (userType == 0) {
|
|
|
+ FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
|
|
|
+ if (Objects.isNull(fsUser)) {
|
|
|
+ throw new BaseException("用户信息错误");
|
|
|
+ }
|
|
|
+
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.join(liveId, userId);
|
|
|
+ room.put(userId, session);
|
|
|
+ // 直播间浏览量 +1
|
|
|
+ redisCache.increment(PAGE_VIEWS_KEY + liveId, 1);
|
|
|
+
|
|
|
+ // 累计观看人次 +1
|
|
|
+ redisCache.increment(TOTAL_VIEWS_KEY + liveId, 1);
|
|
|
+
|
|
|
+ // 记录在线人数
|
|
|
+ redisCache.increment(ONLINE_USERS_KEY + liveId, 1);
|
|
|
+ Integer currentOnline = redisCache.getCacheObject(ONLINE_USERS_KEY + liveId);
|
|
|
+ //最大同时在线人数
|
|
|
+ Integer maxOnline = redisCache.getCacheObject(MAX_ONLINE_USERS_KEY + liveId);
|
|
|
+ if (maxOnline == null || currentOnline > maxOnline) {
|
|
|
+ redisCache.setCacheObject(MAX_ONLINE_USERS_KEY + liveId, currentOnline);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 判断是否是该直播间的首次访客(独立访客统计)
|
|
|
+ boolean isFirstVisit = redisCache.setIfAbsent(USER_VISIT_KEY + userId, 1, 1, TimeUnit.DAYS);
|
|
|
+ if (isFirstVisit) {
|
|
|
+
|
|
|
+ redisCache.increment(UNIQUE_VISITORS_KEY + liveId, 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 判断是否是首次进入直播间的观众
|
|
|
+ boolean isFirstViewer = redisCache.setIfAbsent(UNIQUE_VIEWERS_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
|
|
|
+ if (isFirstViewer) {
|
|
|
+ redisCache.increment(UNIQUE_VIEWERS_KEY + liveId, 1);
|
|
|
+ }
|
|
|
+ LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(liveId, userId);
|
|
|
+ liveWatchUserVO.setMsgStatus(liveWatchUser.getMsgStatus());
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
+ sendMsgVo.setCmd("entry");
|
|
|
+ sendMsgVo.setMsg("用户进入");
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
+ // 广播连接消息
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+
|
|
|
+ LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
|
|
|
+ if (liveUserFirstEntry != null) {
|
|
|
+ // 处理第一次自己进入,第二次扫码销售进入
|
|
|
+ if (liveUserFirstEntry.getCompanyUserId() == -1L && companyUserId != -1L) {
|
|
|
+ liveUserFirstEntry.setCompanyId(companyId);
|
|
|
+ liveUserFirstEntry.setCompanyUserId(companyUserId);
|
|
|
+ liveUserFirstEntryService.updateLiveUserFirstEntry(liveUserFirstEntry);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 这个用户A邀请用户b,b的业绩算a的销售的
|
|
|
+ if (companyUserId == -2L) {
|
|
|
+ LiveUserFirstEntry clientB = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, companyUserId);
|
|
|
+ companyId = clientB.getCompanyId();
|
|
|
+ companyUserId = clientB.getCompanyUserId();
|
|
|
+ }
|
|
|
+ Date date = new Date();
|
|
|
+ liveUserFirstEntry = new LiveUserFirstEntry();
|
|
|
+ liveUserFirstEntry.setUserId(userId);
|
|
|
+ liveUserFirstEntry.setLiveId(liveId);
|
|
|
+ liveUserFirstEntry.setCompanyId(companyId);
|
|
|
+ liveUserFirstEntry.setCompanyUserId(companyUserId);
|
|
|
+ liveUserFirstEntry.setEntryDate(date);
|
|
|
+ liveUserFirstEntry.setFirstEntryTime(date);
|
|
|
+ liveUserFirstEntry.setUpdateTime( date);
|
|
|
+ liveUserFirstEntryService.insertLiveUserFirstEntry(liveUserFirstEntry);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ } else {
|
|
|
+ adminRoom.add(session);
|
|
|
+ }
|
|
|
+
|
|
|
+ log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ //关闭连接时调用
|
|
|
+ @OnClose
|
|
|
+ public void onClose(Session session) {
|
|
|
+ Map<String, Object> userProperties = session.getUserProperties();
|
|
|
+
|
|
|
+ long liveId = (long) userProperties.get("liveId");
|
|
|
+ long userId = (long) userProperties.get("userId");
|
|
|
+ long userType = (long) userProperties.get("userType");
|
|
|
+
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
+ if (userType == 0) {
|
|
|
+ FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
|
|
|
+ if (Objects.isNull(fsUser)) {
|
|
|
+ throw new BaseException("用户信息错误");
|
|
|
+ }
|
|
|
+
|
|
|
+ LiveWatchUser liveWatchUserVO = liveWatchUserService.close(liveId, userId);
|
|
|
+ room.remove(userId);
|
|
|
+
|
|
|
+ if (room.isEmpty()) {
|
|
|
+ rooms.remove(liveId);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // 直播间在线人数 -1
|
|
|
+ redisCache.increment(ONLINE_USERS_KEY + liveId, -1);
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(userType);
|
|
|
+ sendMsgVo.setCmd("out");
|
|
|
+ sendMsgVo.setMsg("用户离开");
|
|
|
+ sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
|
|
|
+ sendMsgVo.setNickName(fsUser.getNickname());
|
|
|
+ sendMsgVo.setAvatar(fsUser.getAvatar());
|
|
|
+
|
|
|
+ // 广播离开消息
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ } else {
|
|
|
+ adminRoom.remove(session);
|
|
|
+ }
|
|
|
+
|
|
|
+ log.debug("离开webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ //收到客户端信息
|
|
|
+ @OnMessage
|
|
|
+ public void onMessage(Session session,String message) throws IOException {
|
|
|
+ Map<String, Object> userProperties = session.getUserProperties();
|
|
|
+
|
|
|
+ long liveId = (long) userProperties.get("liveId");
|
|
|
+ long userType = (long) userProperties.get("userType");
|
|
|
+
|
|
|
+ SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
|
|
|
+ if(msg.isOn()) return;
|
|
|
+ try {
|
|
|
+ switch (msg.getCmd()) {
|
|
|
+ case "heartbeat":
|
|
|
+ sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ break;
|
|
|
+ case "sendMsg":
|
|
|
+ msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
|
|
|
+ if(StringUtils.isEmpty(msg.getMsg())) return;
|
|
|
+ LiveMsg liveMsg = new LiveMsg();
|
|
|
+ liveMsg.setLiveId(msg.getLiveId());
|
|
|
+ liveMsg.setUserId(msg.getUserId());
|
|
|
+ liveMsg.setNickName(msg.getNickName());
|
|
|
+ liveMsg.setAvatar(msg.getAvatar());
|
|
|
+ liveMsg.setMsg(msg.getMsg());
|
|
|
+ liveMsg.setCreateTime(new Date());
|
|
|
+
|
|
|
+ if (userType == 0) {
|
|
|
+ LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(msg.getLiveId(), msg.getUserId());
|
|
|
+ if(liveWatchUser.getMsgStatus() == 1){
|
|
|
+ sendMessage(session, JSONObject.toJSONString(R.error("你已被禁言")));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ liveMsgService.insertLiveMsg(liveMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ msg.setOn(true);
|
|
|
+ msg.setData(JSONObject.toJSONString(liveMsg));
|
|
|
+
|
|
|
+ // 广播消息
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ break;
|
|
|
+ case "sendGift":
|
|
|
+ break;
|
|
|
+ case "blockUser":
|
|
|
+ sendBlockMessage(liveId, msg.getUserId());
|
|
|
+ break;
|
|
|
+ case "goods":
|
|
|
+ sendGoodsMessage(msg);
|
|
|
+ break;
|
|
|
+ case "red":
|
|
|
+ processRed(liveId, msg);
|
|
|
+ break;
|
|
|
+ case "lottery":
|
|
|
+ processLottery(liveId, msg);
|
|
|
+ break;
|
|
|
+ case "delAutoTask":
|
|
|
+ if (userType == 1) {
|
|
|
+ delAutoTask(liveId, DateUtils.parseDate(msg.getData(),"yyyy-MM-dd'T'HH:mm:ss.SSSZ").getTime());
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("webSocket 消息处理失败 msg: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ private void sendGoodsMessage(SendMsgVo msg) {
|
|
|
+ JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
+ Long goodsId = jsonObject.getLong("goodsId");
|
|
|
+ Long liveId = jsonObject.getLong("liveId");
|
|
|
+ Integer status = jsonObject.getInteger("status");
|
|
|
+ msg.setStatus(status);
|
|
|
+ LiveGoodsVo liveGoods = liveGoodsService.selectLiveGoodsVoByGoodsId(goodsId);
|
|
|
+ if(liveGoods == null) return;
|
|
|
+ msg.setLiveId(liveId);
|
|
|
+ msg.setData(JSONObject.toJSONString(liveGoods));
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理红包变动消息
|
|
|
+ */
|
|
|
+ private void processRed(Long liveId, SendMsgVo msg) {
|
|
|
+ log.debug("redData: {}", msg);
|
|
|
+ JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
+ Integer status = jsonObject.getInteger("status");
|
|
|
+ msg.setStatus( status);
|
|
|
+ LiveRedConf liveRedConf = liveRedConfService.selectLiveRedConfByRedId(jsonObject.getLong("redId"));
|
|
|
+ if (Objects.nonNull(liveRedConf)) {
|
|
|
+ msg.setData(JSONObject.toJSONString(liveRedConf));
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理抽奖变动消息
|
|
|
+ */
|
|
|
+ private void processLottery(Long liveId, SendMsgVo msg) {
|
|
|
+ log.debug("lotteryData: {}", msg);
|
|
|
+ JSONObject jsonObject = JSON.parseObject(msg.getData());
|
|
|
+ Integer status = jsonObject.getInteger("status");
|
|
|
+ msg.setStatus( status);
|
|
|
+ LiveLotteryConf liveLotteryConf = liveLotteryConfService.selectLiveLotteryConfByLotteryId(jsonObject.getLong("lotteryId"));
|
|
|
+ if (Objects.nonNull(liveLotteryConf)) {
|
|
|
+ msg.setData(JSONObject.toJSONString(liveLotteryConf));
|
|
|
+ broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //错误时调用
|
|
|
+ @OnError
|
|
|
+ public void onError(Session session, Throwable throwable) {
|
|
|
+ log.error("webSocKet连接错误 msg: {}", throwable.getMessage(), throwable);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取房间
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @return 容器
|
|
|
+ */
|
|
|
+ private ConcurrentHashMap<Long, Session> getRoom(Long liveId) {
|
|
|
+ return rooms.computeIfAbsent(liveId, k -> new ConcurrentHashMap<>());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取管理端房间
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @return 容器
|
|
|
+ */
|
|
|
+ private List<Session> getAdminRoom(Long liveId) {
|
|
|
+ return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
|
|
|
+ }
|
|
|
+
|
|
|
+ //发送消息
|
|
|
+ public void sendMessage(Session session, String message) throws IOException {
|
|
|
+ session.getAsyncRemote().sendText(message);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(0L);
|
|
|
+ sendMsgVo.setCmd("Integral");
|
|
|
+ sendMsgVo.setMsg("恭喜你成功获得观看奖励:" + scoreAmount + "芳华币");
|
|
|
+ sendMsgVo.setData(String.valueOf(scoreAmount));
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ Session session = room.get(userId);
|
|
|
+ if(Objects.isNull( session)) return;
|
|
|
+ session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendBlockMessage(Long liveId, Long userId) {
|
|
|
+ SendMsgVo sendMsgVo = new SendMsgVo();
|
|
|
+ sendMsgVo.setLiveId(liveId);
|
|
|
+ sendMsgVo.setUserId(userId);
|
|
|
+ sendMsgVo.setUserType(0L);
|
|
|
+ sendMsgVo.setCmd("blockUser");
|
|
|
+ sendMsgVo.setMsg("账号已被停用");
|
|
|
+ sendMsgVo.setData(null);
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ Session session = room.get(userId);
|
|
|
+ if(Objects.isNull( session)) return;
|
|
|
+ session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 广播消息
|
|
|
+ * @param liveId 直播间ID
|
|
|
+ * @param message 消息内容
|
|
|
+ */
|
|
|
+ public void broadcastMessage(Long liveId, String message) {
|
|
|
+ ConcurrentHashMap<Long, Session> room = getRoom(liveId);
|
|
|
+ List<Session> adminRoom = getAdminRoom(liveId);
|
|
|
+
|
|
|
+ room.forEach((k, v) -> v.getAsyncRemote().sendText(message));
|
|
|
+ adminRoom.forEach(v -> v.getAsyncRemote().sendText(message));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ *定期将缓存的数据写入数据库
|
|
|
+ */
|
|
|
+ @Scheduled(fixedRate = 60000) // 每分钟执行一次
|
|
|
+ public void syncLiveDataToDB() {
|
|
|
+ List<LiveData> liveDatas = liveDataService.getAllLiveDatas(); // 获取所有正在直播的直播间数据
|
|
|
+ if(liveDatas == null)
|
|
|
+ return;
|
|
|
+ liveDatas.forEach(liveData ->{
|
|
|
+ liveData.setLikes(
|
|
|
+ Optional.ofNullable(redisCache.incrementCacheValue("live:like:" + liveData.getLiveId(),0 )).orElse(0L)
|
|
|
+ );
|
|
|
+
|
|
|
+ /* for (Long liveId : liveIds) {
|
|
|
+ LiveData liveData = liveDataService.selectLiveDataByLiveId(liveId);
|
|
|
+ if (liveData == null) {
|
|
|
+ continue; // 防止空指针异常
|
|
|
+ }*/
|
|
|
+
|
|
|
+
|
|
|
+ // 从 redis 获取数据,并提供默认值,避免 NPE
|
|
|
+ liveData.setPageViews(
|
|
|
+ Optional.ofNullable(redisCache.incrementCacheValue(PAGE_VIEWS_KEY + liveData.getLiveId(),0)).orElse(0L)
|
|
|
+ );
|
|
|
+ liveData.setTotalViews(
|
|
|
+ Optional.ofNullable(redisCache.incrementCacheValue(TOTAL_VIEWS_KEY + liveData.getLiveId(),0)).orElse(0L)
|
|
|
+ );
|
|
|
+ liveData.setUniqueVisitors(
|
|
|
+ /*Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VISITORS_KEY + liveId))
|
|
|
+ .map(Set::size) // 获取集合大小
|
|
|
+ .map(Long::valueOf) // 转换为 Long 类型
|
|
|
+ .orElse(0L)*/
|
|
|
+ Optional.ofNullable(redisCache.incrementCacheValue(UNIQUE_VISITORS_KEY + liveData.getLiveId(),0)).orElse(0L)
|
|
|
+ );
|
|
|
+ liveData.setUniqueViewers(
|
|
|
+ /*Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VIEWERS_KEY + liveId))
|
|
|
+ .map(Set::size) // 获取集合大小
|
|
|
+ .map(Long::valueOf) // 转换为 Long 类型
|
|
|
+ .orElse(0L)*/
|
|
|
+ Optional.ofNullable(redisCache.incrementCacheValue(UNIQUE_VIEWERS_KEY + liveData.getLiveId(),0)).orElse(0L)
|
|
|
+ );
|
|
|
+ liveData.setPeakConcurrentViewers(
|
|
|
+ Optional.ofNullable(redisCache.incrementCacheValue(MAX_ONLINE_USERS_KEY + liveData.getLiveId(),0)).orElse(0L)
|
|
|
+ );
|
|
|
+ });
|
|
|
+ if(!liveDatas.isEmpty())
|
|
|
+ for (LiveData liveData : liveDatas) {
|
|
|
+ liveDataService.updateLiveData(liveData);
|
|
|
+ }
|
|
|
+
|
|
|
+ /*// 更新数据库
|
|
|
+ liveDataService.updateLiveData(liveData);*/
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void handleAutoTask(LiveAutoTask task) {
|
|
|
+ if (task.getTaskType() == 1L) {
|
|
|
+ SendMsgVo msg = new SendMsgVo();
|
|
|
+ msg.setLiveId(task.getLiveId());
|
|
|
+ msg.setData(task.getContent());
|
|
|
+ msg.setCmd("goods");
|
|
|
+ try {
|
|
|
+ LiveGoodsVo liveGoodsVo = JSON.parseObject(task.getContent(), LiveGoodsVo.class);
|
|
|
+ liveGoodsService.updateLiveIsShow(liveGoodsVo.getGoodsId(), task.getLiveId());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("定时任务执行异常:{}", e.getMessage());
|
|
|
+ }
|
|
|
+ msg.setStatus(1);
|
|
|
+ broadcastMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ private void delAutoTask(long liveId, Long data) {
|
|
|
+ String key = "live:auto_task:";
|
|
|
+ redisCache.redisTemplate.opsForZSet().removeRangeByScore(key + liveId, data, data);
|
|
|
+ }
|
|
|
+}
|