瀏覽代碼

直播代码移植以及websocket相关代码修改

chenguo 1 周之前
父節點
當前提交
ebcdc869dd

+ 175 - 50
fs-live-app/src/main/java/com/fs/app/controller/UserController.java

@@ -3,22 +3,18 @@ package com.fs.app.controller;
 
 import cn.hutool.core.date.DateTime;
 import com.fs.app.annotation.Login;
+import com.fs.app.param.FsUserLoginParam;
 import com.fs.app.param.LoginParam;
-import com.fs.app.vo.UserListVO;
-import com.fs.app.vo.UserVO;
-import com.fs.chat.service.IChatRoleService;
 import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCache;
-import com.fs.common.utils.PinYinUtil;
-import com.fs.common.utils.SecurityUtils;
 import com.fs.common.utils.StringUtils;
+import com.fs.common.utils.sign.Md5Utils;
 import com.fs.company.domain.CompanyUser;
-import com.fs.company.domain.CompanyWxAccount;
 import com.fs.company.service.ICompanyPostService;
 import com.fs.company.service.ICompanyUserService;
-import com.fs.company.service.ICompanyWxAccountService;
-import com.fs.company.vo.CompanyUserVO;
 import com.fs.his.domain.FsUser;
+import com.fs.his.mapper.FsUserMapper;
+import com.fs.his.service.IFsUserNewTaskService;
 import com.fs.his.service.IFsUserService;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -30,13 +26,18 @@ import me.chanjar.weixin.common.error.WxErrorException;
 import me.chanjar.weixin.mp.api.WxMpService;
 import me.chanjar.weixin.mp.api.WxMpUserService;
 import me.chanjar.weixin.mp.bean.result.WxMpUser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletRequest;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+
+import static com.fs.his.utils.PhoneUtil.encryptPhone;
 
 
 @Api("用户接口")
@@ -50,58 +51,65 @@ public class UserController extends AppBaseController {
 	private final WxMpService wxMpService;
 	private final IFsUserService userService;
 
+	private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+	@Autowired
+	private FsUserMapper userMapper;
+	@Autowired
+	private IFsUserNewTaskService userNewTaskService;
+
 
 	@ApiOperation("课程短链公众号登录")
 	@PostMapping("/loginByMp")
 	public R loginByMp( @RequestBody LoginParam param) {
-//		if (org.apache.commons.lang3.StringUtils.isBlank(param.getCode())) {
-//			return R.error("code不存在");
-//		}
-//		try{
-//			WxOAuth2AccessToken wxMpOAuth2AccessToken = wxMpService.getOAuth2Service().getAccessToken(param.getCode());
-//			WxOAuth2UserInfo wxMpUser = wxMpService.getOAuth2Service().getUserInfo(wxMpOAuth2AccessToken, null);
-//			WxMpUserService wxMpUserService = wxMpService.getUserService();
-//			WxMpUser userInfo = wxMpUserService.userInfo(wxMpUser.getOpenid());
-//      if (!userInfo.getSubscribe()){
-//        return R.error("请关注公众号进行登录");
-//      }
+		if (StringUtils.isBlank(param.getCode())) {
+			return R.error("code不存在");
+		}
+		try{
+			WxOAuth2AccessToken wxMpOAuth2AccessToken = wxMpService.getOAuth2Service().getAccessToken(param.getCode());
+			WxOAuth2UserInfo wxMpUser = wxMpService.getOAuth2Service().getUserInfo(wxMpOAuth2AccessToken, null);
+			WxMpUserService wxMpUserService = wxMpService.getUserService();
+			WxMpUser userInfo = wxMpUserService.userInfo(wxMpUser.getOpenid());
+			if (!userInfo.getSubscribe()){
+				return R.error("请关注公众号进行登录");
+			}
 			FsUser user=userService.selectFsUserByUnionid(param.getUnionId());
-//			if(user!=null){
-//				FsUser userMap=new FsUser();
-//				userMap.setUserId(user.getUserId());
-//				userMap.setNickName(wxMpUser.getNickname());
-//				userMap.setAvatar(wxMpUser.getHeadImgUrl());
-//				userMap.setMpOpenId(wxMpUser.getOpenid());
-//				userMap.setUpdateTime(new DateTime());
-//				userService.updateFsUser(userMap);
-//			}
-//			else{
-//				//写入
-//				user=new FsUser();
-//				user.setNickName(wxMpUser.getNickname());
-//				user.setAvatar(wxMpUser.getHeadImgUrl());
-//				user.setStatus(1);
-//				user.setSex(wxMpUser.getSex());
-//				user.setMpOpenId(wxMpUser.getOpenid());
-//				user.setUnionId(wxMpUser.getUnionId());
-//				user.setCreateTime(new Date());
-//				userService.insertFsUser(user);
-//			}
+			if(user!=null){
+				FsUser userMap=new FsUser();
+				userMap.setUserId(user.getUserId());
+				userMap.setNickName(wxMpUser.getNickname());
+				userMap.setAvatar(wxMpUser.getHeadImgUrl());
+				userMap.setMpOpenId(wxMpUser.getOpenid());
+				userMap.setUpdateTime(new DateTime());
+				userService.updateFsUser(userMap);
+			}
+			else{
+				//写入
+				user=new FsUser();
+				user.setNickName(wxMpUser.getNickname());
+				user.setAvatar(wxMpUser.getHeadImgUrl());
+				user.setStatus(1);
+				user.setSex(wxMpUser.getSex());
+				user.setMpOpenId(wxMpUser.getOpenid());
+				user.setUnionId(wxMpUser.getUnionId());
+				user.setCreateTime(new Date());
+				userService.insertFsUser(user);
+			}
 			String token = jwtUtils.generateToken(user.getUserId());
 			redisCache.setCacheObject("token:"+user.getUserId(),token,604800, TimeUnit.SECONDS);
 			Map<String,Object> map=new HashMap<>();
 			map.put("token",token);
 			map.put("user",user);
 			return R.ok(map);
-//		}
-//		catch (WxErrorException e){
-//			if(e.getError().getErrorCode()==40163){
-//				return R.error(40163,e.getError().getErrorMsg());
-//			}
-//			else{
-//				return R.error("授权失败,"+e.getMessage());
-//			}
-//		}
+		}
+		catch (WxErrorException e){
+			if(e.getError().getErrorCode()==40163){
+				return R.error(40163,e.getError().getErrorMsg());
+			}
+			else{
+				return R.error("授权失败,"+e.getMessage());
+			}
+		}
 
 	}
 
@@ -165,4 +173,121 @@ public class UserController extends AppBaseController {
 		}
 	}
 
