浏览代码

直播-发送红包消息

Long 2 月之前
父节点
当前提交
5c05ad3938

+ 97 - 97
fs-live-app/src/main/java/com/fs/app/websocket/service/NettyServerRunner.java

@@ -1,97 +1,97 @@
-package com.fs.app.websocket.service;
-
-import com.fs.app.websocket.auth.AuthHandler;
-import com.fs.app.websocket.handle.LiveChatHandler;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.*;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.HttpServerCodec;
-import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PreDestroy;
-import java.util.Objects;
-
-@Slf4j
-@Component
-public class NettyServerRunner implements CommandLineRunner {
-
-    private static final int port = 17114;
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
-    private Channel serverChannel;
-    @Autowired
-    private AuthHandler authHandler;
-    @Autowired
-    private LiveChatHandler liveChatHandler;
-
-    @Override
-    public void run(String... args) throws Exception {
-        new Thread(this::startServer).start();
-    }
-
-    private void startServer() {
-        bossGroup = new NioEventLoopGroup(); // 处理连接
-        workerGroup = new NioEventLoopGroup(); // 处理I/O
-        try {
-            ServerBootstrap bootstrap = new ServerBootstrap();
-            bootstrap.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
-
-                        @Override
-                        protected void initChannel(SocketChannel socketChannel) throws Exception {
-                            ChannelPipeline pipeline = socketChannel.pipeline();
-                            // 编解码
-                            pipeline.addLast(new HttpServerCodec());
-                            // 集合消息
-                            pipeline.addLast(new HttpObjectAggregator(65536));
-                            // 安全校验
-                            pipeline.addLast(authHandler);
-                            // websocket握手
-                            pipeline.addLast(new WebSocketServerProtocolHandler("/app/webSocket", null, true, 65536, false, true));
-                            // 自定义聊天
-                            pipeline.addLast(liveChatHandler);
-                        }
-                    })
-                    .option(ChannelOption.SO_BACKLOG, 1024)
-                    .childOption(ChannelOption.SO_KEEPALIVE, true);
-
-            ChannelFuture future = bootstrap.bind(port).sync();
-            serverChannel = future.channel();
-            log.info("netty server started [{}]", port);
-            serverChannel.closeFuture().sync();
-        } catch (Exception e) {
-            log.error("netty server error msg: {}", e.getMessage(), e);
-        } finally {
-            shutdown();
-        }
-    }
-
-    @PreDestroy
-    public void destroy() {
-        shutdown();
-        log.info("netty server destroy");
-    }
-
-    private void shutdown() {
-        if (Objects.nonNull(bossGroup)) {
-            bossGroup.shutdownGracefully();
-        }
-
-        if (Objects.nonNull(workerGroup)) {
-            workerGroup.shutdownGracefully();
-        }
-
-        if (Objects.nonNull(serverChannel)) {
-            serverChannel.close();
-        }
-        log.info("netty server stopped");
-    }
-}
+//package com.fs.app.websocket.service;
+//
+//import com.fs.app.websocket.auth.AuthHandler;
+//import com.fs.app.websocket.handle.LiveChatHandler;
+//import io.netty.bootstrap.ServerBootstrap;
+//import io.netty.channel.*;
+//import io.netty.channel.nio.NioEventLoopGroup;
+//import io.netty.channel.socket.SocketChannel;
+//import io.netty.channel.socket.nio.NioServerSocketChannel;
+//import io.netty.handler.codec.http.HttpObjectAggregator;
+//import io.netty.handler.codec.http.HttpServerCodec;
+//import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.boot.CommandLineRunner;
+//import org.springframework.stereotype.Component;
+//
+//import javax.annotation.PreDestroy;
+//import java.util.Objects;
+//
+//@Slf4j
+//@Component
+//public class NettyServerRunner implements CommandLineRunner {
+//
+//    private static final int port = 17114;
+//    private EventLoopGroup bossGroup;
+//    private EventLoopGroup workerGroup;
+//    private Channel serverChannel;
+//    @Autowired
+//    private AuthHandler authHandler;
+//    @Autowired
+//    private LiveChatHandler liveChatHandler;
+//
+//    @Override
+//    public void run(String... args) throws Exception {
+//        new Thread(this::startServer).start();
+//    }
+//
+//    private void startServer() {
+//        bossGroup = new NioEventLoopGroup(); // 处理连接
+//        workerGroup = new NioEventLoopGroup(); // 处理I/O
+//        try {
+//            ServerBootstrap bootstrap = new ServerBootstrap();
+//            bootstrap.group(bossGroup, workerGroup)
+//                    .channel(NioServerSocketChannel.class)
+//                    .childHandler(new ChannelInitializer<SocketChannel>() {
+//
+//                        @Override
+//                        protected void initChannel(SocketChannel socketChannel) throws Exception {
+//                            ChannelPipeline pipeline = socketChannel.pipeline();
+//                            // 编解码
+//                            pipeline.addLast(new HttpServerCodec());
+//                            // 集合消息
+//                            pipeline.addLast(new HttpObjectAggregator(65536));
+//                            // 安全校验
+//                            pipeline.addLast(authHandler);
+//                            // websocket握手
+//                            pipeline.addLast(new WebSocketServerProtocolHandler("/app/webSocket", null, true, 65536, false, true));
+//                            // 自定义聊天
+//                            pipeline.addLast(liveChatHandler);
+//                        }
+//                    })
+//                    .option(ChannelOption.SO_BACKLOG, 1024)
+//                    .childOption(ChannelOption.SO_KEEPALIVE, true);
+//
+//            ChannelFuture future = bootstrap.bind(port).sync();
+//            serverChannel = future.channel();
+//            log.info("netty server started [{}]", port);
+//            serverChannel.closeFuture().sync();
+//        } catch (Exception e) {
+//            log.error("netty server error msg: {}", e.getMessage(), e);
+//        } finally {
+//            shutdown();
+//        }
+//    }
+//
+//    @PreDestroy
+//    public void destroy() {
+//        shutdown();
+//        log.info("netty server destroy");
+//    }
+//
+//    private void shutdown() {
+//        if (Objects.nonNull(bossGroup)) {
+//            bossGroup.shutdownGracefully();
+//        }
+//
+//        if (Objects.nonNull(workerGroup)) {
+//            workerGroup.shutdownGracefully();
+//        }
+//
+//        if (Objects.nonNull(serverChannel)) {
+//            serverChannel.close();
+//        }
+//        log.info("netty server stopped");
+//    }
+//}

