吴树波 hace 1 semana
padre
commit
031cf1ed73

+ 16 - 0
fs-common/src/main/java/com/fs/common/constant/RedisConstant.java

@@ -0,0 +1,16 @@
+package com.fs.common.constant;
+/**
+ * 库存与锁相关常量(Java 8 静态常量优化)
+ */
+public class RedisConstant {
+    // 库存Key前缀
+    public static final String STOCK_KEY_PREFIX = "product:stock:";
+    // 分布式锁Key前缀
+    public static final String LOCK_KEY_PREFIX = "product:lock:";
+    // 锁过期时间(30秒,避免死锁,大于业务执行时间)
+    public static final long LOCK_EXPIRE_SECONDS = 3L;
+    // 锁重试间隔(50毫秒,非阻塞重试,避免线程阻塞)
+    public static final long LOCK_RETRY_INTERVAL = 100L;
+    // 锁最大重试次数(3次,避免无限重试)
+    public static final int LOCK_MAX_RETRY = 20;
+}

+ 132 - 0
fs-common/src/main/java/com/fs/common/core/redis/service/StockDeductService.java

@@ -0,0 +1,132 @@
+package com.fs.common.core.redis.service;
+
+import com.fs.common.constant.RedisConstant;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+/**
+ * 高并发库存扣减服务(Java 8 + Redis分布式锁)
+ */
+@Service
+public class StockDeductService {
+
+    // 注入RedisTemplate
+    public final RedisTemplate<String, Object> redisTemplate;
+
+    // 构造器注入(Spring 推荐,Java 8 支持)
+    public StockDeductService(RedisTemplate<String, Object> redisTemplate) {
+        this.redisTemplate = redisTemplate;
+    }
+
+    // 库存扣减Lua脚本(预编译,提升高并发性能)
+    private static final DefaultRedisScript<Long> STOCK_DEDUCT_SCRIPT;
+    // 锁释放Lua脚本(预编译)
+    private static final DefaultRedisScript<Long> LOCK_RELEASE_SCRIPT;
+
+    // 库存扣减Lua脚本(优化后,增强健壮性)
+    static {
+        // 初始化库存扣减脚本
+        STOCK_DEDUCT_SCRIPT = new DefaultRedisScript<>();
+        STOCK_DEDUCT_SCRIPT.setScriptText("if redis.call('exists', KEYS[1]) ~= 1 then " + "return -2; " + "end " + "local stock_str = redis.call('get', KEYS[1]); " + "local stock = tonumber(stock_str); " + "if stock == nil then " + "return -3; " + "end " + "local deductNum_str = ARGV[1]; " + "local deductNum = tonumber(deductNum_str); " + "if deductNum == nil or deductNum <= 0 then " + "return -4; " + "end " + "if stock >= deductNum then " + "return redis.call('decrby', KEYS[1], deductNum); " + "else " + "return -1; " + "end");
+        STOCK_DEDUCT_SCRIPT.setResultType(Long.class);
+
+        // 锁释放脚本保持不变
+        LOCK_RELEASE_SCRIPT = new DefaultRedisScript<>();
+        LOCK_RELEASE_SCRIPT.setScriptText("if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) " + "else return 0 end");
+        LOCK_RELEASE_SCRIPT.setResultType(Long.class);
+    }
+
+    /**
+     * 初始化商品库存(Redis)
+     *
+     * @param productId 商品ID
+     * @param initStock 初始库存
+     */
+    public void initStock(Long productId, Integer initStock) {
+        String stockKey = RedisConstant.STOCK_KEY_PREFIX + productId;
+        redisTemplate.opsForValue().set(stockKey, initStock, 24 * 60 * 60, TimeUnit.SECONDS);
+        System.out.println("商品" + productId + "库存初始化完成,初始库存:" + initStock);
+    }
+
+    /**
+     * 高并发库存扣减(核心方法,落地Java 8特性)
+     *
+     * @param productId 商品ID
+     * @param deductNum 扣减数量(默认1)
+     * @return 扣减结果:true=成功,false=失败
+     */
+    public CompletableFuture<Boolean> deductStockAsync(Long productId, Integer deductNum) {
+        // Java 8 CompletableFuture 异步处理,提升高并发吞吐量
+        return CompletableFuture.supplyAsync(() -> {
+            // 1. 参数校验(Java 8 Optional 空值处理)
+            Integer num = Optional.ofNullable(deductNum).orElse(1);
+            String stockKey = RedisConstant.STOCK_KEY_PREFIX + productId;
+            String lockKey = RedisConstant.LOCK_KEY_PREFIX + productId;
+
+            // 2. 生成锁持有者唯一标识(UUID + 线程ID,避免误释放)
+            String lockOwner = UUID.randomUUID().toString() + "-" + Thread.currentThread().getId();
+
+            // 3. 尝试获取分布式锁(非阻塞重试,Java 8 Stream API 实现重试)
+// 3. 尝试获取分布式锁(优化:加入随机延迟,避免惊群效应)
+            boolean isLockAcquired = IntStream.range(0, RedisConstant.LOCK_MAX_RETRY).anyMatch(retryCount -> {
+                Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, lockOwner, RedisConstant.LOCK_EXPIRE_SECONDS, TimeUnit.SECONDS);
+                if (Boolean.TRUE.equals(result)) {
+                    return true;
+                }
+                try {
+                    // 随机延迟:50ms~150ms,避免所有请求同时重试
+                    long randomDelay = RedisConstant.LOCK_RETRY_INTERVAL + new Random().nextInt(100);
+                    TimeUnit.MILLISECONDS.sleep(randomDelay);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+                return false;
+            });
+            // 4. 未获取到锁,直接返回失败
+            if (!isLockAcquired) {
+                System.err.println("商品" + productId + "获取锁失败,高并发限流中");
+                return false;
+            }
+
+            try {
+                // 5. 执行库存扣减Lua脚本(原子操作,防超卖)
+                // 新增日志:打印当前库存值和扣减数量
+                Integer currentStockStr = (Integer) redisTemplate.opsForValue().get(stockKey);
+                System.out.println("拿到锁成功 → 库存Key:" + stockKey + ",当前库存值:" + currentStockStr + ",扣减数量:" + num);
+
+                // 执行库存扣减Lua脚本
+                Long remainingStock = redisTemplate.execute(
+                        STOCK_DEDUCT_SCRIPT,
+                        Collections.singletonList(stockKey),
+                        1
+                );
+
+                // 新增日志:打印Lua返回结果
+                System.out.println("Lua脚本返回值:" + remainingStock);
+
+                // 6. 判断扣减结果
+                if (remainingStock != null && remainingStock >= 0) {
+                    System.out.println("商品" + productId + "库存扣减成功,剩余库存:" + remainingStock);
+                    return true;
+                } else {
+                    System.err.println("商品" + productId + "库存不足,扣减失败");
+                    return false;
+                }
+            } finally {
+                // 7. 释放分布式锁(Lua脚本保证原子性,仅释放自己持有的锁)
+                redisTemplate.execute(LOCK_RELEASE_SCRIPT, Collections.singletonList(lockKey), lockOwner);
+                System.out.println("商品" + productId + "锁释放成功,持有者:" + lockOwner);
+            }
+        });
+    }
+}