+
+	/**
+	 * app登录
+	 * */
+	@ApiOperation("登录")
+	@PostMapping("/loginByApp")
+	@Transactional
+	public R login(@Validated @RequestBody FsUserLoginParam param) {
+		int loginType = param.getLoginType();
+		switch (loginType) {
+			case 1:
+				return handleLoginType1(param);
+			case 3:
+				return handleLoginType3(param);
+			default:
+				return R.error("请选择正确的登陆类型!");
+		}
+	}
+
+	private R handleLoginType1(FsUserLoginParam param) {
+		if (org.apache.commons.lang3.StringUtils.isEmpty(param.getPhone()) || org.apache.commons.lang3.StringUtils.isEmpty(param.getPassword())) {
+			return R.error("账号或密码不能为空");
+		}
+
+		FsUser user = findUserByPhone(param.getPhone());
+
+		// 校验用户是否存在及账号状态
+		if (user == null) {
+			return R.error("账号不存在,请先注册账号");
+		} else if (user.getStatus() == 0) {
+			return R.error("账号已停用");
+		} else if (org.apache.commons.lang3.StringUtils.isEmpty(user.getPassword())) {
+			return R.error("账号不存在,请先注册账号");
+		}
+
+		if (!Md5Utils.hash(param.getPassword()).equals(user.getPassword())) {
+			return R.error("密码不正确");
+		}
+
+		return generateTokenAndReturn(user);
+
+	}
+
+	private R handleLoginType3(FsUserLoginParam param) {
+		if (org.apache.commons.lang3.StringUtils.isEmpty(param.getPhone())) {
+			return R.error("获取手机号失败");
+		}
+		// 根据手机号查询用户
+		FsUser user = findUserByPhone(param.getPhone());
+		if (user == null) {
+			createNewUser(param);
+			return R.ok().put("isNew",true).put("phone",encryptPhone(param.getPhone()));
+		} else {
+			if (user.getUnionId()==null){
+				if (user.getPhone().length()<=11){
+					FsUser fsUser = new FsUser();
+					fsUser.setUserId(user.getUserId());
+					fsUser.setPhone(encryptPhone(param.getPhone()));
+					userMapper.updateFsUser(fsUser);
+					logger.info("zyp \n【手机加密】:{}",encryptPhone(param.getPhone()));
+				}
+				return R.ok().put("isNew",true).put("phone",encryptPhone(param.getPhone()));
+			}
+			/*if (org.apache.commons.lang3.StringUtils.isNotEmpty(param.getJpushId())) {
+				updateExistingUserJpushId(user, param.getJpushId());
+			}*/
+		}
+		return generateTokenAndReturn(user);
+	}
+
+	private void updateExistingUserJpushId(FsUser user, String jpushId) {
+		FsUser userMap = new FsUser();
+		userMap.setUserId(user.getUserId());
+		userMap.setJpushId(jpushId);
+		userService.updateFsUser(userMap);
+	}
+
+	private R generateTokenAndReturn(FsUser user) {
+		String token = jwtUtils.generateToken(user.getUserId());
+		redisCache.setCacheObject("userToken:" + user.getUserId(), token, 604800, TimeUnit.SECONDS);
+		int isFirstLogin = userNewTaskService.performTaskOne(user.getUserId());
+		Map<String, Object> map = new HashMap<>();
+		map.put("token", token);
+		map.put("user", user);
+		map.put("isFirst",isFirstLogin);
+		return R.ok(map);
+	}
+
+	private FsUser findUserByPhone(String phone) {
+		// 先根据加密手机号查询用户
+		String jiami = (encryptPhone(phone));
+		FsUser user = userMapper.selectFsUserByPhoneLimitOne(jiami);
+
+		// 如果没有找到用户,再根据手机号查询
+		if (user == null) {
+			user = userMapper.selectFsUserByPhoneLimitOne(phone);
+
+		}
+		return user;
+	}
+
+	private FsUser createNewUser(FsUserLoginParam param) {
+		FsUser newUser = new FsUser();
+		newUser.setLoginDevice(param.getLoginDevice() != null ? param.getLoginDevice() : null);
+		newUser.setSource(param.getSource() != null ? param.getSource() : null );
+		newUser.setNickName("匿名用户**");
+		newUser.setPhone(param.getPhone());
+		newUser.setCreateTime(new Date());
+		newUser.setStatus(1);
+		newUser.setAvatar("https://cos.his.cdwjyyh.com/fs/20240926/420728ee06e54575ba82665dedb4756b.png");
+		/*if (org.apache.commons.lang3.StringUtils.isNotEmpty(param.getJpushId())) {
+			newUser.setJpushId(param.getJpushId());
+		}*/
+		userService.insertFsUser(newUser);
+		return newUser;
+	}
+
 }

+ 25 - 0
fs-live-app/src/main/java/com/fs/app/param/FsUserLoginParam.java

@@ -0,0 +1,25 @@
+package com.fs.app.param;
+
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+import java.io.Serializable;
+
+@Data
+public class FsUserLoginParam implements Serializable {
+    private String phone;
+
+    private String password;
+
+    @NotNull(message = "登录类型不能为空")
+    private Integer loginType;//登录类型 1密码登录  2微信登录  3手机号一键登录
+
+    private String code;
+
+    private String jpushId;
+
+    private String openId;
+
+    private String loginDevice;
+    private String source;
+}

+ 46 - 0
fs-live-app/src/main/java/com/fs/app/utils/VerifyUtils.java

@@ -0,0 +1,46 @@
+package com.fs.app.utils;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+
+public class VerifyUtils {
+
+    /**
+     * 验证签名
+     */
+    public static boolean verifySignature(String liveId, String userId, String userType, String secret, String signature) throws Exception {
+        String expected = generateSignature(liveId, userId, userType, secret);
+        return expected.equals(signature);
+    }
+
+    /**
+     * 生成签名:将 liveId、userId、userType 按顺序拼接后,用 HMAC-SHA256 算法生成签名
+     *
+     * @param liveId   直播间ID
+     * @param userId   用户ID
+     * @param userType 用户类型
+     * @param secret   共享密钥
+     * @return 十六进制格式的签名
+     * @throws Exception e
+     */
+    private static String generateSignature(String liveId, String userId, String userType, String secret) throws Exception {
+        String data = liveId + userId + userType + secret;
+        Mac mac = Mac.getInstance("HmacSHA256");
+        SecretKeySpec keySpec = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256");
+        mac.init(keySpec);
+        byte[] rawHmac = mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
+        return bytesToHex(rawHmac);
+    }
+
+    /**
+     * 将字节数组转换为十六进制字符串
+     */
+    private static String bytesToHex(byte[] bytes) {
+        StringBuilder sb = new StringBuilder();
+        for (byte b : bytes) {
+            sb.append(String.format("%02x", b));
+        }
+        return sb.toString();
+    }
+}

+ 97 - 0
fs-live-app/src/main/java/com/fs/app/websocket/auth/AuthHandler.java

