Kaynağa Gözat

直播大屏实时轨迹

xw 2 gün önce
ebeveyn
işleme
4473c25fa7

+ 36 - 0
fs-common/src/main/java/com/fs/common/core/redis/RedisCache.java

@@ -127,6 +127,42 @@ public class RedisCache
         return redisTemplate.opsForList().range(key, 0, -1);
     }
 
+    /**
+     * 向列表左侧推入元素
+     *
+     * @param key 缓存的键值
+     * @param value 待推入的值
+     * @return 推入后列表长度
+     */
+    public <T> long leftPushList(final String key, final T value)
+    {
+        Long count = redisTemplate.opsForList().leftPush(key, value);
+        return count == null ? 0 : count;
+    }
+
+    /**
+     * 从列表右侧弹出元素
+     *
+     * @param key 缓存的键值
+     * @return 弹出的元素
+     */
+    public <T> T rightPopList(final String key)
+    {
+        return (T) redisTemplate.opsForList().rightPop(key);
+    }
+
+    /**
+     * 获取列表长度
+     *
+     * @param key 缓存的键值
+     * @return 列表长度
+     */
+    public long getListSize(final String key)
+    {
+        Long size = redisTemplate.opsForList().size(key);
+        return size == null ? 0 : size;
+    }
+
     /**
      * 缓存Set
      *

+ 110 - 0
fs-live-app/src/main/java/com/fs/live/controller/LiveBehaviorPushController.java

@@ -0,0 +1,110 @@
+package com.fs.live.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fs.common.core.controller.BaseController;
+import com.fs.common.core.domain.R;
+import com.fs.common.utils.spring.SpringUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 直播行为推送控制器(HTTP转发接口)
+ *
+ * @author 夏伟
+ * @date 2026-01-20
+ */
+@Slf4j
+@RestController
+@RequestMapping("/app/live/behavior")
+public class LiveBehaviorPushController extends BaseController {
+
+    /**
+     * 接收HTTP转发的行为数据并推送到WebSocket
+     *
+     * @param params 包含liveId和behaviors的参数
+     * @return 推送结果
+     */
+    @PostMapping("/push")
+    public R pushBehaviors(@RequestBody Map<String, Object> params) {
+        try {
+            Long liveId = Long.valueOf(params.get("liveId").toString());
+            @SuppressWarnings("unchecked")
+            List<String> behaviors = (List<String>) params.get("behaviors");
+
+            log.info("[HTTP转发] 收到推送请求, liveId={}, count={}", liveId, behaviors.size());
+
+            if (behaviors.isEmpty()) {
+                return R.error("参数错误");
+            }
+
+            if (!SpringUtils.containsBean("webSocketServer")) {
+                log.error("[HTTP转发] WebSocketServer不存在");
+                return R.error("WebSocketServer不存在");
+            }
+
+            Object webSocketServer = SpringUtils.getBean("webSocketServer");
+            if (webSocketServer == null) {
+                log.error("[HTTP转发] 获取WebSocketServer失败");
+                return R.error("获取WebSocketServer失败");
+            }
+
+            Object sendMsgVo = buildBatchMessage(liveId, behaviors);
+            String message = JSONObject.toJSONString(R.ok().put("data", sendMsgVo));
+
+            Method broadcastMethod = webSocketServer.getClass()
+                .getMethod("broadcastMessage", Long.class, String.class);
+            broadcastMethod.invoke(webSocketServer, liveId, message);
+
+            log.info("[HTTP转发] 推送成功, liveId={}, count={}", liveId, behaviors.size());
+
+            return R.ok("推送成功");
+
+        } catch (Exception e) {
+            log.error("[HTTP转发] 推送失败", e);
+            return R.error("推送失败: " + e.getMessage());
+        }
+    }
+
+    /**
+     * 构建批量推送消息
+     */
+    private Object buildBatchMessage(Long liveId, List<String> behaviorList) {
+        try {
+            Class<?> sendMsgVoClass = Class.forName("com.fs.live.websocket.bean.SendMsgVo");
+            Object sendMsgVo = sendMsgVoClass.newInstance();
+
+            // 解析为对象列表
+            List<Map<String, Object>> behaviors = new ArrayList<>();
+            for (String behaviorStr : behaviorList) {
+                try {
+                    Map<String, Object> behavior = JSONObject.parseObject(behaviorStr, Map.class);
+                    behaviors.add(behavior);
+                } catch (Exception e) {
+                    log.warn("[HTTP转发] 解析行为数据失败: {}", behaviorStr);
+                }
+            }
+
+            sendMsgVoClass.getMethod("setLiveId", Long.class).invoke(sendMsgVo, liveId);
+            sendMsgVoClass.getMethod("setCmd", String.class).invoke(sendMsgVo, "behaviorTrackBatch");
+            sendMsgVoClass.getMethod("setMsg", String.class).invoke(sendMsgVo, "用户行为实时更新");
+            sendMsgVoClass.getMethod("setData", String.class).invoke(sendMsgVo, JSONObject.toJSONString(behaviors));
+
+            return sendMsgVo;
+        } catch (Exception e) {
+            log.warn("[HTTP转发] 构建消息失败", e);
+            // 返回简单的Map作为备用
+            Map<String, Object> msg = new HashMap<>();
+            msg.put("liveId", liveId);
+            msg.put("cmd", "behaviorTrackBatch");
+            msg.put("msg", "用户行为实时更新");
+            msg.put("data", "[" + String.join(",", behaviorList) + "]");
+            return msg;
+        }
+    }
+}