+ 3 - 0
fs-service/src/main/java/com/fs/hisStore/service/impl/FsStoreProductScrmServiceImpl.java

@@ -15,6 +15,7 @@ import com.fs.common.BeanCopyUtils;
 import com.fs.common.constant.LiveKeysConstant;
 import com.fs.common.core.domain.R;
 import com.fs.common.core.redis.RedisCache;
+import com.fs.common.core.redis.RedisCacheT;
 import com.fs.common.exception.CustomException;
 import com.fs.common.exception.ServiceException;
 import com.fs.common.param.BaseQueryParam;
@@ -145,6 +146,8 @@ public class FsStoreProductScrmServiceImpl implements IFsStoreProductScrmService
 
     @Autowired
     private RedisCache redisCache;
+    @Autowired
+    private RedisCacheT<Integer> redisCacheT;
 
     /**
      * 清除商品详情缓存

+ 5 - 0
fs-user-app/pom.xml

@@ -108,6 +108,11 @@
             <artifactId>rocketmq-spring-boot-starter</artifactId>
             <version>2.2.3</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 105 - 0
fs-user-app/src/test/java/com/fs/test/StockDeductTest.java

@@ -0,0 +1,105 @@
+package com.fs.test;
+
+import com.fs.FsUserAppApplication;
+import com.fs.common.constant.RedisConstant;
+import com.fs.common.core.redis.service.StockDeductService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * 50万高并发库存扣减测试
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = FsUserAppApplication.class)
+@RequiredArgsConstructor
+@Slf4j
+public class StockDeductTest {
+
+    @Autowired
+    private StockDeductService stockDeductService;
+
+    // 商品ID
+    private static final Long PRODUCT_ID = 1001L;
+    // 初始库存(模拟5万库存,应对50万并发扣减)
+    private static final Integer INIT_STOCK = 5000;
+    // 总请求数(50万)
+    private static final int TOTAL_REQUESTS = 50000;
+
+    /**
+     * 模拟50万高并发库存扣减
+     */
+    @Test
+    public void testHighConcurrencyDeduct() throws InterruptedException, ExecutionException {
+        stockDeductService.initStock(PRODUCT_ID, INIT_STOCK);
+        // Java 8 ExecutorService 线程池(固定线程池,适配高并发)
+        ExecutorService executorService = createHighConcurrencyPool();
+
+        // 存储所有异步任务结果
+        List<CompletableFuture<Boolean>> futureList = new ArrayList<>();
+
+        // 提交50万请求
+        for (int i = 0; i < TOTAL_REQUESTS; i++) {
+            futureList.add(stockDeductService.deductStockAsync(PRODUCT_ID, 1));
+        }
+
+        // 等待所有任务完成(Java 8 CompletableFuture 批量处理)
+        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
+                futureList.toArray(new CompletableFuture[0])
+        );
+        allFutures.get();
+
+        // 统计结果
+        long successCount = futureList.stream()
+                .map(future -> {
+                    try {
+                        return future.get();
+                    } catch (Exception e) {
+                        return false;
+                    }
+                })
+                .filter(Boolean::booleanValue)
+                .count();
+
+        // 打印结果
+        System.out.println("======================================");
+        System.out.println("50万高并发库存扣减测试完成");
+        System.out.println("成功扣减次数:" + successCount);
+        System.out.println("失败扣减次数:" + (TOTAL_REQUESTS - successCount));
+        System.out.println("最终剩余库存:" + stockDeductService.redisTemplate.opsForValue().get(RedisConstant.STOCK_KEY_PREFIX + PRODUCT_ID));
+        System.out.println("======================================");
+
+        // 关闭线程池
+        executorService.shutdown();
+        executorService.awaitTermination(1, TimeUnit.MINUTES);
+    }
+
+
+
+    private static ExecutorService createHighConcurrencyPool() {
+        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; // CPU核心数*2
+        int maximumPoolSize = 200; // 最大线程数,根据服务器配置调整
+        long keepAliveTime = 60L;
+        // 用SynchronousQueue,直接提交任务,避免队列积压
+        BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
+        // 拒绝策略:丢弃最老的任务,避免OOM
+        RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
+
+        return new ThreadPoolExecutor(
+                corePoolSize,
+                maximumPoolSize,
+                keepAliveTime,
+                TimeUnit.SECONDS,
+                workQueue,
+                new ThreadPoolExecutor.CallerRunsPolicy() // 兜底:主线程执行,避免任务丢失
+        );
+    }
+}