@@ -0,0 +1,97 @@
+package com.fs.app.websocket.auth;
+
+import com.fs.app.utils.JwtUtils;
+import com.fs.app.utils.VerifyUtils;
+import com.fs.app.websocket.constant.AttrConstant;
+import com.fs.common.utils.spring.SpringUtils;
+import io.jsonwebtoken.Claims;
+import io.netty.channel.*;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class AuthHandler extends ChannelInboundHandlerAdapter {
+
+    private final JwtUtils jwtUtils = SpringUtils.getBean(JwtUtils.class);
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof FullHttpRequest) {
+            FullHttpRequest req = (FullHttpRequest) msg;
+            String uri = req.uri();
+            QueryStringDecoder decoder = new QueryStringDecoder(uri);
+            Map<String, List<String>> parameterMap = decoder.parameters();
+            if (!parameterMap.containsKey(AttrConstant.LIVE_ID)) {
+                ctx.channel().writeAndFlush(new TextWebSocketFrame("Error: invalid parameters")).addListener(ChannelFutureListener.CLOSE);
+                return;
+            }
+
+            if (!parameterMap.containsKey(AttrConstant.USER_ID)) {
+                ctx.channel().writeAndFlush(new TextWebSocketFrame("Error: invalid parameters")).addListener(ChannelFutureListener.CLOSE);
+                return;
+            }
+
+            String tokenKey = jwtUtils.getHeader();
+            if (!parameterMap.containsKey(tokenKey) && !parameterMap.containsKey(AttrConstant.SIGNATURE)) {
+                ctx.writeAndFlush(new TextWebSocketFrame("Error: invalid parameters")).addListener(ChannelFutureListener.CLOSE);
+                return;
+            }
+
+            Long liveId = Long.valueOf(parameterMap.get(AttrConstant.LIVE_ID).get(0));
+            Long userId = Long.valueOf(parameterMap.get(AttrConstant.USER_ID).get(0));
+
+            // 验证 token
+            if (parameterMap.containsKey(tokenKey)) {
+            String token = parameterMap.get(tokenKey).get(0);
+            Claims claims = jwtUtils.getClaimByToken(token);
+            if (claims == null || jwtUtils.isTokenExpired(claims.getExpiration())) {
+                ctx.channel().writeAndFlush(new TextWebSocketFrame("Error: invalid parameters")).addListener(ChannelFutureListener.CLOSE);
+                return;
+            }
+                // 将 userType 设置为 0(或根据实际业务逻辑设置)
+                ctx.channel().attr(AttrConstant.ATTR_USER_TYPE).set(0L);
+            }
+
+            // 验证签名
+            if (parameterMap.containsKey(AttrConstant.SIGNATURE)) {
+                String userTypeStr = parameterMap.get(AttrConstant.USER_TYPE).get(0);
+                String timestampStr = parameterMap.get(AttrConstant.TIMESTAMP).get(0);
+                String signatureStr = parameterMap.get(AttrConstant.SIGNATURE).get(0);
+
+                try {
+                    if (!VerifyUtils.verifySignature(liveId.toString(), userId.toString(), userTypeStr, timestampStr, signatureStr)) {
+                        ctx.channel().writeAndFlush(new TextWebSocketFrame("Error: invalid parameters")).addListener(ChannelFutureListener.CLOSE);
+                        return;
+                    }
+                    ctx.channel().attr(AttrConstant.ATTR_USER_TYPE).set(Long.parseLong(userTypeStr));
+                } catch (Exception e) {
+                    ctx.channel().writeAndFlush(new TextWebSocketFrame("Error: invalid parameters")).addListener(ChannelFutureListener.CLOSE);
+                    return;
+                }
+            }
+
+            // 将 liveId 和 userId 保存到 Channel 属性中,供后续处理使用
+            ctx.channel().attr(AttrConstant.ATTR_LIVE_ID).set(liveId);
+            ctx.channel().attr(AttrConstant.ATTR_USER_ID).set(userId);
+
+            // 继续处理 WebSocket 握手
+            ctx.pipeline().remove(this);
+            ctx.fireChannelRead(req.retain());
+        } else {
+            ctx.channel().close();
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.warn("webSocket 认证异常: {}", cause.getMessage(), cause);
+        ctx.close();
+    }
+}

+ 85 - 0
fs-live-app/src/main/java/com/fs/app/websocket/auth/WebSocketConfigurator.java

@@ -0,0 +1,85 @@
+package com.fs.app.websocket.auth;
+
+import com.fs.app.utils.JwtUtils;
+import com.fs.app.utils.VerifyUtils;
+import com.fs.app.websocket.constant.AttrConstant;
+import com.fs.common.exception.base.BaseException;
+import com.fs.common.utils.spring.SpringUtils;
+import io.jsonwebtoken.Claims;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.websocket.HandshakeResponse;
+import javax.websocket.server.HandshakeRequest;
+import javax.websocket.server.ServerEndpointConfig;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class WebSocketConfigurator extends ServerEndpointConfig.Configurator {
+
+    private final JwtUtils jwtUtils = SpringUtils.getBean(JwtUtils.class);
+
+    /**
+     * 安全校验
+     * @param sec       配置
+     * @param request   请求
+     * @param response  返回
+     */
+    @Override
+    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
+        Map<String, List<String>> parameterMap = request.getParameterMap();
+        if(!parameterMap.containsKey(AttrConstant.LIVE_ID)){
+            throw new BaseException("缺少必要的参数");
+        }
+        if(!parameterMap.containsKey(AttrConstant.USER_ID)){
+            throw new BaseException("缺少必要的参数");
+        }
+
+        String tokenKey = jwtUtils.getHeader();
+        if (!parameterMap.containsKey(tokenKey) && !parameterMap.containsKey(AttrConstant.SIGNATURE)) {
+            throw new BaseException("缺少必要的参数");
+        }
+
+        Long liveId = Long.valueOf(parameterMap.get(AttrConstant.LIVE_ID).get(0));
+        Long userId = Long.valueOf(parameterMap.get(AttrConstant.USER_ID).get(0));
+
+        Map<String, Object> userProperties = sec.getUserProperties();
+        userProperties.put(AttrConstant.LIVE_ID, liveId);
+        userProperties.put(AttrConstant.USER_ID, userId);
+
+        // 验证token
+        if (parameterMap.containsKey(tokenKey)) {
+            String token = parameterMap.get(tokenKey).get(0);
+            Claims claims = jwtUtils.getClaimByToken(token);
+            if(claims == null || jwtUtils.isTokenExpired(claims.getExpiration())){
+                throw new BaseException(jwtUtils.getHeader());
+            }
+
+            userProperties.put(AttrConstant.USER_TYPE, 0L);
+        }
+
+        // 验证签名
+        if (parameterMap.containsKey(AttrConstant.SIGNATURE)) {
+            String liveIdStr = parameterMap.get(AttrConstant.LIVE_ID).get(0);
+            String userIdStr = parameterMap.get(AttrConstant.USER_ID).get(0);
+            String userTypeStr = parameterMap.get(AttrConstant.USER_TYPE).get(0);
+            String timestampStr = parameterMap.get(AttrConstant.TIMESTAMP).get(0);
+            String signatureStr = parameterMap.get(AttrConstant.SIGNATURE).get(0);
+
+            try {
+                if (!VerifyUtils.verifySignature(liveIdStr, userIdStr, userTypeStr, timestampStr, signatureStr)) {
+                    throw new BaseException("缺少必要的参数");
+                }
+
+                userProperties.put(AttrConstant.USER_TYPE, Long.parseLong(userTypeStr));
+            } catch (Exception e) {
+                log.warn("webSocket连接验签失败 msg: {}", e.getMessage(), e);
+                throw new BaseException("缺少必要的参数");
+            }
+        }
+    }
+
+}

+ 6 - 1
fs-live-app/src/main/java/com/fs/app/websocket/bean/SendMsgVo.java

@@ -15,17 +15,22 @@ public class SendMsgVo {
     private Long liveId;
     @ApiModelProperty("用户ID")
     private Long userId;
+    @ApiModelProperty("企业ID")
+    private Long companyId;
+    @ApiModelProperty("企业用户ID")
+    private Long companyUserId;
     @ApiModelProperty("用户类型0用户1管理员")
     private Long userType;
     @ApiModelProperty("消息代码")
     private String cmd;
     @ApiModelProperty("发送消息")
     private String msg;
+    @ApiModelProperty("消息")
+    private String data;
     @ApiModelProperty("名称")
     private String nickName;
     @ApiModelProperty("头像")
     private String avatar;
     private boolean on = false;
 
-
 }

+ 18 - 0
fs-live-app/src/main/java/com/fs/app/websocket/constant/AttrConstant.java

