Ver código fonte

屏蔽掉原来的task 关掉userapp的websocket

yuhongqi 1 semana atrás
pai
commit
7b45f3d0bc

+ 391 - 0
fs-live-socket/src/main/java/com/fs/live/task/Task.java

@@ -0,0 +1,391 @@
+package com.fs.live.task;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fs.common.annotation.QuartzRunnable;
+import com.fs.common.constant.LiveKeysConstant;
+import com.fs.common.core.domain.R;
+import com.fs.common.core.redis.RedisCache;
+import com.fs.common.core.redis.RedisUtil;
+import com.fs.common.utils.StringUtils;
+import com.fs.core.aspectj.lock.DistributeLock;
+import com.fs.erp.domain.ErpDeliverys;
+import com.fs.erp.domain.ErpOrderQuery;
+import com.fs.erp.dto.ErpOrderQueryRequert;
+import com.fs.erp.dto.ErpOrderQueryResponse;
+import com.fs.erp.service.FsJstAftersalePushService;
+import com.fs.erp.service.IErpOrderService;
+import com.fs.erp.utils.ErpContextHolder;
+import com.fs.live.domain.*;
+import com.fs.live.mapper.LiveLotteryRegistrationMapper;
+import com.fs.live.param.LiveReplayParam;
+import com.fs.live.service.*;
+import com.fs.live.vo.LiveConfigVo;
+import com.fs.live.vo.LiveLotteryConfVo;
+import com.fs.live.vo.LiveLotteryProductListVo;
+import com.fs.live.vo.LotteryVo;
+import com.fs.live.websocket.bean.SendMsgVo;
+import com.fs.live.websocket.service.WebSocketServer;
+import com.fs.store.domain.FsExpress;
+import com.fs.store.enums.OrderLogEnum;
+import com.fs.store.mapper.FsWarehousesMapper;
+import com.fs.store.param.LiveAfterSalesAudit1Param;
+import com.fs.store.service.IFsExpressService;
+import com.fs.store.service.IFsUserService;
+import lombok.AllArgsConstructor;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.math.BigDecimal;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Component
+@AllArgsConstructor
+public class Task {
+
+    private static final Logger log = LoggerFactory.getLogger(Task.class);
+    private final ILiveService liveService;
+
+    private final ILiveDataService liveDataService;
+
+    private final RedisCache redisCache;
+
+    @Autowired
+    private ILiveWatchUserService liveWatchUserService;
+    @Autowired
+    private IFsUserService fsUserService;
+    @Autowired
+    private ILiveRewardRecordService liveRewardRecordService;
+    @Autowired
+    private WebSocketServer webSocketServer;
+    @Autowired
+    private ILiveAutoTaskService liveAutoTaskService;
+    @Autowired
+    private ILiveLotteryConfService liveLotteryConfService;
+    @Autowired
+    private ILiveUserLotteryRecordService liveUserLotteryRecordService;
+    @Autowired
+    private LiveLotteryRegistrationMapper liveLotteryRegistrationMapper;
+    @Autowired
+    private ILiveRedConfService liveRedConfService;
+//1111
+    @Autowired
+    private ILiveOrderService liveOrderService;
+
+    @Autowired
+    private ILiveAfterSalesService afterSalesService;
+
+
+    @Autowired
+    private IErpOrderService erpOrderService;
+
+
+    @Autowired
+    private IFsExpressService expressService;
+
+
+    @Autowired
+    private ILiveOrderLogsService orderLogsService;
+
+
+    @Autowired
+    private FsWarehousesMapper fsWarehousesMapper;
+
+    @Autowired
+    public FsJstAftersalePushService fsJstAftersalePushService;
+    @Autowired
+    public RedisUtil redisUtil;
+    @Scheduled(cron = "0 0/1 * * * ?")
+    @DistributeLock(key = "updateLiveStatusByTime", scene = "task")
+    //public void selectSopUserLogsListByTime() {
+    public void updateLiveStatusByTime() {
+        List<Live> list = liveService.selectNoEndLiveList();
+        if (list.isEmpty())
+            return;
+        List<Long> liveIdLists = list.stream().map(Live::getLiveId).collect(Collectors.toList());
+        List<LiveAutoTask> liveAutoTasks = liveAutoTaskService.selectLiveAutoTaskByLiveIds(liveIdLists);
+        List<Live> activeLiveList = new ArrayList<>();
+        LocalDateTime now = LocalDateTime.now();
+        list.forEach(live -> {
+            if (live.getLiveType() != 3) {
+                if (live.getFinishTime() == null) {
+                    if (now.isAfter(live.getStartTime().minusSeconds(2L))){
+                        live.setStatus(2);
+                        activeLiveList.add( live);
+                    } else if (now.isBefore(live.getStartTime())) {
+                        live.setStatus(1);
+                    }
+                } else {
+                    if (now.isAfter(live.getStartTime().minusSeconds(2L)) && now.isBefore(live.getFinishTime())) {
+                        live.setStatus(2);
+                        activeLiveList.add( live);
+                    } else if (now.isBefore(live.getStartTime().minusSeconds(2L))) {
+                        live.setStatus(1);
+                    } else if (now.isAfter(live.getFinishTime().minusSeconds(2L))) {
+                        live.setStatus(3);
+                    }
+                }
+            } else {
+                // 直播回放只需要检测结束时间就好了
+                LiveReplayParam liveReplayParam = JSON.parseObject(live.getLiveConfig(), LiveReplayParam.class);
+                if (liveReplayParam.getIsPlaybackOpen()) {
+                    if (liveReplayParam.getFinishTime() != null) {
+                        if (now.isAfter(live.getFinishTime().minusSeconds(2L))) {
+                            live.setStatus(3);
+                        }
+                    }
+                }
+            }
+
+        });
+        String key = "live:auto_task:";
+        if(!activeLiveList.isEmpty()){
+            activeLiveList.forEach(live -> {
+                List<LiveAutoTask> collect = liveAutoTasks.stream().filter(liveAutoTask -> liveAutoTask.getLiveId().equals(live.getLiveId())).collect(Collectors.toList());
+                if (!collect.isEmpty()) {
+                    collect.forEach(liveAutoTask -> {
+                        liveAutoTask.setCreateTime(null);
+                        liveAutoTask.setUpdateTime(null);
+                        redisCache.redisTemplate.opsForZSet().add(key + live.getLiveId(), JSON.toJSONString(liveAutoTask),liveAutoTask.getAbsValue().getTime());
+                        redisCache.redisTemplate.expire(key+live.getLiveId(), 30, java.util.concurrent.TimeUnit.MINUTES);
+                    });
+                }
+            });
+        }
+
+        if(!list.isEmpty()){
+            for (Live live : list) {
+                liveService.updateLive(live);
+            }
+        }
+    }
+    @Scheduled(cron = "0/1 * * * * ?")
+    @DistributeLock(key = "liveLotteryTask", scene = "task")
+    public void liveLotteryTask() {
+        long currentTime = Instant.now().toEpochMilli(); // 当前时间戳(毫秒)
+        String lotteryKey = "live:lottery_task:*";
+        Set<String> allLiveKeys = redisCache.redisTemplate.keys(lotteryKey);
+        if (allLiveKeys != null && !allLiveKeys.isEmpty()) {
+            for (String liveKey : allLiveKeys) {
+                Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
+                if (range == null || range.isEmpty()) {
+                    continue;
+                }
+                processLotteryTask(range);
+                redisCache.redisTemplate.opsForZSet()
+                        .removeRangeByScore(liveKey, 0, currentTime);
+            }
+        }
+
+        String redKey = "live:red_task:*";
+        allLiveKeys = redisCache.redisTemplate.keys(redKey);
+        if (allLiveKeys == null || allLiveKeys.isEmpty()) {
+            return;
+        }
+        for (String liveKey : allLiveKeys) {
+            Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
+            if (range == null || range.isEmpty()) {
+                continue;
+            }
+
+            updateRedStatus(range);
+            redisCache.redisTemplate.opsForZSet()
+                    .removeRangeByScore(liveKey, 0, currentTime);
+            try {
+                // 广播红包关闭消息
+                SendMsgVo sendMsgVo = new SendMsgVo();
+                sendMsgVo.setLiveId(Long.valueOf(liveKey));
+                sendMsgVo.setCmd("red");
+                sendMsgVo.setStatus(-1);
+                webSocketServer.broadcastMessage(Long.valueOf(liveKey), JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+            } catch (Exception e) {
+                log.error("更新红包状态异常", e);
+            }
+        }
+    }
+
+    private void updateRedStatus(Set<String> range) {
+
+        liveRedConfService.finishRedStatusBySetIds(range);
+    }
+
+    private void processLotteryTask(Set<String> range) {
+        List<LiveLotteryConfVo> liveLotteries = liveLotteryConfService.selectVoListByLotteryIds(range);
+        if(liveLotteries.isEmpty()) return;
+        Date now = new Date();
+        for (LiveLotteryConfVo liveLottery : liveLotteries) {
+            // 查询抽奖数量
+            List<LiveLotteryProductListVo> products = liveLottery.getProducts();
+            Integer totalLots = products.stream().mapToInt(liveLotteryProductListVo -> Math.toIntExact(liveLotteryProductListVo.getTotalLots())).sum();
+            if(totalLots <= 0) continue;
+            // 先将参与记录插入数据库
+            String hashKey = String.format(LiveKeysConstant.LIVE_HOME_PAGE_CONFIG_DRAW, liveLottery.getLiveId(), liveLottery.getLotteryId());
+            Map<Object, Object> hashEntries = redisUtil.hashEntries(hashKey);
+            List<LiveLotteryRegistration> registrationList = new ArrayList<>();
+            if (CollUtil.isNotEmpty(hashEntries)) {
+                registrationList = hashEntries.values().stream()
+                        .map(value -> JSONUtil.toBean(JSONUtil.parseObj(value), LiveLotteryRegistration.class))
+                        .collect(Collectors.toList());
+                liveLotteryRegistrationMapper.insertLiveLotteryRegistrationBatch(registrationList);
+            }
+
+            // 查询在线用户 并且参与了抽奖的用户
+            List<LiveWatchUser> liveWatchUsers = liveWatchUserService.selectLiveWatchAndRegisterUser(liveLottery.getLiveId(),liveLottery.getLotteryId(), totalLots);
+            if(liveWatchUsers.isEmpty()) continue;
+            LiveLotteryRegistration liveLotteryRegistration;
+            // 收集中奖信息
+            List<LotteryVo> lotteryVos = new ArrayList<>();
+            for (LiveLotteryProductListVo liveLotteryProductListVo : products) {
+                // 随机抽奖一个用户获取奖品
+                Long totalLotsPerProduct = liveLotteryProductListVo.getTotalLots();
+                for (int i = 0; i < totalLotsPerProduct && !liveWatchUsers.isEmpty(); i++) {
+                    // 随机选择一个用户
+                    int randomIndex = new Random().nextInt(liveWatchUsers.size());
+                    LiveWatchUser winningUser = liveWatchUsers.get(randomIndex);
+
+                    // 创建中奖记录
+                    LiveUserLotteryRecord record = new LiveUserLotteryRecord();
+                    record.setLotteryId(liveLottery.getLotteryId());
+                    record.setLiveId(liveLottery.getLiveId());
+                    record.setUserId(winningUser.getUserId());
+                    record.setProductId(liveLotteryProductListVo.getProductId());
+                    record.setCreateTime(new Date());
+
+                    // 保存中奖记录
+                    liveUserLotteryRecordService.insertLiveUserLotteryRecord(record);
+                    liveLotteryRegistration = new LiveLotteryRegistration();
+                    liveLotteryRegistration.setLotteryId(liveLottery.getLotteryId());
+                    liveLotteryRegistration.setLiveId(liveLottery.getLotteryId());
+                    liveLotteryRegistration.setUserId(winningUser.getUserId());
+                    liveLotteryRegistration.setIsWin(1L);
+                    liveLotteryRegistration.setUpdateTime(now);
+                    liveLotteryRegistration.setRizeLevel(liveLotteryProductListVo.getPrizeLevel());
+                    liveLotteryRegistrationMapper.updateLiveLotteryRegistrationNoId(liveLotteryRegistration);
+                    // 从候选列表中移除该用户,确保每人只能中奖一次
+                    liveWatchUsers.remove(randomIndex);
+                    LotteryVo lotteryVo = new LotteryVo();
+                    lotteryVo.setUserId(winningUser.getUserId());
+                    lotteryVo.setUserName(winningUser.getNickName());
+                    lotteryVo.setPrizeLevel(liveLotteryProductListVo.getPrizeLevel());
+                    lotteryVo.setProductName(liveLotteryProductListVo.getProductName());
+                    lotteryVos.add(lotteryVo);
+                }
+            }
+            SendMsgVo sendMsgVo = new SendMsgVo();
+            sendMsgVo.setLiveId(liveLottery.getLiveId());
+            sendMsgVo.setCmd("LotteryDetail");
+            sendMsgVo.setData(JSON.toJSONString(lotteryVos));
+            webSocketServer.broadcastMessage(liveLottery.getLiveId(), JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+
+            // 删除缓存 同步抽奖记录
+            redisUtil.delete(hashKey);
+        }
+
+        List<Long> collect = liveLotteries.stream().map(LiveLotteryConfVo::getLotteryId).collect(Collectors.toList());
+        liveLotteryConfService.finishStatusByLotteryIds(collect);
+    }
+
+    @Scheduled(cron = "0/1 * * * * ?")
+    @DistributeLock(key = "liveAutoTask", scene = "task")
+    public void liveAutoTask() {
+        long currentTime = Instant.now().toEpochMilli(); // 当前时间戳(毫秒)
+
+        Set<String> allLiveKeys = redisCache.redisTemplate.keys("live:auto_task:*");
+        if (allLiveKeys == null || allLiveKeys.isEmpty()) {
+            return; // 没有数据,直接返回
+        }
+        // 2. 遍历每个直播间的ZSet键
+        for (String liveKey : allLiveKeys) {
+            // 3. 获取当前直播间ZSet中所有元素(按score排序)
+            // range方法:0表示第一个元素,-1表示最后一个元素,即获取全部
+            Set<String> range = redisCache.redisTemplate.opsForZSet().rangeByScore(liveKey, 0, currentTime);
+            if (range == null || range.isEmpty()) {
+                continue; // 没有数据,直接返回
+            }
+            redisCache.redisTemplate.opsForZSet()
+                    .removeRangeByScore(liveKey, 0, currentTime);
+            processAutoTask(range);
+        }
+    }
+
+    private void processAutoTask(Set<String> range) {
+        for (String liveAutoTask : range) {
+            LiveAutoTask task = JSON.parseObject(liveAutoTask, LiveAutoTask.class);
+            webSocketServer.handleAutoTask(task);
+            task.setFinishStatus(1L);
+            liveAutoTaskService.finishLiveAutoTask(task);
+        }
+    }
+
+    @Scheduled(cron = "0 0/1 * * * ?")
+    @DistributeLock(key = "autoUpdateWatchReward", scene = "task")
+    @Transactional
+    public void autoUpdateWatchReward() {
+
+        // 1.查询所有直播中的直播间
+        List<Live> lives = liveService.liveList();
+
+
+        // 2.检查是否开启观看奖励
+        List<Live> openRewardLives = lives.stream().filter(live -> StringUtils.isNotEmpty(live.getConfigJson())).collect(Collectors.toList());
+        Date now = new Date();
+
+        for (Live openRewardLive : openRewardLives) {
+            String configJson = openRewardLive.getConfigJson();
+            LiveWatchConfig config = JSON.parseObject(configJson, LiveWatchConfig.class);
+            if (config.getEnabled()) {
+                // 3.检查当前直播间的在线用户(可以传入一个时间,然后查出来当天没领取奖励的用户)
+                List<LiveWatchUser> onlineUser = liveWatchUserService.checkOnlineNoRewardUser(openRewardLive.getLiveId(), now)
+                        .stream().filter(user -> now.getTime() - user.getUpdateTime().getTime() > config.getWatchDuration() * 60 * 1000)
+                        .collect(Collectors.toList());
+                if(onlineUser.isEmpty()) continue;
+
+                List<Long> userIds = onlineUser.stream().map(LiveWatchUser::getUserId).collect(Collectors.toList());
+                // 4.保存用户领取记录
+                saveUserRewardRecord(openRewardLive, userIds,config.getScoreAmount());
+                // 5.更新用户积分(芳华币
+                fsUserService.incrIntegral(userIds,config.getScoreAmount());
+                // 6.发送websocket事件消息 通知用户自动领取成功
+                userIds.forEach(userId -> webSocketServer.sendIntegralMessage(openRewardLive.getLiveId(),userId,config.getScoreAmount()));
+
+            }
+        }
+    }
+    private void saveUserRewardRecord(Live live, List<Long> userIds,Long scoreAmount) {
+        for (Long userId : userIds) {
+            LiveRewardRecord record = new LiveRewardRecord();
+            record.setLiveId(live.getLiveId());
+            record.setUserId(userId);
+            record.setIncomeType(1L);
+            record.setSourceType(1L);
+            record.setSourceId(live.getCompanyId() == null ? 0L : live.getCompanyId());
+            record.setRewardType(2L);
+            record.setNum(BigDecimal.valueOf(scoreAmount));
+            record.setRewardType(2L);
+            record.setCreateTime(new Date());
+            record.setCreateBy(String.valueOf(userId));
+            liveRewardRecordService.insertLiveRewardRecord(record);
+        }
+    }
+
+    /**
+     * 更新红包领取数量
+     */
+    @Scheduled(cron = "0/5 * * * * ?")
+    @DistributeLock(key = "updateRedQuantityNum", scene = "task")
+    public void updateRedQuantityNum() {
+        liveRedConfService.updateRedQuantityNum();
+    }
+}

+ 68 - 70
fs-user-app/src/main/java/com/fs/app/controller/LiveController.java

@@ -6,7 +6,6 @@ import com.fs.app.facade.LiveFacadeService;
 import com.fs.app.vo.LiveInfoVo;
 import com.fs.common.vo.LiveVo;
 import com.fs.app.websocket.bean.SendMsgVo;
-import com.fs.app.websocket.service.WebSocketServer;
 import com.fs.common.annotation.RepeatSubmit;
 import com.fs.common.core.domain.BaseEntity;
 import com.fs.common.core.domain.R;
@@ -54,8 +53,7 @@ public class LiveController extends AppBaseController {
 	private final ILiveService liveService;
 	private final ILiveMsgService liveMsgService;
 
-	@Autowired
-	private WebSocketServer webSocketServer;
+
 
 	@Autowired
 	private ILiveGoodsService liveGoodsService;
@@ -172,73 +170,73 @@ public class LiveController extends AppBaseController {
 	}
 
 
-	// 直播开播回调
-	@PostMapping("/startLiving")
-	public R checkLiving(HttpServletRequest request, @RequestBody  Map<String, String> params) {
-		log.info("请求参数:{}", params);
-		SendMsgVo sendMsgVo = new SendMsgVo();
-		sendMsgVo.setMsg("开始直播");
-		sendMsgVo.setCmd("live_start");
-		webSocketServer.broadcastMessage(Long.valueOf(params.get("stream_id")), JSONObject.toJSONString(R.ok().put("data",sendMsgVo)));
-		return R.ok();
-//		{app=200149.push.tlivecloud.com, appid=1319721001, appname=live, channel_id=667, errcode=0,
-//		errmsg=, event_time=1753840982, event_type=1,
-//		height=1080, idc_id=38, node=113.249.151.16,
-//		sequence=702115889072680959, set_id=2, stream_id=667,
-//		stream_param=txSecret=6E89AECCA08DBEE82E7F5870BCED7132&txTime=688ACD21, user_ip=125.80.218.101, width=1920}
-	}
-
-	// 直播结束回调
-	@PostMapping("/endLiving")
-	public R endLiving(HttpServletRequest request, @RequestBody  Map<String, String> params) {
-		log.info("请求参数:{}", params);
-		SendMsgVo sendMsgVo = new SendMsgVo();
-		sendMsgVo.setMsg("结束直播");
-		sendMsgVo.setCmd("live_end");
-		webSocketServer.broadcastMessage(Long.valueOf(params.get("stream_id")), JSONObject.toJSONString(R.ok().put("data",sendMsgVo)));
-		Live live = new Live();
-		live.setLiveId(Long.valueOf(params.get("stream_id")));
-		live.setStatus(3);
-		live.setFinishTime(LocalDateTime.now());
-		liveService.updateLive(live);
-		return R.ok();
-//		{app=200149.push.tlivecloud.com, appid=1319721001, appname=live, channel_id=673,
-//				errcode=1, errmsg=The push client actively stopped the push, event_time=1755571239,
-//				event_type=0, height=1080, idc_id=38, node=113.250.23.118, push_duration=1051237,
-//				sequence=721865018844564968, set_id=2, stream_id=673,
-//				stream_param=txSecret=A3EF362C9484D3D091C2E9B08C2C08CB&txTime=68A53145,
-//				user_ip=113.248.98.28, width=1920}
-
-	}
-
-	// 直播录制文件回调 腾讯云 cos桶 fs
-	@PostMapping("/liveReplayFile")
-	public R liveReplayFile(HttpServletRequest request, @RequestBody  Map<String, String> params) {
-		log.info("请求参数:{}", params);
-		Long liveId = Long.valueOf(params.get("stream_id"));
-		LiveVideo liveVideo = new LiveVideo();
-		LiveVideo exist = videoService.selectLiveVideoByLiveId(liveId);
-		if (exist != null) {
-			liveVideo = exist;
-		}
-		Date nowDate = DateUtils.getNowDate();
-		liveVideo.setLiveId(liveId);
-		liveVideo.setVideoUrl(params.get("video_url"));
-		liveVideo.setVideoType(2);
-		liveVideo.setDuration(Long.valueOf(params.get("duration")));
-		liveVideo.setCreateTime(nowDate);
-		liveVideo.setUpdateTime(nowDate);
-		videoService.saveOrUpdate(liveVideo);
-		return R.ok();
-//		{app=200149.push.tlivecloud.com, appid=1319721001, appname=live,
-//				callback_ext={"video_codec":"h264","session_id":"1755326625589574166","resolution":"1920x1080"},
-//			channel_id=774, duration=137, end_time=1757387736, end_time_usec=476103, event_type=100, file_format=mp4,
-//					file_id=1319721001_d69054948de44655b465aaf725cafc1c, file_size=45806289, media_start_time=80, record_bps=0,
-//				record_file_id=1319721001_d69054948de44655b465aaf725cafc1c, record_temp_id=1595756, start_time=1757387600, start_time_usec=942579,
-//				stream_id=774, stream_param=txSecret=CBF1E86FB1AD58B9CC25941F6B9DA854&txTime=68C0E304, task_id=1755326625589574166, video_id=1319721001_05396a7c47914b40aa9ffefcb0ea4626,
-//				video_url=http://fs-1319721001.cos.ap-chongqing.myqcloud.com/origin/200149.push.tlivecloud.com/live/774/1755326625589574166-6a9899ea95be4e89be0eeb30cf458579/2025-09-09-11-13-20.mp4}
-
-	}
+//	// 直播开播回调
+//	@PostMapping("/startLiving")
+//	public R checkLiving(HttpServletRequest request, @RequestBody  Map<String, String> params) {
+//		log.info("请求参数:{}", params);
+//		SendMsgVo sendMsgVo = new SendMsgVo();
+//		sendMsgVo.setMsg("开始直播");
+//		sendMsgVo.setCmd("live_start");
+//		webSocketServer.broadcastMessage(Long.valueOf(params.get("stream_id")), JSONObject.toJSONString(R.ok().put("data",sendMsgVo)));
+//		return R.ok();
+////		{app=200149.push.tlivecloud.com, appid=1319721001, appname=live, channel_id=667, errcode=0,
+////		errmsg=, event_time=1753840982, event_type=1,
+////		height=1080, idc_id=38, node=113.249.151.16,
+////		sequence=702115889072680959, set_id=2, stream_id=667,
+////		stream_param=txSecret=6E89AECCA08DBEE82E7F5870BCED7132&txTime=688ACD21, user_ip=125.80.218.101, width=1920}
+//	}
+//
+//	// 直播结束回调
+//	@PostMapping("/endLiving")
+//	public R endLiving(HttpServletRequest request, @RequestBody  Map<String, String> params) {
+//		log.info("请求参数:{}", params);
+//		SendMsgVo sendMsgVo = new SendMsgVo();
+//		sendMsgVo.setMsg("结束直播");
+//		sendMsgVo.setCmd("live_end");
+//		webSocketServer.broadcastMessage(Long.valueOf(params.get("stream_id")), JSONObject.toJSONString(R.ok().put("data",sendMsgVo)));
+//		Live live = new Live();
+//		live.setLiveId(Long.valueOf(params.get("stream_id")));
+//		live.setStatus(3);
+//		live.setFinishTime(LocalDateTime.now());
+//		liveService.updateLive(live);
+//		return R.ok();
+////		{app=200149.push.tlivecloud.com, appid=1319721001, appname=live, channel_id=673,
+////				errcode=1, errmsg=The push client actively stopped the push, event_time=1755571239,
+////				event_type=0, height=1080, idc_id=38, node=113.250.23.118, push_duration=1051237,
+////				sequence=721865018844564968, set_id=2, stream_id=673,
+////				stream_param=txSecret=A3EF362C9484D3D091C2E9B08C2C08CB&txTime=68A53145,
+////				user_ip=113.248.98.28, width=1920}
+//
+//	}
+//
+//	// 直播录制文件回调 腾讯云 cos桶 fs
+//	@PostMapping("/liveReplayFile")
+//	public R liveReplayFile(HttpServletRequest request, @RequestBody  Map<String, String> params) {
+//		log.info("请求参数:{}", params);
+//		Long liveId = Long.valueOf(params.get("stream_id"));
+//		LiveVideo liveVideo = new LiveVideo();
+//		LiveVideo exist = videoService.selectLiveVideoByLiveId(liveId);
+//		if (exist != null) {
+//			liveVideo = exist;
+//		}
+//		Date nowDate = DateUtils.getNowDate();
+//		liveVideo.setLiveId(liveId);
+//		liveVideo.setVideoUrl(params.get("video_url"));
+//		liveVideo.setVideoType(2);
+//		liveVideo.setDuration(Long.valueOf(params.get("duration")));
+//		liveVideo.setCreateTime(nowDate);
+//		liveVideo.setUpdateTime(nowDate);
+//		videoService.saveOrUpdate(liveVideo);
+//		return R.ok();
+////		{app=200149.push.tlivecloud.com, appid=1319721001, appname=live,
+////				callback_ext={"video_codec":"h264","session_id":"1755326625589574166","resolution":"1920x1080"},
+////			channel_id=774, duration=137, end_time=1757387736, end_time_usec=476103, event_type=100, file_format=mp4,
+////					file_id=1319721001_d69054948de44655b465aaf725cafc1c, file_size=45806289, media_start_time=80, record_bps=0,
+////				record_file_id=1319721001_d69054948de44655b465aaf725cafc1c, record_temp_id=1595756, start_time=1757387600, start_time_usec=942579,
+////				stream_id=774, stream_param=txSecret=CBF1E86FB1AD58B9CC25941F6B9DA854&txTime=68C0E304, task_id=1755326625589574166, video_id=1319721001_05396a7c47914b40aa9ffefcb0ea4626,
+////				video_url=http://fs-1319721001.cos.ap-chongqing.myqcloud.com/origin/200149.push.tlivecloud.com/live/774/1755326625589574166-6a9899ea95be4e89be0eeb30cf458579/2025-09-09-11-13-20.mp4}
+//
+//	}
 	@GetMapping("/currentActivities")
 	@Transactional
 	@Login

+ 471 - 471
fs-user-app/src/main/java/com/fs/app/websocket/service/WebSocketServer.java

@@ -1,471 +1,471 @@
-package com.fs.app.websocket.service;
-
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.fs.app.config.ProductionWordFilter;
-import com.fs.app.websocket.auth.WebSocketConfigurator;
-import com.fs.app.websocket.bean.SendMsgVo;
-import com.fs.common.core.domain.R;
-import com.fs.common.core.redis.RedisCache;
-import com.fs.common.exception.BaseException;
-import com.fs.common.utils.StringUtils;
-import com.fs.common.utils.spring.SpringUtils;
-import com.fs.live.domain.*;
-import com.fs.live.service.*;
-import com.fs.live.vo.LiveGoodsVo;
-import com.fs.store.domain.FsUser;
-import com.fs.store.service.IFsUserService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.time.DateUtils;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import javax.websocket.*;
-import javax.websocket.server.ServerEndpoint;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-
-import static com.fs.common.constant.LiveKeysConstant.*;
-
-@ServerEndpoint(value = "/app/webSocket",configurator = WebSocketConfigurator.class)
-@Component
-@Slf4j
-public class WebSocketServer {
-
-    // 直播间用户session
-    private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
-    // 管理端连接
-    private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
-    private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
-    private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
-    private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
-    private final ILiveWatchUserService liveWatchUserService = SpringUtils.getBean(ILiveWatchUserService.class);
-    private final IFsUserService fsUserService = SpringUtils.getBean(IFsUserService.class);
-    private final ILiveDataService liveDataService = SpringUtils.getBean(ILiveDataService.class);
-    private final ProductionWordFilter productionWordFilter = SpringUtils.getBean(ProductionWordFilter.class);
-    private final ILiveRedConfService liveRedConfService =  SpringUtils.getBean(ILiveRedConfService.class);
-    private final ILiveLotteryConfService liveLotteryConfService =  SpringUtils.getBean(ILiveLotteryConfService.class);
-    private final ILiveGoodsService liveGoodsService =  SpringUtils.getBean(ILiveGoodsService.class);
-    private final ILiveUserFirstEntryService liveUserFirstEntryService =  SpringUtils.getBean(ILiveUserFirstEntryService.class);
-    // 直播间在线用户缓存
-//    private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
-
-
-    //建立连接成功调用
-    @OnOpen
-    public void onOpen(Session session) {
-
-        Map<String, Object> userProperties = session.getUserProperties();
-        long liveId = (long) userProperties.get("liveId");
-        long userId = (long) userProperties.get("userId");
-        long userType = (long) userProperties.get("userType");
-        Live live = liveService.selectLiveByLiveId(liveId);
-        if (live == null) {
-            throw new BaseException("未找到直播间");
-        }
-        long companyId = live.getCompanyId() == null ? -1L : live.getCompanyId();
-        long companyUserId = -1L;
-        if (!Objects.isNull(userProperties.get("companyId"))) {
-            companyId = (long) userProperties.get("companyId");
-        }
-        if (!Objects.isNull(userProperties.get("companyUserId"))) {
-            companyUserId = (long) userProperties.get("companyUserId");
-        }
-
-
-        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
-        List<Session> adminRoom = getAdminRoom(liveId);
-
-        // 记录连接信息 管理员不记录
-        if (userType == 0) {
-            FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
-            if (Objects.isNull(fsUser)) {
-                throw new BaseException("用户信息错误");
-            }
-
-            LiveWatchUser liveWatchUserVO = liveWatchUserService.join(liveId, userId);
-            room.put(userId, session);
-            // 直播间浏览量 +1
-            redisCache.increment(PAGE_VIEWS_KEY + liveId, 1);
-
-            // 累计观看人次 +1
-            redisCache.increment(TOTAL_VIEWS_KEY + liveId, 1);
-
-            // 记录在线人数
-            redisCache.increment(ONLINE_USERS_KEY + liveId, 1);
-            Integer currentOnline = redisCache.getCacheObject(ONLINE_USERS_KEY + liveId);
-            //最大同时在线人数
-            Integer maxOnline = redisCache.getCacheObject(MAX_ONLINE_USERS_KEY + liveId);
-            if (maxOnline == null || currentOnline > maxOnline) {
-                redisCache.setCacheObject(MAX_ONLINE_USERS_KEY + liveId, currentOnline);
-            }
-
-            // 判断是否是该直播间的首次访客(独立访客统计)
-            boolean isFirstVisit = redisCache.setIfAbsent(USER_VISIT_KEY + userId, 1, 1, TimeUnit.DAYS);
-            if (isFirstVisit) {
-
-                redisCache.increment(UNIQUE_VISITORS_KEY + liveId, 1);
-            }
-
-            // 判断是否是首次进入直播间的观众
-            boolean isFirstViewer = redisCache.setIfAbsent(UNIQUE_VIEWERS_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
-            if (isFirstViewer) {
-                redisCache.increment(UNIQUE_VIEWERS_KEY + liveId, 1);
-            }
-            LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(liveId, userId);
-            liveWatchUserVO.setMsgStatus(liveWatchUser.getMsgStatus());
-            SendMsgVo sendMsgVo = new SendMsgVo();
-            sendMsgVo.setLiveId(liveId);
-            sendMsgVo.setUserId(userId);
-            sendMsgVo.setUserType(userType);
-            sendMsgVo.setCmd("entry");
-            sendMsgVo.setMsg("用户进入");
-            sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
-            sendMsgVo.setNickName(fsUser.getNickname());
-            sendMsgVo.setAvatar(fsUser.getAvatar());
-            // 广播连接消息
-            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
-
-            LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
-            if (liveUserFirstEntry != null) {
-                // 处理第一次自己进入,第二次扫码销售进入
-                if (liveUserFirstEntry.getCompanyUserId() == -1L && companyUserId != -1L) {
-                    liveUserFirstEntry.setCompanyId(companyId);
-                    liveUserFirstEntry.setCompanyUserId(companyUserId);
-                    liveUserFirstEntryService.updateLiveUserFirstEntry(liveUserFirstEntry);
-                }
-            } else {
-                // 这个用户A邀请用户b,b的业绩算a的销售的
-                if (companyUserId == -2L) {
-                    LiveUserFirstEntry clientB = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, companyUserId);
-                    companyId = clientB.getCompanyId();
-                    companyUserId = clientB.getCompanyUserId();
-                }
-                Date date = new Date();
-                liveUserFirstEntry = new LiveUserFirstEntry();
-                liveUserFirstEntry.setUserId(userId);
-                liveUserFirstEntry.setLiveId(liveId);
-                liveUserFirstEntry.setCompanyId(companyId);
-                liveUserFirstEntry.setCompanyUserId(companyUserId);
-                liveUserFirstEntry.setEntryDate(date);
-                liveUserFirstEntry.setFirstEntryTime(date);
-                liveUserFirstEntry.setUpdateTime( date);
-                liveUserFirstEntryService.insertLiveUserFirstEntry(liveUserFirstEntry);
-            }
-
-
-        } else {
-            adminRoom.add(session);
-        }
-
-        log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
-    }
-
-    //关闭连接时调用
-    @OnClose
-    public void onClose(Session session) {
-        Map<String, Object> userProperties = session.getUserProperties();
-
-        long liveId = (long) userProperties.get("liveId");
-        long userId = (long) userProperties.get("userId");
-        long userType = (long) userProperties.get("userType");
-
-        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
-        List<Session> adminRoom = getAdminRoom(liveId);
-        if (userType == 0) {
-            FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
-            if (Objects.isNull(fsUser)) {
-                throw new BaseException("用户信息错误");
-            }
-
-            LiveWatchUser liveWatchUserVO = liveWatchUserService.close(liveId, userId);
-            room.remove(userId);
-
-            if (room.isEmpty()) {
-                rooms.remove(liveId);
-            }
-
-
-            // 直播间在线人数 -1
-            redisCache.increment(ONLINE_USERS_KEY + liveId, -1);
-            SendMsgVo sendMsgVo = new SendMsgVo();
-            sendMsgVo.setLiveId(liveId);
-            sendMsgVo.setUserId(userId);
-            sendMsgVo.setUserType(userType);
-            sendMsgVo.setCmd("out");
-            sendMsgVo.setMsg("用户离开");
-            sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
-            sendMsgVo.setNickName(fsUser.getNickname());
-            sendMsgVo.setAvatar(fsUser.getAvatar());
-
-            // 广播离开消息
-            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
-        } else {
-            adminRoom.remove(session);
-        }
-
-        log.debug("离开webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
-    }
-
-    //收到客户端信息
-    @OnMessage
-    public void onMessage(Session session,String message) throws IOException {
-        Map<String, Object> userProperties = session.getUserProperties();
-
-        long liveId = (long) userProperties.get("liveId");
-        long userType = (long) userProperties.get("userType");
-
-        SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
-        if(msg.isOn()) return;
-        try {
-            switch (msg.getCmd()) {
-                case "heartbeat":
-                    sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
-                    break;
-                case "sendMsg":
-                    msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
-                    if(StringUtils.isEmpty(msg.getMsg())) return;
-                    LiveMsg liveMsg = new LiveMsg();
-                    liveMsg.setLiveId(msg.getLiveId());
-                    liveMsg.setUserId(msg.getUserId());
-                    liveMsg.setNickName(msg.getNickName());
-                    liveMsg.setAvatar(msg.getAvatar());
-                    liveMsg.setMsg(msg.getMsg());
-                    liveMsg.setCreateTime(new Date());
-
-                    if (userType == 0) {
-                        LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(msg.getLiveId(), msg.getUserId());
-                        if(liveWatchUser.getMsgStatus() == 1){
-                            sendMessage(session, JSONObject.toJSONString(R.error("你已被禁言")));
-                            return;
-                        }
-
-                        liveMsgService.insertLiveMsg(liveMsg);
-                    }
-
-                    msg.setOn(true);
-                    msg.setData(JSONObject.toJSONString(liveMsg));
-
-                    // 广播消息
-                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
-                    break;
-                case "sendGift":
-                    break;
-                case "blockUser":
-                    sendBlockMessage(liveId, msg.getUserId());
-                    break;
-                case "goods":
-                    sendGoodsMessage(msg);
-                    break;
-                case "red":
-                    processRed(liveId, msg);
-                    break;
-                case "lottery":
-                    processLottery(liveId, msg);
-                    break;
-                case "delAutoTask":
-                    if (userType == 1) {
-                        delAutoTask(liveId, DateUtils.parseDate(msg.getData(),"yyyy-MM-dd'T'HH:mm:ss.SSSZ").getTime());
-                    }
-                    break;
-            }
-        } catch (Exception e) {
-            log.error("webSocket 消息处理失败 msg: {}", e.getMessage(), e);
-        }
-    }
-
-
-
-    private void sendGoodsMessage(SendMsgVo msg) {
-        JSONObject jsonObject = JSON.parseObject(msg.getData());
-        Long goodsId = jsonObject.getLong("goodsId");
-        Long liveId = jsonObject.getLong("liveId");
-        Integer status = jsonObject.getInteger("status");
-        msg.setStatus(status);
-        LiveGoodsVo liveGoods = liveGoodsService.selectLiveGoodsVoByGoodsId(goodsId);
-        if(liveGoods == null) return;
-        msg.setLiveId(liveId);
-        msg.setData(JSONObject.toJSONString(liveGoods));
-        broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
-    }
-
-    /**
-     * 处理红包变动消息
-     */
-    private void processRed(Long liveId, SendMsgVo msg) {
-        log.debug("redData: {}", msg);
-        JSONObject jsonObject = JSON.parseObject(msg.getData());
-        Integer status = jsonObject.getInteger("status");
-        msg.setStatus( status);
-        LiveRedConf liveRedConf = liveRedConfService.selectLiveRedConfByRedId(jsonObject.getLong("redId"));
-        if (Objects.nonNull(liveRedConf)) {
-            msg.setData(JSONObject.toJSONString(liveRedConf));
-            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
-        }
-    }
-
-    /**
-     * 处理抽奖变动消息
-     */
-    private void processLottery(Long liveId, SendMsgVo msg) {
-        log.debug("lotteryData: {}", msg);
-        JSONObject jsonObject = JSON.parseObject(msg.getData());
-        Integer status = jsonObject.getInteger("status");
-        msg.setStatus( status);
-        LiveLotteryConf liveLotteryConf = liveLotteryConfService.selectLiveLotteryConfByLotteryId(jsonObject.getLong("lotteryId"));
-        if (Objects.nonNull(liveLotteryConf)) {
-            msg.setData(JSONObject.toJSONString(liveLotteryConf));
-            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
-        }
-    }
-
-    //错误时调用
-    @OnError
-    public void onError(Session session, Throwable throwable) {
-        log.error("webSocKet连接错误 msg: {}", throwable.getMessage(), throwable);
-    }
-
-    /**
-     * 获取房间
-     * @param liveId 直播间ID
-     * @return 容器
-     */
-    private ConcurrentHashMap<Long, Session> getRoom(Long liveId) {
-        return rooms.computeIfAbsent(liveId, k -> new ConcurrentHashMap<>());
-    }
-
-    /**
-     * 获取管理端房间
-     * @param liveId  直播间ID
-     * @return  容器
-     */
-    private List<Session> getAdminRoom(Long liveId) {
-        return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
-    }
-
-    //发送消息
-    public void sendMessage(Session session, String message) throws IOException {
-        session.getAsyncRemote().sendText(message);
-    }
-
-    public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
-        SendMsgVo sendMsgVo = new SendMsgVo();
-        sendMsgVo.setLiveId(liveId);
-        sendMsgVo.setUserId(userId);
-        sendMsgVo.setUserType(0L);
-        sendMsgVo.setCmd("Integral");
-        sendMsgVo.setMsg("恭喜你成功获得观看奖励:" + scoreAmount + "芳华币");
-        sendMsgVo.setData(String.valueOf(scoreAmount));
-        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
-        Session session = room.get(userId);
-        if(Objects.isNull( session)) return;
-        session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
-    }
-
-    private void sendBlockMessage(Long liveId, Long userId) {
-        SendMsgVo sendMsgVo = new SendMsgVo();
-        sendMsgVo.setLiveId(liveId);
-        sendMsgVo.setUserId(userId);
-        sendMsgVo.setUserType(0L);
-        sendMsgVo.setCmd("blockUser");
-        sendMsgVo.setMsg("账号已被停用");
-        sendMsgVo.setData(null);
-        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
-        Session session = room.get(userId);
-        if(Objects.isNull( session)) return;
-        session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
-    }
-
-    /**
-     * 广播消息
-     * @param liveId   直播间ID
-     * @param message  消息内容
-     */
-    public void broadcastMessage(Long liveId, String message) {
-        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
-        List<Session> adminRoom = getAdminRoom(liveId);
-
-        room.forEach((k, v) -> v.getAsyncRemote().sendText(message));
-        adminRoom.forEach(v -> v.getAsyncRemote().sendText(message));
-    }
-
-    /**
-     *定期将缓存的数据写入数据库
-     */
-    @Scheduled(fixedRate = 60000) // 每分钟执行一次
-    public void syncLiveDataToDB() {
-        List<LiveData> liveDatas = liveDataService.getAllLiveDatas(); // 获取所有正在直播的直播间数据
-        if(liveDatas == null)
-            return;
-        liveDatas.forEach(liveData ->{
-            liveData.setLikes(
-                    Optional.ofNullable(redisCache.incrementCacheValue("live:like:" + liveData.getLiveId(),0 )).orElse(0L)
-            );
-
-       /* for (Long liveId : liveIds) {
-            LiveData liveData = liveDataService.selectLiveDataByLiveId(liveId);
-            if (liveData == null) {
-                continue; // 防止空指针异常
-            }*/
-
-
-            // 从 redis 获取数据,并提供默认值,避免 NPE
-            liveData.setPageViews(
-                    Optional.ofNullable(redisCache.incrementCacheValue(PAGE_VIEWS_KEY + liveData.getLiveId(),0)).orElse(0L)
-            );
-            liveData.setTotalViews(
-                    Optional.ofNullable(redisCache.incrementCacheValue(TOTAL_VIEWS_KEY + liveData.getLiveId(),0)).orElse(0L)
-            );
-            liveData.setUniqueVisitors(
-                    /*Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VISITORS_KEY + liveId))
-                            .map(Set::size)  // 获取集合大小
-                            .map(Long::valueOf)  // 转换为 Long 类型
-                            .orElse(0L)*/
-                    Optional.ofNullable(redisCache.incrementCacheValue(UNIQUE_VISITORS_KEY + liveData.getLiveId(),0)).orElse(0L)
-            );
-            liveData.setUniqueViewers(
-                    /*Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VIEWERS_KEY + liveId))
-                            .map(Set::size)  // 获取集合大小
-                            .map(Long::valueOf)  // 转换为 Long 类型
-                            .orElse(0L)*/
-                    Optional.ofNullable(redisCache.incrementCacheValue(UNIQUE_VIEWERS_KEY + liveData.getLiveId(),0)).orElse(0L)
-            );
-            liveData.setPeakConcurrentViewers(
-                    Optional.ofNullable(redisCache.incrementCacheValue(MAX_ONLINE_USERS_KEY + liveData.getLiveId(),0)).orElse(0L)
-            );
-        });
-        if(!liveDatas.isEmpty())
-            for (LiveData liveData : liveDatas) {
-                liveDataService.updateLiveData(liveData);
-            }
-
-            /*// 更新数据库
-            liveDataService.updateLiveData(liveData);*/
-    }
-
-
-    public void handleAutoTask(LiveAutoTask task) {
-        if (task.getTaskType() == 1L) {
-            SendMsgVo msg = new SendMsgVo();
-            msg.setLiveId(task.getLiveId());
-            msg.setData(task.getContent());
-            msg.setCmd("goods");
-            try {
-                LiveGoodsVo liveGoodsVo = JSON.parseObject(task.getContent(), LiveGoodsVo.class);
-                liveGoodsService.updateLiveIsShow(liveGoodsVo.getGoodsId(), task.getLiveId());
-            } catch (Exception e) {
-                log.error("定时任务执行异常:{}", e.getMessage());
-            }
-            msg.setStatus(1);
-            broadcastMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)));
-        }
-    }
-    private void delAutoTask(long liveId, Long data) {
-        String key = "live:auto_task:";
-        redisCache.redisTemplate.opsForZSet().removeRangeByScore(key + liveId, data, data);
-    }
-}
+//package com.fs.app.websocket.service;
+//
+//
+//import com.alibaba.fastjson.JSON;
+//import com.alibaba.fastjson.JSONObject;
+//import com.fs.app.config.ProductionWordFilter;
+//import com.fs.app.websocket.auth.WebSocketConfigurator;
+//import com.fs.app.websocket.bean.SendMsgVo;
+//import com.fs.common.core.domain.R;
+//import com.fs.common.core.redis.RedisCache;
+//import com.fs.common.exception.BaseException;
+//import com.fs.common.utils.StringUtils;
+//import com.fs.common.utils.spring.SpringUtils;
+//import com.fs.live.domain.*;
+//import com.fs.live.service.*;
+//import com.fs.live.vo.LiveGoodsVo;
+//import com.fs.store.domain.FsUser;
+//import com.fs.store.service.IFsUserService;
+//import lombok.extern.slf4j.Slf4j;
+//import org.apache.commons.lang3.time.DateUtils;
+//import org.springframework.scheduling.annotation.Scheduled;
+//import org.springframework.stereotype.Component;
+//
+//import javax.websocket.*;
+//import javax.websocket.server.ServerEndpoint;
+//import java.io.IOException;
+//import java.util.*;
+//import java.util.concurrent.ConcurrentHashMap;
+//import java.util.concurrent.CopyOnWriteArrayList;
+//import java.util.concurrent.TimeUnit;
+//
+//import static com.fs.common.constant.LiveKeysConstant.*;
+//
+//@ServerEndpoint(value = "/app/webSocket",configurator = WebSocketConfigurator.class)
+//@Component
+//@Slf4j
+//public class WebSocketServer {
+//
+//    // 直播间用户session
+//    private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
+//    // 管理端连接
+//    private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
+//    private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
+//    private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
+//    private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
+//    private final ILiveWatchUserService liveWatchUserService = SpringUtils.getBean(ILiveWatchUserService.class);
+//    private final IFsUserService fsUserService = SpringUtils.getBean(IFsUserService.class);
+//    private final ILiveDataService liveDataService = SpringUtils.getBean(ILiveDataService.class);
+//    private final ProductionWordFilter productionWordFilter = SpringUtils.getBean(ProductionWordFilter.class);
+//    private final ILiveRedConfService liveRedConfService =  SpringUtils.getBean(ILiveRedConfService.class);
+//    private final ILiveLotteryConfService liveLotteryConfService =  SpringUtils.getBean(ILiveLotteryConfService.class);
+//    private final ILiveGoodsService liveGoodsService =  SpringUtils.getBean(ILiveGoodsService.class);
+//    private final ILiveUserFirstEntryService liveUserFirstEntryService =  SpringUtils.getBean(ILiveUserFirstEntryService.class);
+//    // 直播间在线用户缓存
+////    private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
+//
+//
+//    //建立连接成功调用
+//    @OnOpen
+//    public void onOpen(Session session) {
+//
+//        Map<String, Object> userProperties = session.getUserProperties();
+//        long liveId = (long) userProperties.get("liveId");
+//        long userId = (long) userProperties.get("userId");
+//        long userType = (long) userProperties.get("userType");
+//        Live live = liveService.selectLiveByLiveId(liveId);
+//        if (live == null) {
+//            throw new BaseException("未找到直播间");
+//        }
+//        long companyId = live.getCompanyId() == null ? -1L : live.getCompanyId();
+//        long companyUserId = -1L;
+//        if (!Objects.isNull(userProperties.get("companyId"))) {
+//            companyId = (long) userProperties.get("companyId");
+//        }
+//        if (!Objects.isNull(userProperties.get("companyUserId"))) {
+//            companyUserId = (long) userProperties.get("companyUserId");
+//        }
+//
+//
+//        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
+//        List<Session> adminRoom = getAdminRoom(liveId);
+//
+//        // 记录连接信息 管理员不记录
+//        if (userType == 0) {
+//            FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
+//            if (Objects.isNull(fsUser)) {
+//                throw new BaseException("用户信息错误");
+//            }
+//
+//            LiveWatchUser liveWatchUserVO = liveWatchUserService.join(liveId, userId);
+//            room.put(userId, session);
+//            // 直播间浏览量 +1
+//            redisCache.increment(PAGE_VIEWS_KEY + liveId, 1);
+//
+//            // 累计观看人次 +1
+//            redisCache.increment(TOTAL_VIEWS_KEY + liveId, 1);
+//
+//            // 记录在线人数
+//            redisCache.increment(ONLINE_USERS_KEY + liveId, 1);
+//            Integer currentOnline = redisCache.getCacheObject(ONLINE_USERS_KEY + liveId);
+//            //最大同时在线人数
+//            Integer maxOnline = redisCache.getCacheObject(MAX_ONLINE_USERS_KEY + liveId);
+//            if (maxOnline == null || currentOnline > maxOnline) {
+//                redisCache.setCacheObject(MAX_ONLINE_USERS_KEY + liveId, currentOnline);
+//            }
+//
+//            // 判断是否是该直播间的首次访客(独立访客统计)
+//            boolean isFirstVisit = redisCache.setIfAbsent(USER_VISIT_KEY + userId, 1, 1, TimeUnit.DAYS);
+//            if (isFirstVisit) {
+//
+//                redisCache.increment(UNIQUE_VISITORS_KEY + liveId, 1);
+//            }
+//
+//            // 判断是否是首次进入直播间的观众
+//            boolean isFirstViewer = redisCache.setIfAbsent(UNIQUE_VIEWERS_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
+//            if (isFirstViewer) {
+//                redisCache.increment(UNIQUE_VIEWERS_KEY + liveId, 1);
+//            }
+//            LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(liveId, userId);
+//            liveWatchUserVO.setMsgStatus(liveWatchUser.getMsgStatus());
+//            SendMsgVo sendMsgVo = new SendMsgVo();
+//            sendMsgVo.setLiveId(liveId);
+//            sendMsgVo.setUserId(userId);
+//            sendMsgVo.setUserType(userType);
+//            sendMsgVo.setCmd("entry");
+//            sendMsgVo.setMsg("用户进入");
+//            sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
+//            sendMsgVo.setNickName(fsUser.getNickname());
+//            sendMsgVo.setAvatar(fsUser.getAvatar());
+//            // 广播连接消息
+//            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+//
+//            LiveUserFirstEntry liveUserFirstEntry = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, userId);
+//            if (liveUserFirstEntry != null) {
+//                // 处理第一次自己进入,第二次扫码销售进入
+//                if (liveUserFirstEntry.getCompanyUserId() == -1L && companyUserId != -1L) {
+//                    liveUserFirstEntry.setCompanyId(companyId);
+//                    liveUserFirstEntry.setCompanyUserId(companyUserId);
+//                    liveUserFirstEntryService.updateLiveUserFirstEntry(liveUserFirstEntry);
+//                }
+//            } else {
+//                // 这个用户A邀请用户b,b的业绩算a的销售的
+//                if (companyUserId == -2L) {
+//                    LiveUserFirstEntry clientB = liveUserFirstEntryService.selectEntityByLiveIdUserId(liveId, companyUserId);
+//                    companyId = clientB.getCompanyId();
+//                    companyUserId = clientB.getCompanyUserId();
+//                }
+//                Date date = new Date();
+//                liveUserFirstEntry = new LiveUserFirstEntry();
+//                liveUserFirstEntry.setUserId(userId);
+//                liveUserFirstEntry.setLiveId(liveId);
+//                liveUserFirstEntry.setCompanyId(companyId);
+//                liveUserFirstEntry.setCompanyUserId(companyUserId);
+//                liveUserFirstEntry.setEntryDate(date);
+//                liveUserFirstEntry.setFirstEntryTime(date);
+//                liveUserFirstEntry.setUpdateTime( date);
+//                liveUserFirstEntryService.insertLiveUserFirstEntry(liveUserFirstEntry);
+//            }
+//
+//
+//        } else {
+//            adminRoom.add(session);
+//        }
+//
+//        log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
+//    }
+//
+//    //关闭连接时调用
+//    @OnClose
+//    public void onClose(Session session) {
+//        Map<String, Object> userProperties = session.getUserProperties();
+//
+//        long liveId = (long) userProperties.get("liveId");
+//        long userId = (long) userProperties.get("userId");
+//        long userType = (long) userProperties.get("userType");
+//
+//        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
+//        List<Session> adminRoom = getAdminRoom(liveId);
+//        if (userType == 0) {
+//            FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
+//            if (Objects.isNull(fsUser)) {
+//                throw new BaseException("用户信息错误");
+//            }
+//
+//            LiveWatchUser liveWatchUserVO = liveWatchUserService.close(liveId, userId);
+//            room.remove(userId);
+//
+//            if (room.isEmpty()) {
+//                rooms.remove(liveId);
+//            }
+//
+//
+//            // 直播间在线人数 -1
+//            redisCache.increment(ONLINE_USERS_KEY + liveId, -1);
+//            SendMsgVo sendMsgVo = new SendMsgVo();
+//            sendMsgVo.setLiveId(liveId);
+//            sendMsgVo.setUserId(userId);
+//            sendMsgVo.setUserType(userType);
+//            sendMsgVo.setCmd("out");
+//            sendMsgVo.setMsg("用户离开");
+//            sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
+//            sendMsgVo.setNickName(fsUser.getNickname());
+//            sendMsgVo.setAvatar(fsUser.getAvatar());
+//
+//            // 广播离开消息
+//            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+//        } else {
+//            adminRoom.remove(session);
+//        }
+//
+//        log.debug("离开webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
+//    }
+//
+//    //收到客户端信息
+//    @OnMessage
+//    public void onMessage(Session session,String message) throws IOException {
+//        Map<String, Object> userProperties = session.getUserProperties();
+//
+//        long liveId = (long) userProperties.get("liveId");
+//        long userType = (long) userProperties.get("userType");
+//
+//        SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
+//        if(msg.isOn()) return;
+//        try {
+//            switch (msg.getCmd()) {
+//                case "heartbeat":
+//                    sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
+//                    break;
+//                case "sendMsg":
+//                    msg.setMsg(productionWordFilter.filter(msg.getMsg()).getFilteredText());
+//                    if(StringUtils.isEmpty(msg.getMsg())) return;
+//                    LiveMsg liveMsg = new LiveMsg();
+//                    liveMsg.setLiveId(msg.getLiveId());
+//                    liveMsg.setUserId(msg.getUserId());
+//                    liveMsg.setNickName(msg.getNickName());
+//                    liveMsg.setAvatar(msg.getAvatar());
+//                    liveMsg.setMsg(msg.getMsg());
+//                    liveMsg.setCreateTime(new Date());
+//
+//                    if (userType == 0) {
+//                        LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(msg.getLiveId(), msg.getUserId());
+//                        if(liveWatchUser.getMsgStatus() == 1){
+//                            sendMessage(session, JSONObject.toJSONString(R.error("你已被禁言")));
+//                            return;
+//                        }
+//
+//                        liveMsgService.insertLiveMsg(liveMsg);
+//                    }
+//
+//                    msg.setOn(true);
+//                    msg.setData(JSONObject.toJSONString(liveMsg));
+//
+//                    // 广播消息
+//                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+//                    break;
+//                case "sendGift":
+//                    break;
+//                case "blockUser":
+//                    sendBlockMessage(liveId, msg.getUserId());
+//                    break;
+//                case "goods":
+//                    sendGoodsMessage(msg);
+//                    break;
+//                case "red":
+//                    processRed(liveId, msg);
+//                    break;
+//                case "lottery":
+//                    processLottery(liveId, msg);
+//                    break;
+//                case "delAutoTask":
+//                    if (userType == 1) {
+//                        delAutoTask(liveId, DateUtils.parseDate(msg.getData(),"yyyy-MM-dd'T'HH:mm:ss.SSSZ").getTime());
+//                    }
+//                    break;
+//            }
+//        } catch (Exception e) {
+//            log.error("webSocket 消息处理失败 msg: {}", e.getMessage(), e);
+//        }
+//    }
+//
+//
+//
+//    private void sendGoodsMessage(SendMsgVo msg) {
+//        JSONObject jsonObject = JSON.parseObject(msg.getData());
+//        Long goodsId = jsonObject.getLong("goodsId");
+//        Long liveId = jsonObject.getLong("liveId");
+//        Integer status = jsonObject.getInteger("status");
+//        msg.setStatus(status);
+//        LiveGoodsVo liveGoods = liveGoodsService.selectLiveGoodsVoByGoodsId(goodsId);
+//        if(liveGoods == null) return;
+//        msg.setLiveId(liveId);
+//        msg.setData(JSONObject.toJSONString(liveGoods));
+//        broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+//    }
+//
+//    /**
+//     * 处理红包变动消息
+//     */
+//    private void processRed(Long liveId, SendMsgVo msg) {
+//        log.debug("redData: {}", msg);
+//        JSONObject jsonObject = JSON.parseObject(msg.getData());
+//        Integer status = jsonObject.getInteger("status");
+//        msg.setStatus( status);
+//        LiveRedConf liveRedConf = liveRedConfService.selectLiveRedConfByRedId(jsonObject.getLong("redId"));
+//        if (Objects.nonNull(liveRedConf)) {
+//            msg.setData(JSONObject.toJSONString(liveRedConf));
+//            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+//        }
+//    }
+//
+//    /**
+//     * 处理抽奖变动消息
+//     */
+//    private void processLottery(Long liveId, SendMsgVo msg) {
+//        log.debug("lotteryData: {}", msg);
+//        JSONObject jsonObject = JSON.parseObject(msg.getData());
+//        Integer status = jsonObject.getInteger("status");
+//        msg.setStatus( status);
+//        LiveLotteryConf liveLotteryConf = liveLotteryConfService.selectLiveLotteryConfByLotteryId(jsonObject.getLong("lotteryId"));
+//        if (Objects.nonNull(liveLotteryConf)) {
+//            msg.setData(JSONObject.toJSONString(liveLotteryConf));
+//            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+//        }
+//    }
+//
+//    //错误时调用
+//    @OnError
+//    public void onError(Session session, Throwable throwable) {
+//        log.error("webSocKet连接错误 msg: {}", throwable.getMessage(), throwable);
+//    }
+//
+//    /**
+//     * 获取房间
+//     * @param liveId 直播间ID
+//     * @return 容器
+//     */
+//    private ConcurrentHashMap<Long, Session> getRoom(Long liveId) {
+//        return rooms.computeIfAbsent(liveId, k -> new ConcurrentHashMap<>());
+//    }
+//
+//    /**
+//     * 获取管理端房间
+//     * @param liveId  直播间ID
+//     * @return  容器
+//     */
+//    private List<Session> getAdminRoom(Long liveId) {
+//        return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
+//    }
+//
+//    //发送消息
+//    public void sendMessage(Session session, String message) throws IOException {
+//        session.getAsyncRemote().sendText(message);
+//    }
+//
+//    public void sendIntegralMessage(Long liveId, Long userId,Long scoreAmount) {
+//        SendMsgVo sendMsgVo = new SendMsgVo();
+//        sendMsgVo.setLiveId(liveId);
+//        sendMsgVo.setUserId(userId);
+//        sendMsgVo.setUserType(0L);
+//        sendMsgVo.setCmd("Integral");
+//        sendMsgVo.setMsg("恭喜你成功获得观看奖励:" + scoreAmount + "芳华币");
+//        sendMsgVo.setData(String.valueOf(scoreAmount));
+//        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
+//        Session session = room.get(userId);
+//        if(Objects.isNull( session)) return;
+//        session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+//    }
+//
+//    private void sendBlockMessage(Long liveId, Long userId) {
+//        SendMsgVo sendMsgVo = new SendMsgVo();
+//        sendMsgVo.setLiveId(liveId);
+//        sendMsgVo.setUserId(userId);
+//        sendMsgVo.setUserType(0L);
+//        sendMsgVo.setCmd("blockUser");
+//        sendMsgVo.setMsg("账号已被停用");
+//        sendMsgVo.setData(null);
+//        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
+//        Session session = room.get(userId);
+//        if(Objects.isNull( session)) return;
+//        session.getAsyncRemote().sendText(JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+//    }
+//
+//    /**
+//     * 广播消息
+//     * @param liveId   直播间ID
+//     * @param message  消息内容
+//     */
+//    public void broadcastMessage(Long liveId, String message) {
+//        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
+//        List<Session> adminRoom = getAdminRoom(liveId);
+//
+//        room.forEach((k, v) -> v.getAsyncRemote().sendText(message));
+//        adminRoom.forEach(v -> v.getAsyncRemote().sendText(message));
+//    }
+//
+//    /**
+//     *定期将缓存的数据写入数据库
+//     */
+//    @Scheduled(fixedRate = 60000) // 每分钟执行一次
+//    public void syncLiveDataToDB() {
+//        List<LiveData> liveDatas = liveDataService.getAllLiveDatas(); // 获取所有正在直播的直播间数据
+//        if(liveDatas == null)
+//            return;
+//        liveDatas.forEach(liveData ->{
+//            liveData.setLikes(
+//                    Optional.ofNullable(redisCache.incrementCacheValue("live:like:" + liveData.getLiveId(),0 )).orElse(0L)
+//            );
+//
+//       /* for (Long liveId : liveIds) {
+//            LiveData liveData = liveDataService.selectLiveDataByLiveId(liveId);
+//            if (liveData == null) {
+//                continue; // 防止空指针异常
+//            }*/
+//
+//
+//            // 从 redis 获取数据,并提供默认值,避免 NPE
+//            liveData.setPageViews(
+//                    Optional.ofNullable(redisCache.incrementCacheValue(PAGE_VIEWS_KEY + liveData.getLiveId(),0)).orElse(0L)
+//            );
+//            liveData.setTotalViews(
+//                    Optional.ofNullable(redisCache.incrementCacheValue(TOTAL_VIEWS_KEY + liveData.getLiveId(),0)).orElse(0L)
+//            );
+//            liveData.setUniqueVisitors(
+//                    /*Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VISITORS_KEY + liveId))
+//                            .map(Set::size)  // 获取集合大小
+//                            .map(Long::valueOf)  // 转换为 Long 类型
+//                            .orElse(0L)*/
+//                    Optional.ofNullable(redisCache.incrementCacheValue(UNIQUE_VISITORS_KEY + liveData.getLiveId(),0)).orElse(0L)
+//            );
+//            liveData.setUniqueViewers(
+//                    /*Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VIEWERS_KEY + liveId))
+//                            .map(Set::size)  // 获取集合大小
+//                            .map(Long::valueOf)  // 转换为 Long 类型
+//                            .orElse(0L)*/
+//                    Optional.ofNullable(redisCache.incrementCacheValue(UNIQUE_VIEWERS_KEY + liveData.getLiveId(),0)).orElse(0L)
+//            );
+//            liveData.setPeakConcurrentViewers(
+//                    Optional.ofNullable(redisCache.incrementCacheValue(MAX_ONLINE_USERS_KEY + liveData.getLiveId(),0)).orElse(0L)
+//            );
+//        });
+//        if(!liveDatas.isEmpty())
+//            for (LiveData liveData : liveDatas) {
+//                liveDataService.updateLiveData(liveData);
+//            }
+//
+//            /*// 更新数据库
+//            liveDataService.updateLiveData(liveData);*/
+//    }
+//
+//
+//    public void handleAutoTask(LiveAutoTask task) {
+//        if (task.getTaskType() == 1L) {
+//            SendMsgVo msg = new SendMsgVo();
+//            msg.setLiveId(task.getLiveId());
+//            msg.setData(task.getContent());
+//            msg.setCmd("goods");
+//            try {
+//                LiveGoodsVo liveGoodsVo = JSON.parseObject(task.getContent(), LiveGoodsVo.class);
+//                liveGoodsService.updateLiveIsShow(liveGoodsVo.getGoodsId(), task.getLiveId());
+//            } catch (Exception e) {
+//                log.error("定时任务执行异常:{}", e.getMessage());
+//            }
+//            msg.setStatus(1);
+//            broadcastMessage(task.getLiveId(), JSONObject.toJSONString(R.ok().put("data", msg)));
+//        }
+//    }
+//    private void delAutoTask(long liveId, Long data) {
+//        String key = "live:auto_task:";
+//        redisCache.redisTemplate.opsForZSet().removeRangeByScore(key + liveId, data, data);
+//    }
+//}