+ 19 - 4
fs-live-app/src/main/java/com/fs/app/websocket/service/WebSocketServer.java

@@ -1,6 +1,7 @@
 package com.fs.app.websocket.service;
 
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.fs.app.config.ProductionWordFilter;
 import com.fs.app.websocket.auth.WebSocketConfigurator;
@@ -13,11 +14,11 @@ import com.fs.his.domain.FsUser;
 import com.fs.his.service.IFsUserService;
 import com.fs.live.domain.LiveData;
 import com.fs.live.domain.LiveMsg;
+import com.fs.live.domain.LiveRedConf;
 import com.fs.live.domain.LiveWatchUser;
 import com.fs.live.service.*;
 import com.fs.live.vo.LiveWatchUserVO;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.websocket.*;
@@ -46,6 +47,7 @@ public class WebSocketServer {
     private final IFsUserService fsUserService = SpringUtils.getBean(IFsUserService.class);
     private final ILiveDataService liveDataService = SpringUtils.getBean(ILiveDataService.class);
     private final ProductionWordFilter productionWordFilter = SpringUtils.getBean(ProductionWordFilter.class);
+    private final ILiveRedConfService liveRedConfService =  SpringUtils.getBean(ILiveRedConfService.class);
     // 直播间在线用户缓存
     //private static final ConcurrentHashMap<Long, Integer> liveOnlineUsers = new ConcurrentHashMap<>();
 
@@ -119,7 +121,7 @@ public class WebSocketServer {
             adminRoom.add(session);
         }
 
-        log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}", liveId, userId, room.size());
+        log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
     }
 
     //关闭连接时调用
@@ -165,7 +167,7 @@ public class WebSocketServer {
             adminRoom.remove(session);
         }
 
-        log.debug("断开webSocket liveId: {}, userId: {}, 直播间人数: {}", liveId, userId, room.size());
+        log.debug("加入webSocket liveId: {}, userId: {}, 直播间人数: {}, 管理端人数: {}", liveId, userId, room.size(), adminRoom.size());
     }
 
     //收到客户端信息
@@ -212,7 +214,7 @@ public class WebSocketServer {
                 case "sendGift":
                     break;
                 case "red":
-                    System.out.println("红包");
+                    processRed(liveId, msg);
                     break;
                 case "lottery":
                     System.out.println("抽奖");
@@ -223,6 +225,19 @@ public class WebSocketServer {
         }
     }
 
+    /**
+     * 处理红包变动消息
+     */
+    private void processRed(Long liveId, SendMsgVo msg) {
+        log.debug("redData: {}", msg);
+        JSONObject jsonObject = JSON.parseObject(msg.getData());
+        LiveRedConf liveRedConf = liveRedConfService.selectLiveRedConfByRedId(jsonObject.getLong("redId"));
+        if (Objects.nonNull(liveRedConf)) {
+            msg.setData(JSONObject.toJSONString(liveRedConf));
+            broadcastMessage(liveId, JSONObject.toJSONString(R.ok().put("data", msg)));
+        }
+    }
+
     //错误时调用
     @OnError
     public void onError(Session session, Throwable throwable) {