@@ -0,0 +1,18 @@
+package com.fs.app.websocket.constant;
+
+import io.netty.util.AttributeKey;
+
+public class AttrConstant {
+
+    // 定义变量名
+    public static final String LIVE_ID = "liveId";
+    public static final String USER_ID = "userId";
+    public static final String USER_TYPE = "userType";
+    public static final String TIMESTAMP = "timestamp";
+    public static final String SIGNATURE = "signature";
+
+    // 定义 AttributeKey 保存必要参数
+    public static final AttributeKey<Long> ATTR_LIVE_ID = AttributeKey.valueOf(LIVE_ID);
+    public static final AttributeKey<Long> ATTR_USER_ID = AttributeKey.valueOf(USER_ID);
+    public static final AttributeKey<Long> ATTR_USER_TYPE = AttributeKey.valueOf(USER_TYPE);
+}

+ 268 - 0
fs-live-app/src/main/java/com/fs/app/websocket/handle/LiveChatHandler.java

@@ -0,0 +1,268 @@
+package com.fs.app.websocket.handle;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fs.app.websocket.bean.SendMsgVo;
+import com.fs.app.websocket.constant.AttrConstant;
+import com.fs.common.core.domain.R;
+import com.fs.common.utils.spring.SpringUtils;
+import com.fs.his.domain.FsUser;
+import com.fs.his.service.IFsUserService;
+import com.fs.live.domain.LiveMsg;
+import com.fs.live.domain.LiveWatchUser;
+import com.fs.live.service.ILiveMsgService;
+import com.fs.live.service.ILiveService;
+import com.fs.live.service.ILiveWatchUserService;
+import com.fs.live.vo.LiveWatchUserVO;
+import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+@Component
+@ChannelHandler.Sharable
+@Slf4j
+public class LiveChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
+
+    // 容器
+    private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Channel>> adminRooms = new ConcurrentHashMap<>();
+    private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Channel>> rooms = new ConcurrentHashMap<>();
+    private final static ConcurrentHashMap<Long, ChannelGroup> roomGroups = new ConcurrentHashMap<>();
+    private final static ILiveService liveService = SpringUtils.getBean(ILiveService.class);
+    private final static ILiveWatchUserService liveWatchUserService = SpringUtils.getBean(ILiveWatchUserService.class);
+    private final static ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
+    private final static IFsUserService fsUserService = SpringUtils.getBean(IFsUserService.class);
+
+    /**
+     * 处理握手
+     * @param ctx   连接
+     * @param evt   数据
+     * @throws Exception    异常
+     */
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        log.debug("事件");
+        // 处理 WebSocket 握手完成事件
+        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
+            Long userId = ctx.channel().attr(AttrConstant.ATTR_USER_ID).get();
+            Long liveId = ctx.channel().attr(AttrConstant.ATTR_LIVE_ID).get();
+            Long userType = ctx.channel().attr(AttrConstant.ATTR_USER_TYPE).get();
+
+            if (Objects.isNull(liveService.getById(liveId))) {
+                ctx.channel().writeAndFlush(new TextWebSocketFrame("Error: 未找到直播间")).addListener(ChannelFutureListener.CLOSE);
+                return;
+            }
+
+            Map<Long, Channel> room = getRoom(liveId);
+            List<Channel> adminRoom = getAdminRoom(liveId);
+            ChannelGroup roomGroup = getRoomGroup(liveId);
+            roomGroup.add(ctx.channel());
+
+            if (userType == 0) {
+                // 加入房间
+                liveWatchUserService.join(liveId, userId);
+                room.put(userId, ctx.channel());
+
+                FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
+                if (Objects.isNull(fsUser)) {
+                    ctx.channel().writeAndFlush(new TextWebSocketFrame("Error: 用户信息错误")).addListener(ChannelFutureListener.CLOSE);
+                    return;
+                }
+
+                LiveWatchUserVO liveWatchUserVO = liveWatchUserService.selectWatchUserByLiveIdAndUserId(liveId, userId);
+
+                SendMsgVo sendMsgVo = new SendMsgVo();
+                sendMsgVo.setLiveId(liveId);
+                sendMsgVo.setUserId(userId);
+                sendMsgVo.setUserType(userType);
+                sendMsgVo.setCmd("entry");
+                sendMsgVo.setMsg("用户进入");
+                sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
+                sendMsgVo.setNickName(fsUser.getNickName());
+                sendMsgVo.setAvatar(fsUser.getAvatar());
+
+                // 广播连接消息
+                broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+            } else if (userType == 1) {
+                adminRoom.add(ctx.channel());
+            }
+
+            log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}", liveId, userId, room.size());
+        }
+    }
+
+    /**
+     * 获取房间
+     * @param liveId 直播间ID
+     * @return 容器
+     */
+    private CopyOnWriteArrayList<Channel> getAdminRoom(Long liveId) {
+        return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
+    }
+
+    /**
+     * 获取房间
+     * @param liveId 直播间ID
+     * @return 容器
+     */
+    private ConcurrentHashMap<Long, Channel> getRoom(Long liveId) {
+        return rooms.computeIfAbsent(liveId, k -> new ConcurrentHashMap<>());
+    }
+
+    /**
+     * 获取房间用户组
+     * @param liveId 直播间ID
+     * @return  用户组
+     */
+    private ChannelGroup getRoomGroup(Long liveId) {
+        return roomGroups.computeIfAbsent(liveId, k -> new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
+    }
+
+    /**
+     * 发送广播
+     * @param liveId    直播间ID
+     * @param msg       消息
+     */
+    private void broadcastMessage(Long liveId, String msg) {
+        getRoomGroup(liveId).writeAndFlush(new TextWebSocketFrame(msg));
+    }
+
+    /**
+     * 发送指定消息
+     * @param channel   连接
+     * @param message   消息
+     */
+    private void sendMessage(Channel channel, String message) {
+        channel.writeAndFlush(new TextWebSocketFrame(message));
+    }
+
+    /**
+     * 接收消息
+     * @param channelHandlerContext 连接
+     * @param textWebSocketFrame    消息
+     * @throws Exception    异常
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
+        log.debug("接收到消息 data: {}", textWebSocketFrame.text());
+        Long liveId = channelHandlerContext.channel().attr(AttrConstant.ATTR_LIVE_ID).get();
+        Long userType = channelHandlerContext.channel().attr(AttrConstant.ATTR_USER_TYPE).get();
+
+        SendMsgVo msg = JSONObject.parseObject( textWebSocketFrame.text(), SendMsgVo.class);
+        if(msg.isOn()) return;
+        try {
+            switch (msg.getCmd()) {
+                case "heartbeat":
+                    sendMessage(channelHandlerContext.channel(), JSONObject.toJSONString(R.ok().put("data", msg)));
+                    break;
+                case "sendMsg":
+                    LiveMsg liveMsg = new LiveMsg();
+                    liveMsg.setLiveId(msg.getLiveId());
+                    liveMsg.setUserId(msg.getUserId());
+                    liveMsg.setNickName(msg.getNickName());
+                    liveMsg.setAvatar(msg.getAvatar());
+                    liveMsg.setMsg(msg.getMsg());
+                    liveMsg.setCreateTime(new Date());
+
+                    if (userType == 0) {
+                        LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(msg.getLiveId(), msg.getUserId());
+                        if(liveWatchUser.getMsgStatus() == 1){
+                            sendMessage(channelHandlerContext.channel(), JSONObject.toJSONString(R.error("你以被禁言")));
+                            return;
+                        }
+
+                        liveMsgService.save(liveMsg);
+                    }
+
+                    msg.setOn(true);
+                    msg.setData(JSONObject.toJSONString(liveMsg));
+
+                    // 广播消息
+                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+                    break;
+            }
+        } catch (Exception e) {
+            log.error("webSocket 消息处理失败 msg: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 断开连接
+     * @param ctx   连接
+     * @throws Exception    异常
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        log.debug("断开连接");
+        Long userId = ctx.channel().attr(AttrConstant.ATTR_USER_ID).get();
+        Long liveId = ctx.channel().attr(AttrConstant.ATTR_LIVE_ID).get();
+        Long userType = ctx.channel().attr(AttrConstant.ATTR_USER_TYPE).get();
+
+        if (Objects.isNull(userId) || Objects.isNull(liveId) || Objects.isNull(userType)) {
+            return;
+        }
+
+        Map<Long, Channel> room = getRoom(liveId);
+        List<Channel> adminRoom = getAdminRoom(liveId);
+        ChannelGroup roomGroup = getRoomGroup(liveId);
+
+        if (userType == 0) {
+            FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
+            liveWatchUserService.close(liveId, userId);
+            room.remove(userId);
+
+            if (room.isEmpty()) {
+                rooms.remove(liveId);
+            }
+
+            LiveWatchUserVO liveWatchUserVO = liveWatchUserService.selectWatchUserByLiveIdAndUserId(liveId, userId);
+
+            SendMsgVo sendMsgVo = new SendMsgVo();
+            sendMsgVo.setLiveId(liveId);
+            sendMsgVo.setUserId(userId);
+            sendMsgVo.setUserType(userType);
+            sendMsgVo.setCmd("out");
+            sendMsgVo.setMsg("用户离开");
+            sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
+            sendMsgVo.setNickName(fsUser.getNickName());
+            sendMsgVo.setAvatar(fsUser.getAvatar());
+
+            // 广播离开消息
+            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+        } else {
+            adminRoom.remove(ctx.channel());
+            if (adminRoom.isEmpty()) {
+                adminRooms.remove(liveId);
+            }
+        }
+        roomGroup.remove(ctx.channel());
+        if (roomGroup.isEmpty()) {
+            roomGroups.remove(liveId);
+        }
+
+        log.debug("断开webSocket liveId: {}, userId: {}, 直播间人数: {}", liveId, userId, room.size());
+
+    }
+
+    /**
+     * 连接异常
+     * @param ctx   连接
+     * @param cause 原因
+     * @throws Exception 异常
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.error("连接异常 msg: {}", cause.getMessage(), cause);
+        ctx.close();
+    }
+}

+ 97 - 0
fs-live-app/src/main/java/com/fs/app/websocket/service/NettyServerRunner.java

@@ -0,0 +1,97 @@
+package com.fs.app.websocket.service;
+
+import com.fs.app.websocket.auth.AuthHandler;
+import com.fs.app.websocket.handle.LiveChatHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.util.Objects;
+
+@Slf4j
+@Component
+public class NettyServerRunner implements CommandLineRunner {
+
+    private static final int port = 17114;
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+    private Channel serverChannel;
+    @Autowired
+    private AuthHandler authHandler;
+    @Autowired
+    private LiveChatHandler liveChatHandler;
+
+    @Override
+    public void run(String... args) throws Exception {
+        new Thread(this::startServer).start();
+    }
+
+    private void startServer() {
+        bossGroup = new NioEventLoopGroup(); // 处理连接
+        workerGroup = new NioEventLoopGroup(); // 处理I/O
+        try {
+            ServerBootstrap bootstrap = new ServerBootstrap();
+            bootstrap.group(bossGroup, workerGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+
+                        @Override
+                        protected void initChannel(SocketChannel socketChannel) throws Exception {
+                            ChannelPipeline pipeline = socketChannel.pipeline();
+                            // 编解码
+                            pipeline.addLast(new HttpServerCodec());
+                            // 集合消息
+                            pipeline.addLast(new HttpObjectAggregator(65536));
+                            // 安全校验
+                            pipeline.addLast(authHandler);
+                            // websocket握手
+                            pipeline.addLast(new WebSocketServerProtocolHandler("/app/webSocket", null, true, 65536, false, true));
+                            // 自定义聊天
+                            pipeline.addLast(liveChatHandler);
+                        }
+                    })
+                    .option(ChannelOption.SO_BACKLOG, 1024)
+                    .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+            ChannelFuture future = bootstrap.bind(port).sync();
+            serverChannel = future.channel();
+            log.info("netty server started [{}]", port);
+            serverChannel.closeFuture().sync();
+        } catch (Exception e) {
+            log.error("netty server error msg: {}", e.getMessage(), e);
+        } finally {
+            shutdown();
+        }
+    }
+
+    @PreDestroy
+    public void destroy() {
+        shutdown();
+        log.info("netty server destroy");
+    }
+
+    private void shutdown() {
+        if (Objects.nonNull(bossGroup)) {
+            bossGroup.shutdownGracefully();
+        }
+
+        if (Objects.nonNull(workerGroup)) {
+            workerGroup.shutdownGracefully();
+        }
+
+        if (Objects.nonNull(serverChannel)) {
+            serverChannel.close();
+        }
+        log.info("netty server stopped");
+    }
+}

+ 233 - 64
fs-live-app/src/main/java/com/fs/app/websocket/service/WebSocketServer.java

@@ -2,6 +2,7 @@ package com.fs.app.websocket.service;
 
 
 import com.alibaba.fastjson.JSONObject;
+import com.fs.app.websocket.auth.WebSocketConfigurator;
 import com.fs.app.websocket.bean.SendMsgVo;
 import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCache;
@@ -10,105 +11,188 @@ import com.fs.common.utils.StringUtils;
 import com.fs.common.utils.spring.SpringUtils;
 import com.fs.his.domain.FsUser;
 import com.fs.his.service.IFsUserService;
+import com.fs.live.domain.LiveData;
 import com.fs.live.domain.LiveMsg;
 import com.fs.live.domain.LiveWatchUser;
+import com.fs.live.service.ILiveDataService;
 import com.fs.live.service.ILiveMsgService;
 import com.fs.live.service.ILiveService;
 import com.fs.live.service.ILiveWatchUserService;
+import com.fs.live.vo.LiveWatchUserVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.websocket.*;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 
-@ServerEndpoint("/app/webSocket")
+@ServerEndpoint(value = "/app/webSocket",configurator = WebSocketConfigurator.class)
 @Component
+@Slf4j
 public class WebSocketServer {
 
-
-    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
-    private static ConcurrentHashMap<Long, Session> sessionPools = new ConcurrentHashMap<>();
+    // 直播间用户session
+    private final static ConcurrentHashMap<Long, ConcurrentHashMap<Long, Session>> rooms = new ConcurrentHashMap<>();
+    // 管理端连接
+    private final static ConcurrentHashMap<Long, CopyOnWriteArrayList<Session>> adminRooms = new ConcurrentHashMap<>();
     private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class);
     private final ILiveMsgService liveMsgService = SpringUtils.getBean(ILiveMsgService.class);
     private final ILiveService liveService = SpringUtils.getBean(ILiveService.class);
     private final ILiveWatchUserService liveWatchUserService = SpringUtils.getBean(ILiveWatchUserService.class);
     private final IFsUserService fsUserService = SpringUtils.getBean(IFsUserService.class);
+    private final ILiveDataService liveDataService = SpringUtils.getBean(ILiveDataService.class);
+    // 直播间在线用户缓存
+    //private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
 
-    //发送消息
-    public void sendMessage(Session session, String message) throws IOException {
-        if (session != null) {
-            synchronized (session) {
-                System.out.println("发送数据:" + message);
-                session.getBasicRemote().sendText(message);
-            }
+    private static final String USER_VISIT_KEY = "live:user:visit:";  // 用户访问标识用于判断是否是首次访问
+    private static final String UNIQUE_VISITORS_KEY = "live:unique:visitors:";  //访客数
+    private static final String UNIQUE_VIEWERS_KEY = "live:unique:viewers:";  //累计观看人数
+    private static final String PAGE_VIEWS_KEY = "live:page:views:";  //浏览量
+    private static final String TOTAL_VIEWS_KEY = "live:total:views:";  //累计观看人次
+    private static final String MAX_ONLINE_USERS_KEY = "live:max:online:"; //最大在线人数
+    private static final String ONLINE_USERS_KEY = "live:online:users:";  //当前在线人数
+    //建立连接成功调用
+    @OnOpen
+    public void onOpen(Session session) {
+
+        Map<String, Object> userProperties = session.getUserProperties();
+        long liveId = (long) userProperties.get("liveId");
+        long userId = (long) userProperties.get("userId");
+        long userType = (long) userProperties.get("userType");
+
+        if (liveService.getById(liveId) == null) {
+            throw new BaseException("未找到直播间");
         }
-    }
 
-    //给指定用户发送信息
-    public void sendInfo(String id, String message) {
-        Session session = sessionPools.get(id);
-        try {
-            if (session != null) {
+        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
+        List<Session> adminRoom = getAdminRoom(liveId);
 
-                sendMessage(session, message);
+        // 记录连接信息 管理员不记录
+        if (userType == 0) {
+            FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
+            if (Objects.isNull(fsUser)) {
+                throw new BaseException("用户信息错误");
             }
 
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+            liveWatchUserService.join(liveId, userId);
+            room.put(userId, session);
+            // 直播间浏览量 +1
+            redisCache.increment(PAGE_VIEWS_KEY + liveId, 1);
 
-    //建立连接成功调用
-    @OnOpen
-    public void onOpen(Session session) {
-        Map<String, String> params = getParams(session);
-        if(!params.containsKey("liveId")) throw new BaseException("未找到直播间");
-        if(!params.containsKey("userId")) throw new BaseException("用户信息错误");
-        long liveId = Long.parseLong(params.get("liveId"));
-        long userId = Long.parseLong(params.get("userId"));
-        liveWatchUserService.join(liveId, userId);
-        if (liveService.getById(liveId) == null) throw new BaseException("未找到直播间");
-        sessionPools.put(liveId, session);
-        System.out.println(liveId + "加入webSocket!当前人数为" + sessionPools.size());
+            // 累计观看人次 +1
+            redisCache.increment(TOTAL_VIEWS_KEY + liveId, 1);
 
+            // 记录在线人数
+            redisCache.increment(ONLINE_USERS_KEY + liveId, 1);
+            Integer currentOnline = redisCache.getCacheObject(ONLINE_USERS_KEY + liveId);
+            //最大同时在线人数
+            Integer maxOnline = redisCache.getCacheObject(MAX_ONLINE_USERS_KEY + liveId);
+            if (maxOnline == null || currentOnline > maxOnline) {
+                redisCache.setCacheObject(MAX_ONLINE_USERS_KEY + liveId, currentOnline);
+            }
+
+            // 判断是否是该直播间的首次访客(独立访客统计)
+            boolean isFirstVisit = redisCache.setIfAbsent(USER_VISIT_KEY + userId, 1, 1, TimeUnit.DAYS);
+            if (isFirstVisit) {
+
+                redisCache.increment(UNIQUE_VISITORS_KEY + liveId, 1);
+            }
+
+            // 判断是否是首次进入直播间的观众
+            boolean isFirstViewer = redisCache.setIfAbsent(UNIQUE_VIEWERS_KEY + liveId + ":" + userId, 1, 1, TimeUnit.DAYS);
+            if (isFirstViewer) {
+                redisCache.increment(UNIQUE_VIEWERS_KEY + liveId, 1);
+            }
+
+            LiveWatchUserVO liveWatchUserVO = liveWatchUserService.selectWatchUserByLiveIdAndUserId(liveId, userId);
+
+            SendMsgVo sendMsgVo = new SendMsgVo();
+            sendMsgVo.setLiveId(liveId);
+            sendMsgVo.setUserId(userId);
+            sendMsgVo.setUserType(userType);
+            sendMsgVo.setCmd("entry");
+            sendMsgVo.setMsg("用户进入");
+            sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
+            sendMsgVo.setNickName(fsUser.getNickName());
+            sendMsgVo.setAvatar(fsUser.getAvatar());
+
+            // 广播连接消息
+            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+        } else {
+            adminRoom.add(session);
+        }
+
+        log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}", liveId, userId, room.size());
     }
 
     //关闭连接时调用
     @OnClose
     public void onClose(Session session) {
-        Map<String, String> params = getParams(session);
-        long liveId = Long.parseLong(params.get("liveId"));
-        long userId = Long.parseLong(params.get("userId"));
-        sessionPools.remove(liveId);
-        liveWatchUserService.close(liveId, userId);
-        System.out.println(liveId + "断开webSocket连接!当前人数为" + sessionPools.size());
+        Map<String, Object> userProperties = session.getUserProperties();
+
+        long liveId = (long) userProperties.get("liveId");
+        long userId = (long) userProperties.get("userId");
+        long userType = (long) userProperties.get("userType");
+
+        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
+        List<Session> adminRoom = getAdminRoom(liveId);
+        if (userType == 0) {
+            FsUser fsUser = fsUserService.selectFsUserByUserId(userId);
+            if (Objects.isNull(fsUser)) {
+                throw new BaseException("用户信息错误");
+            }
+
+            liveWatchUserService.close(liveId, userId);
+            room.remove(userId);
+
+            if (room.isEmpty()) {
+                rooms.remove(liveId);
+            }
+
+            LiveWatchUserVO liveWatchUserVO = liveWatchUserService.selectWatchUserByLiveIdAndUserId(liveId, userId);
+
+            // 直播间在线人数 -1
+            redisCache.increment(ONLINE_USERS_KEY + liveId, -1);
+            SendMsgVo sendMsgVo = new SendMsgVo();
+            sendMsgVo.setLiveId(liveId);
+            sendMsgVo.setUserId(userId);
+            sendMsgVo.setUserType(userType);
+            sendMsgVo.setCmd("out");
+            sendMsgVo.setMsg("用户离开");
+            sendMsgVo.setData(JSONObject.toJSONString(liveWatchUserVO));
+            sendMsgVo.setNickName(fsUser.getNickName());
+            sendMsgVo.setAvatar(fsUser.getAvatar());
+
+            // 广播离开消息
+            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", sendMsgVo)));
+        } else {
+            adminRoom.remove(session);
+        }
+
+        log.debug("断开webSocket liveId: {}, userId: {}, 直播间人数: {}", liveId, userId, room.size());
     }
 
     //收到客户端信息
     @OnMessage
-    public void onMessage(String message) throws IOException {
+    public void onMessage(Session session,String message) throws IOException {
+        Map<String, Object> userProperties = session.getUserProperties();
+
+        long liveId = (long) userProperties.get("liveId");
+        long userType = (long) userProperties.get("userType");
+
         SendMsgVo msg = JSONObject.parseObject(message, SendMsgVo.class);
         if(msg.isOn()) return;
-        Session session;
-        System.out.println("收到数据" + msg.getCmd());
         try {
             switch (msg.getCmd()) {
                 case "heartbeat":
-                    session = sessionPools.get(msg.getUserId());
-                    sendMessage(session, JSONObject.toJSONString(msg));
+                    sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
                     break;
                 case "sendMsg":
-                    session = sessionPools.get(msg.getLiveId());
-                    if (session == null) return;
-                    LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(msg.getLiveId(), msg.getUserId());
-                    if(liveWatchUser.getMsgStatus() == 1){
-                        sendMessage(session, JSONObject.toJSONString(R.error("你以被禁言")));
-                        return;
-                    }
                     LiveMsg liveMsg = new LiveMsg();
                     liveMsg.setLiveId(msg.getLiveId());
                     liveMsg.setUserId(msg.getUserId());
@@ -116,26 +200,111 @@ public class WebSocketServer {
                     liveMsg.setAvatar(msg.getAvatar());
                     liveMsg.setMsg(msg.getMsg());
                     liveMsg.setCreateTime(new Date());
-                    liveMsgService.save(liveMsg);
+
+                    if (userType == 0) {
+                        LiveWatchUser liveWatchUser = liveWatchUserService.getByLiveIdAndUserId(msg.getLiveId(), msg.getUserId());
+                        if(liveWatchUser.getMsgStatus() == 1){
+                            sendMessage(session, JSONObject.toJSONString(R.error("你以被禁言")));
+                            return;
+                        }
+
+                        liveMsgService.save(liveMsg);
+                    }
+
                     msg.setOn(true);
-                    sendMessage(session, JSONObject.toJSONString(R.ok().put("data", msg)));
-                    break;
+                    msg.setData(JSONObject.toJSONString(liveMsg));
 
+                    // 广播消息
+                    broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+                    break;
             }
         } catch (Exception e) {
-            System.out.println("收到数据" + e.getMessage());
+            log.error("webSocket 消息处理失败 msg: {}", e.getMessage(), e);
         }
-
     }
 
     //错误时调用
     @OnError
     public void onError(Session session, Throwable throwable) {
-        System.out.println("发生错误" + throwable.getMessage());
-        throwable.printStackTrace();
+        log.error("webSocKet连接错误 msg: {}", throwable.getMessage(), throwable);
+    }
+
+    /**
+     * 获取房间
+     * @param liveId 直播间ID
+     * @return 容器
+     */
+    private ConcurrentHashMap<Long, Session> getRoom(Long liveId) {
+        return rooms.computeIfAbsent(liveId, k -> new ConcurrentHashMap<>());
+    }
+
+    /**
+     * 获取管理端房间
+     * @param liveId  直播间ID
+     * @return  容器
+     */
+    private List<Session> getAdminRoom(Long liveId) {
+        return adminRooms.computeIfAbsent(liveId, k -> new CopyOnWriteArrayList<>());
     }
 
-    private Map<String, String> getParams(Session session){
-        return session.getRequestParameterMap().entrySet().stream().filter(e -> e.getValue() != null && !e.getValue().isEmpty() && StringUtils.isNotEmpty(e.getValue().get(0))).collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get(0)));
+    //发送消息
+    public void sendMessage(Session session, String message) throws IOException {
+        session.getAsyncRemote().sendText(message);
     }