+ 274 - 3
fs-service/src/main/java/com/fs/live/service/impl/LiveUserBehaviorTrackServiceImpl.java

@@ -1,16 +1,34 @@
 package com.fs.live.service.impl;
 
+import com.alibaba.fastjson.JSONObject;
+import com.fs.common.core.domain.R;
+import com.fs.common.core.redis.RedisCache;
 import com.fs.common.utils.DateUtils;
+import com.fs.common.utils.spring.SpringUtils;
 import com.fs.live.domain.LiveUserBehaviorTrack;
 import com.fs.live.mapper.LiveUserBehaviorTrackMapper;
 import com.fs.live.service.ILiveUserBehaviorTrackService;
 import com.fs.live.util.LiveBehaviorTrackUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
 
-import java.util.*;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
 /**
@@ -26,6 +44,22 @@ public class LiveUserBehaviorTrackServiceImpl implements ILiveUserBehaviorTrackS
     @Autowired
     private LiveUserBehaviorTrackMapper liveUserBehaviorTrackMapper;
 
+    @Autowired(required = false)
+    private RedisCache redisCache;
+
+    private static final String BEHAVIOR_BUFFER_KEY = "live:behavior:buffer:";
+
+    private static final int PUSH_THRESHOLD = 50;
+
+    private static final long WINDOW_MS = 100;
+
+    // 直播间最后推送时间缓存
+    private static final ConcurrentHashMap<Long, Long> lastPushTime = new ConcurrentHashMap<>();
+    // 推送任务调度器
+    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
+    // 推送执行器(异步非阻塞)
+    private static final ExecutorService pushExecutor = Executors.newFixedThreadPool(5);
+
     /**
      * 查询直播用户行为轨迹
      *
@@ -100,8 +134,7 @@ public class LiveUserBehaviorTrackServiceImpl implements ILiveUserBehaviorTrackS
     }
 
     /**
-     * 异步新增直播用户行为轨迹(推荐使用,不阻塞主线程)
-     *
+     *异步+实时
      * @param track 直播用户行为轨迹
      */
     @Async
@@ -109,12 +142,250 @@ public class LiveUserBehaviorTrackServiceImpl implements ILiveUserBehaviorTrackS
     public void asyncInsertLiveUserBehaviorTrack(LiveUserBehaviorTrack track) {
         try {
             insertLiveUserBehaviorTrack(track);
+
+            pushToScreenRealtime(track);
+
         } catch (Exception e) {
             log.error("异步插入用户行为轨迹失败: userId={}, liveId={}, behaviorType={}",
                      track.getUserId(), track.getLiveId(), track.getBehaviorType(), e);
         }
     }
 