+
+    /**
+     * 广播消息
+     * @param liveId   直播间ID
+     * @param message  消息内容
+     */
+    public void broadcastMessage(Long liveId, String message) {
+        ConcurrentHashMap<Long, Session> room = getRoom(liveId);
+        List<Session> adminRoom = getAdminRoom(liveId);
+
+        room.forEach((k, v) -> v.getAsyncRemote().sendText(message));
+        adminRoom.forEach(v -> v.getAsyncRemote().sendText(message));
+    }
+
+    /**
+     *定期将缓存的数据写入数据库
+     */
+    @Scheduled(fixedRate = 60000) // 每分钟执行一次
+    public void syncLiveDataToDB() {
+        List<Long> liveIds = liveDataService.getAllLiveIds(); // 获取所有正在直播的直播间ID
+        for (Long liveId : liveIds) {
+            LiveData liveData = liveDataService.selectLiveDataByLiveId(liveId);
+            if (liveData == null) {
+                continue; // 防止空指针异常
+            }
+
+
+            // 从 redis 获取数据,并提供默认值,避免 NPE
+            liveData.setPageViews(
+                    Optional.ofNullable(redisCache.incrementCacheValue(PAGE_VIEWS_KEY + liveId,0)).orElse(0L)
+            );
+            liveData.setTotalViews(
+                    Optional.ofNullable(redisCache.incrementCacheValue(TOTAL_VIEWS_KEY + liveId,0)).orElse(0L)
+            );
+            liveData.setUniqueVisitors(
+                    Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VISITORS_KEY + liveId))
+                            .map(Set::size)  // 获取集合大小
+                            .map(Long::valueOf)  // 转换为 Long 类型
+                            .orElse(0L)
+            );
+            liveData.setUniqueViewers(
+                    Optional.ofNullable(redisCache.getCacheSet(UNIQUE_VIEWERS_KEY + liveId))
+                            .map(Set::size)  // 获取集合大小
+                            .map(Long::valueOf)  // 转换为 Long 类型
+                            .orElse(0L)
+            );
+            liveData.setPeakConcurrentViewers(
+                    Optional.ofNullable(redisCache.incrementCacheValue(MAX_ONLINE_USERS_KEY + liveId,0)).orElse(0L)
+            );
+
+            // 更新数据库
+            liveDataService.updateLiveData(liveData);
+        }
+    }
+
+
 }

+ 15 - 6
fs-live-app/src/main/java/com/fs/framework/config/DataSourceConfig.java

@@ -23,8 +23,8 @@ import java.util.Map;
 public class DataSourceConfig {
 
     @Bean
-    @ConfigurationProperties(prefix = "spring.datasource.sop.druid.master")
-    public DataSource sopDataSource() {
+    @ConfigurationProperties(prefix = "spring.datasource.clickhouse")
+    public DataSource clickhouseDataSource() {
         return new DruidDataSource();
     }
 
@@ -34,21 +34,30 @@ public class DataSourceConfig {
         return new DruidDataSource();
     }
 
-
+    @Bean
+    @ConfigurationProperties(prefix = "spring.datasource.mysql.druid.slave")
+    public DataSource slaveDataSource() {
+        return new DruidDataSource();
+    }
 
     @Bean
     @Primary
-    public DynamicDataSource dataSource(@Qualifier("masterDataSource") DataSource masterDataSource, @Qualifier("sopDataSource") DataSource sopDataSource) {
+    public DynamicDataSource dataSource(@Qualifier("clickhouseDataSource") DataSource clickhouseDataSource,
+                                        @Qualifier("masterDataSource") DataSource masterDataSource,
+                                        @Qualifier("slaveDataSource") DataSource slaveDataSource) {
         Map<Object, Object> targetDataSources = new HashMap<>();
-        targetDataSources.put(DataSourceType.SOP.name(), sopDataSource);
+        targetDataSources.put(DataSourceType.MASTER, masterDataSource);
+        targetDataSources.put(DataSourceType.SLAVE, slaveDataSource);
+        targetDataSources.put(DataSourceType.CLICKHOUSE.name(), clickhouseDataSource); // Ensure matching key
         return new DynamicDataSource(masterDataSource, targetDataSources);
     }
+
     /**
      * 去除监控页面底部的广告
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Bean
-    @ConditionalOnProperty(name = "spring.datasource.mysql.druid.statViewServlet.enabled", havingValue = "true")
+    @ConditionalOnProperty(name = "spring.datasource.druid.statViewServlet.enabled", havingValue = "true")
     public FilterRegistrationBean removeDruidFilterRegistrationBean(DruidStatProperties properties)
     {
         // 获取web监控页面的参数

+ 4 - 4
fs-live-app/src/main/java/com/fs/framework/config/DruidConfig.java

@@ -30,7 +30,7 @@ package com.fs.framework.config;//package com.fs.framework.config;
 //public class DruidConfig
 //{
 //    @Bean
-//    @ConfigurationProperties("spring.datasource.mysql.druid.master")
+//    @ConfigurationProperties("spring.datasource.druid.master")
 //    public DataSource masterDataSource(DruidProperties druidProperties)
 //    {
 //        DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
@@ -38,8 +38,8 @@ package com.fs.framework.config;//package com.fs.framework.config;
 //    }
 //
 //    @Bean
-//    @ConfigurationProperties("spring.datasource.mysql.druid.slave")
-//    @ConditionalOnProperty(prefix = "spring.datasource.mysql.druid.slave", name = "enabled", havingValue = "true")
+//    @ConfigurationProperties("spring.datasource.druid.slave")
+//    @ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true")
 //    public DataSource slaveDataSource(DruidProperties druidProperties)
 //    {
 //        DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
@@ -80,7 +80,7 @@ package com.fs.framework.config;//package com.fs.framework.config;
 //     */
 //    @SuppressWarnings({ "rawtypes", "unchecked" })
 //    @Bean
-//    @ConditionalOnProperty(name = "spring.datasource.mysql.druid.statViewServlet.enabled", havingValue = "true")
+//    @ConditionalOnProperty(name = "spring.datasource.druid.statViewServlet.enabled", havingValue = "true")
 //    public FilterRegistrationBean removeDruidFilterRegistrationBean(DruidStatProperties properties)
 //    {
 //        // 获取web监控页面的参数

+ 57 - 0
fs-live-app/src/main/resources/application-druid.yml

@@ -0,0 +1,57 @@
+# 数据源配置
+spring:
+    datasource:
+        type: com.alibaba.druid.pool.DruidDataSource
+        driverClassName: com.mysql.cj.jdbc.Driver
+        druid:
+            # 主库数据源
+            master:
+                url: jdbc:mysql://localhost:3306/fs-his?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+                username: root
+                password: 19860213
+            # 从库数据源
+            slave:
+                # 从数据源开关/默认关闭
+                enabled: false
+                url:
+                username:
+                password:
+            # 初始连接数
+            initialSize: 5
+            # 最小连接池数量
+            minIdle: 10
+            # 最大连接池数量
+            maxActive: 20
+            # 配置获取连接等待超时的时间
+            maxWait: 60000
+            # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+            timeBetweenEvictionRunsMillis: 60000
+            # 配置一个连接在池中最小生存的时间,单位是毫秒
+            minEvictableIdleTimeMillis: 300000
+            # 配置一个连接在池中最大生存的时间,单位是毫秒
+            maxEvictableIdleTimeMillis: 900000
+            # 配置检测连接是否有效
+            validationQuery: SELECT 1 FROM DUAL
+            testWhileIdle: true
+            testOnBorrow: false
+            testOnReturn: false
+            webStatFilter:
+                enabled: true
+            statViewServlet:
+                enabled: true
+                # 设置白名单,不填则允许所有访问
+                allow:
+                url-pattern: /druid/*
+                # 控制台管理用户名和密码
+                login-username: fs
+                login-password: 123456
+            filter:
+                stat:
+                    enabled: true
+                    # 慢SQL记录
+                    log-slow-sql: true
+                    slow-sql-millis: 1000
+                    merge-sql: true
+                wall:
+                    config:
+                        multi-statement-allow: true

+ 124 - 1
fs-live-app/src/main/resources/application.yml

@@ -1,9 +1,132 @@
+# 项目相关配置
+fs:
+  # 名称
+  name: fs
+  # 版本
+  version: 1.1.0
+  # 版权年份
+  copyrightYear: 2021
+  # 实例演示开关
+  demoEnabled: true
+  # 文件路径 示例( Windows配置D:/fs/uploadPath,Linux配置 /home/fs/uploadPath)
+  profile: c:/fs/uploadPath
+  # 获取ip地址开关
+  addressEnabled: false
+  # 验证码类型 math 数组计算 char 字符验证
+  captchaType: math
+  # APP模块,是通过jwt认证的,如果要使用APP模块,则需要修改【加密秘钥】
+  jwt:
+    # 加密秘钥
+    secret: f4e2e52034348f86b67cde581c0f9eb5
+    # token有效时长,7天,单位秒
+    expire: 604800
+    header: AppToken
+
 # 开发环境配置
 server:
   # 服务器的HTTP端口,默认为
   port: 7114
+  servlet:
+    # 应用的访问路径
+    context-path: /
+  tomcat:
+    # tomcat的URI编码
+    uri-encoding: UTF-8
+    # tomcat最大线程数,默认为200
+    max-threads: 800
+    # Tomcat启动初始化的线程数,默认值25
+    min-spare-threads: 30
+
+# 日志配置
+logging:
+  level:
+    com.fs: debug
+    org.springframework: warn
 
 # Spring配置
 spring:
+  # 资源信息
+  messages:
+    # 国际化资源文件路径
+    basename: i18n/messages
   profiles:
-    active: dev
+#    active: dev
+    active: dev
+  #include: config-dev
+  # 文件上传
+  servlet:
+    multipart:
+      # 单个文件大小
+      max-file-size:  10MB
+      # 设置总上传的文件大小
+      max-request-size:  20MB
+  # 服务模块
+  devtools:
+    restart:
+      # 热部署开关
+      enabled: true
+# token配置
+token:
+  # 令牌自定义标识
+  header: Authorization
+  # 令牌密钥
+  secret: abcdefghijklmnopqrstuvwxyz
+  # 令牌有效期(默认30分钟)
+  expireTime: 1000
+mybatis-plus:
+  # 搜索指定包别名
+  typeAliasesPackage: com.fs.**.domain,com.fs.**.bo
+  # 配置mapper的扫描,找到所有的mapper.xml映射文件
+  mapperLocations: classpath*:/mapper/**/*.xml
+  configLocation: classpath:mybatis/mybatis-config.xml
+  # 全局配置
+  global-config:
+    db-config:
+      # 主键类型  0:"数据库ID自增", 1:"用户输入ID",2:"全局唯一ID (数字类型唯一ID)", 3:"全局唯一ID UUID";
+      idType: AUTO
+      # 字段策略 0:"忽略判断",1:"非 NULL 判断"),2:"非空判断"
+      fieldStrategy: NOT_EMPTY
+    banner: false
+    # 配置
+  configuration:
+    # 驼峰式命名
+    mapUnderscoreToCamelCase: true
+    # 全局映射器启用缓存
+    cacheEnabled: true
+    # 配置默认的执行器
+    defaultExecutorType: REUSE
+    # 允许 JDBC 支持自动生成主键
+    useGeneratedKeys: true
+
+# MyBatis配置
+mybatis:
+  # 搜索指定包别名
+  typeAliasesPackage: com.fs.**.domain
+  # 配置mapper的扫描,找到所有的mapper.xml映射文件
+  mapperLocations: classpath*:mapper/**/*Mapper.xml
+  # 加载全局的配置文件
+  configLocation: classpath:mybatis/mybatis-config.xml
+
+# PageHelper分页插件
+pagehelper:
+  helperDialect: mysql
+  reasonable: false
+  supportMethodsArguments: true
+  params: count=countSql
+
+# Swagger配置
+swagger:
+  # 是否开启swagger
+  enabled: true
+  # 请求前缀
+  pathMapping: /
+
+# 防止XSS攻击
+xss:
+  # 过滤开关
+  enabled: true
+  # 排除链接(多个用逗号分隔)
+  excludes: /system/notice
+  # 匹配链接
+  urlPatterns: /system/*,/monitor/*,/tool/*
+