+    /**
+     * 实时推送到大屏(滑动窗口批量合并方案)
+     *
+     * 策略:
+     * 1. 行为先缓存到 Redis(极快)
+     * 2. 达到阈值(50条)立即推送
+     * 3. 未达到阈值,100ms 后自动推送
+     * 4. 自动削峰填谷,处理高并发
+     *
+     * @param track 用户行为轨迹
+     */
+    private void pushToScreenRealtime(LiveUserBehaviorTrack track) {
+        if (redisCache == null) {
+            return;
+        }
+
+        try {
+            Long liveId = track.getLiveId();
+            String bufferKey = BEHAVIOR_BUFFER_KEY + liveId;
+
+            // 构建精简数据
+            java.util.Map<String, Object> screenData = new java.util.HashMap<>();
+            screenData.put("userId", track.getUserId());
+            screenData.put("behaviorType", track.getBehaviorType());
+            screenData.put("behaviorDesc", track.getBehaviorDesc());
+            screenData.put("behaviorTime", track.getBehaviorTime());
+            if (track.getResourceId() != null) {
+                screenData.put("resourceId", track.getResourceId());
+            }
+
+            redisCache.leftPushList(bufferKey, JSONObject.toJSONString(screenData));
+            long bufferSize = redisCache.getListSize(bufferKey);
+
+            // 1:达到阈值,立即推送
+            if (bufferSize >= PUSH_THRESHOLD) {
+                triggerPush(liveId, false);
+            }
+            // 2:未达到阈值,调度延迟推送
+            else {
+                scheduleDelayedPush(liveId);
+            }
+
+        } catch (Exception e) {
+            log.debug("[实时推送] 推送失败, liveId={}, userId={}",
+                    track.getLiveId(), track.getUserId());
+        }
+    }
+
+    /**
+     * 调度延迟推送(滑动窗口)
+     * 100ms 内如果没有新的推送触发,则自动推送当前缓冲区
+     */
+    private void scheduleDelayedPush(Long liveId) {
+        Long now = System.currentTimeMillis();
+        Long last = lastPushTime.get(liveId);
+
+        // 如果最近刚推送过,不重复调度
+        if (last != null && (now - last) < WINDOW_MS) {
+            return;
+        }
+
+        // 调度延迟推送任务
+        scheduler.schedule(() -> {
+            triggerPush(liveId, true);
+        }, WINDOW_MS, java.util.concurrent.TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * 触发推送(从缓冲区读取并推送到 WebSocket)
+     *
+     * @param liveId 直播间ID
+     * @param isDelayed 是否是延迟触发(用于日志)
+     */
+    private void triggerPush(Long liveId, boolean isDelayed) {
+        // 异步执行推送,不阻塞
+        pushExecutor.submit(() -> {
+            try {
+                String bufferKey = BEHAVIOR_BUFFER_KEY + liveId;
+
+                // 从缓冲区读取所有数据(一次性清空)
+                List<String> behaviorList = new ArrayList<>();
+                while (true) {
+                    String behavior = redisCache.rightPopList(bufferKey);
+                    if (behavior == null) {
+                        break;
+                    }
+                    behaviorList.add(behavior);
+                }
+
+                if (behaviorList.isEmpty()) {
+                    return;
+                }
+
+                pushToWebSocket(liveId, behaviorList);
+
+                // 更新最后推送时间
+                lastPushTime.put(liveId, System.currentTimeMillis());
+
+                log.info("[实时推送] 推送成功, liveId={}, count={}, delayed={}",
+                        liveId, behaviorList.size(), isDelayed);
+
+            } catch (Exception e) {
+                log.error("[实时推送] 推送失败, liveId={}", liveId, e);
+            }
+        });
+    }
+
+    /**
+     * 推送到 WebSocket(支持本地和HTTP转发)
+     */
+    private void pushToWebSocket(Long liveId, List<String> behaviorList) {
+        try {
+            boolean hasBean = SpringUtils.containsBean("webSocketServer");
+
+            if (hasBean) {
+                pushToLocalWebSocket(liveId, behaviorList);
+            } else {
+                pushToRemoteWebSocket(liveId, behaviorList);
+            }
+
+        } catch (Exception e) {
+            log.error("[实时推送] WebSocket推送失败, liveId={}", liveId, e);
+        }
+    }
+
+    /**
+     * 本地推送:直接调用WebSocketServer
+     */
+    private void pushToLocalWebSocket(Long liveId, List<String> behaviorList) {
+        try {
+            Object webSocketServer = SpringUtils.getBean("webSocketServer");
+            if (webSocketServer == null) {
+                return;
+            }
+
+            // 构建推送消息
+            Object sendMsgVo = buildBatchMessage(liveId, behaviorList);
+            String message = JSONObject.toJSONString(R.ok().put("data", sendMsgVo));
+
+            // 调用 broadcastMessage 推送
+            Method broadcastMethod = webSocketServer.getClass()
+                .getMethod("broadcastMessage", Long.class, String.class);
+            broadcastMethod.invoke(webSocketServer, liveId, message);
+
+            log.info("[实时推送] 本地WebSocket推送成功, liveId={}, count={}", liveId, behaviorList.size());
+
+        } catch (Exception e) {
+            log.error("[实时推送] 本地WebSocket推送失败, liveId={}", liveId, e);
+        }
+    }
+
+    /**
+     * 远程推送:通过HTTP转发到fs-live-app
+     */
+    private void pushToRemoteWebSocket(Long liveId, List<String> behaviorList) {
+        try {
+            // 读取配置的fs-live-app地址
+            Environment env = SpringUtils.getBean(Environment.class);
+            String liveWebSocketUrl = env.getProperty("liveWebSocketUrl");
+
+            if (liveWebSocketUrl == null || liveWebSocketUrl.isEmpty()) {
+                log.warn("[实时推送] 未配置liveWebSocketUrl,跳过远程推送");
+                return;
+            }
+
+            log.info("[实时推送] 使用远程模式推送, url={}", liveWebSocketUrl);
+
+            Map<String, Object> params = new HashMap<>();
+            params.put("liveId", liveId);
+            params.put("behaviors", behaviorList);
+
+            String url = liveWebSocketUrl + "/app/live/behavior/push";
+            String jsonParams = JSONObject.toJSONString(params);
+
+            // 使用RestTemplate发送POST请求
+            HttpHeaders headers = new HttpHeaders();
+            headers.setContentType(MediaType.APPLICATION_JSON);
+
+            HttpEntity<String> request = new HttpEntity<>(jsonParams, headers);
+
+            RestTemplate restTemplate = new RestTemplate();
+
+            ResponseEntity<String> response = restTemplate.postForEntity(url, request, String.class);
+
+            log.info("[实时推送] 远程HTTP转发成功, liveId={}, count={}, status={}",
+                    liveId, behaviorList.size(), response.getStatusCode());
+
+        } catch (Exception e) {
+            log.error("[实时推送] 远程HTTP转发失败, liveId={}", liveId, e);
+        }
+    }
+
+    /**
+     * 构建批量推送消息
+     */
+    private Object buildBatchMessage(Long liveId, List<String> behaviorList) {
+        try {
+            Class<?> sendMsgVoClass = Class.forName("com.fs.live.websocket.bean.SendMsgVo");
+            Object sendMsgVo = sendMsgVoClass.newInstance();
+
+            // 解析为对象列表
+            List<Map<String, Object>> behaviors = new ArrayList<>();
+            for (String behaviorStr : behaviorList) {
+                try {
+                    Map<String, Object> behavior = JSONObject.parseObject(behaviorStr, Map.class);
+                    behaviors.add(behavior);
+                } catch (Exception e) {
+                    log.warn("[实时推送] 解析行为数据失败: {}", behaviorStr);
+                }
+            }
+
+            sendMsgVoClass.getMethod("setLiveId", Long.class).invoke(sendMsgVo, liveId);
+            sendMsgVoClass.getMethod("setCmd", String.class).invoke(sendMsgVo, "behaviorTrackBatch");
+            sendMsgVoClass.getMethod("setMsg", String.class).invoke(sendMsgVo, "用户行为实时更新");
+            sendMsgVoClass.getMethod("setData", String.class).invoke(sendMsgVo, JSONObject.toJSONString(behaviors));
+
+            return sendMsgVo;
+        } catch (Exception e) {
+            log.warn("[实时推送] 构建消息失败", e);
+            return buildFallbackBatchMessage(liveId, behaviorList);
+        }
+    }
+
+    /**
+     * 构建备用批量消息
+     */
+    private Map<String, Object> buildFallbackBatchMessage(Long liveId, List<String> behaviorList) {
+        Map<String, Object> msg = new HashMap<>();
+        msg.put("liveId", liveId);
+        msg.put("cmd", "behaviorTrackBatch");
+        msg.put("msg", "用户行为实时更新");
+        msg.put("data", "[" + String.join(",", behaviorList) + "]");
+        return msg;
+    }
+
     /**
      * 批量新增直播用户行为轨迹
      *

+ 3 - 0
fs-service/src/main/resources/application-druid-bjczwh.yml

@@ -171,3 +171,6 @@ tag:
     rate:
         limit: 30
 
+
+liveWebSocketUrl: http://192.168.0.194:7114
+