|
@@ -9,7 +9,7 @@ import java.time.ZoneId;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.time.temporal.ChronoUnit;
|
|
import java.time.temporal.ChronoUnit;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
+import java.util.concurrent.*;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollectionUtil;
|
|
import cn.hutool.core.collection.CollectionUtil;
|
|
@@ -28,6 +28,7 @@ import com.fs.common.config.FSSysConfig;
|
|
|
import com.fs.common.config.LoginContextManager;
|
|
import com.fs.common.config.LoginContextManager;
|
|
|
import com.fs.common.core.domain.R;
|
|
import com.fs.common.core.domain.R;
|
|
|
import com.fs.common.core.redis.RedisCache;
|
|
import com.fs.common.core.redis.RedisCache;
|
|
|
|
|
+import com.fs.common.core.redis.service.StockDeductService;
|
|
|
import com.fs.common.event.TemplateBean;
|
|
import com.fs.common.event.TemplateBean;
|
|
|
import com.fs.common.event.TemplateEvent;
|
|
import com.fs.common.event.TemplateEvent;
|
|
|
import com.fs.common.event.TemplateListenEnum;
|
|
import com.fs.common.event.TemplateListenEnum;
|
|
@@ -79,11 +80,16 @@ import org.apache.http.util.Asserts;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.context.ApplicationEventPublisher;
|
|
import org.springframework.context.ApplicationEventPublisher;
|
|
|
|
|
+import org.springframework.retry.annotation.Backoff;
|
|
|
|
|
+import org.springframework.retry.annotation.Recover;
|
|
|
|
|
+import org.springframework.retry.annotation.Retryable;
|
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Propagation;
|
|
import org.springframework.transaction.annotation.Propagation;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
import org.springframework.transaction.interceptor.TransactionAspectSupport;
|
|
import org.springframework.transaction.interceptor.TransactionAspectSupport;
|
|
|
|
|
|
|
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
|
|
+
|
|
|
import static com.fs.store.constants.StoreConstants.DELIVERY;
|
|
import static com.fs.store.constants.StoreConstants.DELIVERY;
|
|
|
|
|
|
|
|
|
|
|
|
@@ -101,7 +107,10 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private LiveOrderMapper baseMapper;
|
|
private LiveOrderMapper baseMapper;
|
|
|
|
|
|
|
|
-
|
|
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private StockDeductService stockDeductService;
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ private ILiveGoodsService liveGoodsService;
|
|
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private LiveMapper liveMapper;
|
|
private LiveMapper liveMapper;
|
|
@@ -217,6 +226,93 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private ILiveOrderService liveOrderService;
|
|
private ILiveOrderService liveOrderService;
|
|
|
|
|
|
|
|
|
|
+ private final BlockingQueue<LiveGoods> liveGoodsQueue = new LinkedBlockingQueue<>();
|
|
|
|
|
+ private ExecutorService liveGoodsExecutor;
|
|
|
|
|
+ private final int BATCH_SIZE = 500;
|
|
|
|
|
+ // 新增:优雅退出标记(volatile保证多线程可见性)
|
|
|
|
|
+ private volatile boolean isRunning = true;
|
|
|
|
|
+
|
|
|
|
|
+ private void startConsumers(){
|
|
|
|
|
+ liveGoodsExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "liveGoodsConsumer"));
|
|
|
|
|
+ liveGoodsExecutor.submit(this::consumeLiveGoods);
|
|
|
|
|
+ }
|
|
|
|
|
+ private void consumeLiveGoods() {
|
|
|
|
|
+ List<LiveGoods> batch = new ArrayList<>(BATCH_SIZE);
|
|
|
|
|
+ log.info("liveGoodsStock 消费线程启动,队列当前大小: {}", liveGoodsQueue.size());
|
|
|
|
|
+ while (isRunning) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ LiveGoods goods = liveGoodsQueue.poll(1, TimeUnit.SECONDS);
|
|
|
|
|
+ if (goods != null) {
|
|
|
|
|
+ batch.add(goods);
|
|
|
|
|
+ log.info("消费线程获取到数据,当前批量大小: {}", batch.size());
|
|
|
|
|
+ if (batch.size() >= BATCH_SIZE) {
|
|
|
|
|
+ log.info("批量阈值达到{},执行更新", BATCH_SIZE);
|
|
|
|
|
+ batchUpdateLiveGoods(new ArrayList<>(batch));
|
|
|
|
|
+ batch.clear();
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
|
|
+ log.info("队列超时无数据,执行剩余{}条数据更新", batch.size());
|
|
|
|
|
+ batchUpdateLiveGoods(new ArrayList<>(batch));
|
|
|
|
|
+ batch.clear();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ log.error("liveGoodsStock 消费线程被中断: {}", e.getMessage(), e);
|
|
|
|
|
+ // 中断时退出循环,保证优雅关闭
|
|
|
|
|
+ isRunning = false;
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 处理剩余的数据(退出循环后)
|
|
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
|
|
+ batchUpdateLiveGoods(batch);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // 新增:优雅停止消费线程的方法(如应用关闭时调用)
|
|
|
|
|
+ @PreDestroy
|
|
|
|
|
+ public void stopConsumers() {
|
|
|
|
|
+ isRunning = false;
|
|
|
|
|
+ if (liveGoodsExecutor != null) {
|
|
|
|
|
+ liveGoodsExecutor.shutdown();
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 等待线程终止(最多等5秒)
|
|
|
|
|
+ if (!liveGoodsExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
|
|
|
|
+ liveGoodsExecutor.shutdownNow();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ liveGoodsExecutor.shutdownNow();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("liveGoodsStock 消费线程已停止");
|
|
|
|
|
+ }
|
|
|
|
|
+ @Transactional
|
|
|
|
|
+ @Retryable(
|
|
|
|
|
+ value = { Exception.class },
|
|
|
|
|
+ maxAttempts = 3,
|
|
|
|
|
+ backoff = @Backoff(delay = 2000)
|
|
|
|
|
+ )
|
|
|
|
|
+ public void batchUpdateLiveGoods(List<LiveGoods> logsToInsert) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ liveGoodsService.updateBatchById(logsToInsert);
|
|
|
|
|
+ log.info("批量更新 liveGoodsStock 完成,共更新 {} 条记录。", logsToInsert.size());
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("批量更新 liveGoodsStock 失败,将触发重试: {}", e.getMessage(), e);
|
|
|
|
|
+ // 核心:必须抛出异常,否则Retryable不会生效
|
|
|
|
|
+ throw new RuntimeException("批量更新失败", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 新增:重试失败后的兜底处理(可选)
|
|
|
|
|
+ @Recover
|
|
|
|
|
+ public void recoverBatchUpdate(Exception e, List<LiveGoods> logsToInsert) {
|
|
|
|
|
+ log.error("批量更新 liveGoodsStock 重试3次仍失败,记录失败数据: {}", e.getMessage(), e);
|
|
|
|
|
+ // 方案:将失败数据写入本地文件/数据库/死信队列,后续人工处理
|
|
|
|
|
+ // failureQueue.offer(logsToInsert);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
|
|
|
public LiveOrderServiceImpl(RedisCache redisCache) {
|
|
public LiveOrderServiceImpl(RedisCache redisCache) {
|
|
|
this.redisCache = redisCache;
|
|
this.redisCache = redisCache;
|
|
@@ -1627,8 +1723,20 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
|
|
|
return null;
|
|
return null;
|
|
|
}
|
|
}
|
|
|
FsStoreProduct fsStoreProduct = fsStoreProductService.selectFsStoreProductById(param.getProductId());
|
|
FsStoreProduct fsStoreProduct = fsStoreProductService.selectFsStoreProductById(param.getProductId());
|
|
|
|
|
+ if (fsStoreProduct == null) {
|
|
|
|
|
+ log.error("商品不存在");
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+ // todo yhq 计算价格接口 是否传入了规格参数
|
|
|
|
|
+ FsStoreProductAttrValue fsStoreProductAttrValue = null;
|
|
|
|
|
+ if (!Objects.isNull(param.getAttrValueId())) {
|
|
|
|
|
+ fsStoreProductAttrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueById(param.getAttrValueId());
|
|
|
|
|
+ }
|
|
|
BigDecimal totalPrice = BigDecimal.ZERO;
|
|
BigDecimal totalPrice = BigDecimal.ZERO;
|
|
|
BigDecimal payPrice = fsStoreProduct.getPrice().multiply(new BigDecimal(param.getTotalNum()));
|
|
BigDecimal payPrice = fsStoreProduct.getPrice().multiply(new BigDecimal(param.getTotalNum()));
|
|
|
|
|
+ if (fsStoreProductAttrValue != null) {
|
|
|
|
|
+ payPrice = fsStoreProductAttrValue.getPrice().multiply(new BigDecimal(param.getTotalNum()));
|
|
|
|
|
+ }
|
|
|
totalPrice = totalPrice.add(payPrice);
|
|
totalPrice = totalPrice.add(payPrice);
|
|
|
BigDecimal payDelivery = BigDecimal.ZERO;
|
|
BigDecimal payDelivery = BigDecimal.ZERO;
|
|
|
BigDecimal deductionPrice = BigDecimal.ZERO;
|
|
BigDecimal deductionPrice = BigDecimal.ZERO;
|
|
@@ -2261,7 +2369,7 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
|
|
|
if(liveOrder.getUserAddress() == null) return R.error("用户地址不能为空");
|
|
if(liveOrder.getUserAddress() == null) return R.error("用户地址不能为空");
|
|
|
if(liveOrder.getTotalNum() == null) return R.error("商品数量不能为空");
|
|
if(liveOrder.getTotalNum() == null) return R.error("商品数量不能为空");
|
|
|
|
|
|
|
|
- Live live = liveMapper.selectLiveByLiveId(liveOrder.getLiveId());
|
|
|
|
|
|
|
+ Live live = liveService.selectLiveByLiveId(liveOrder.getLiveId());
|
|
|
if(live == null) return R.error("当前直播不存在");
|
|
if(live == null) return R.error("当前直播不存在");
|
|
|
FsStoreProduct fsStoreProduct = fsStoreProductService.selectFsStoreProductById(liveOrder.getProductId());
|
|
FsStoreProduct fsStoreProduct = fsStoreProductService.selectFsStoreProductById(liveOrder.getProductId());
|
|
|
LiveGoods goods = liveGoodsMapper.selectLiveGoodsByProductId(liveOrder.getLiveId(), liveOrder.getProductId());
|
|
LiveGoods goods = liveGoodsMapper.selectLiveGoodsByProductId(liveOrder.getLiveId(), liveOrder.getProductId());
|
|
@@ -2433,6 +2541,20 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void initStock() {
|
|
|
|
|
+ List<LiveGoods> list = liveGoodsService.listAll();
|
|
|
|
|
+ list.forEach(e -> {
|
|
|
|
|
+ if(e.getProductId() == null || e.getLiveId() == null || e.getStock() == null) return;
|
|
|
|
|
+ stockDeductService.initStock(e.getProductId(), e.getLiveId(), e.getStock().intValue());
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ ILiveService liveService;
|
|
|
|
|
+
|
|
|
@Override
|
|
@Override
|
|
|
@Transactional(rollbackFor = Throwable.class,propagation = Propagation.REQUIRED)
|
|
@Transactional(rollbackFor = Throwable.class,propagation = Propagation.REQUIRED)
|
|
|
public R createLiveOrder(LiveOrder liveOrder) {
|
|
public R createLiveOrder(LiveOrder liveOrder) {
|
|
@@ -2451,20 +2573,61 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
|
|
|
if(liveOrder.getUserAddress() == null) return R.error("用户地址不能为空");
|
|
if(liveOrder.getUserAddress() == null) return R.error("用户地址不能为空");
|
|
|
if(liveOrder.getTotalNum() == null) return R.error("商品数量不能为空");
|
|
if(liveOrder.getTotalNum() == null) return R.error("商品数量不能为空");
|
|
|
|
|
|
|
|
- Live live = liveMapper.selectLiveByLiveId(liveOrder.getLiveId());
|
|
|
|
|
|
|
+ Live live = liveService.selectLiveByLiveId(liveOrder.getLiveId());
|
|
|
if(live == null) return R.error("当前直播不存在");
|
|
if(live == null) return R.error("当前直播不存在");
|
|
|
FsStoreProduct fsStoreProduct = fsStoreProductService.selectFsStoreProductById(liveOrder.getProductId());
|
|
FsStoreProduct fsStoreProduct = fsStoreProductService.selectFsStoreProductById(liveOrder.getProductId());
|
|
|
LiveGoods goods = liveGoodsMapper.selectLiveGoodsByProductId(liveOrder.getLiveId(), liveOrder.getProductId());
|
|
LiveGoods goods = liveGoodsMapper.selectLiveGoodsByProductId(liveOrder.getLiveId(), liveOrder.getProductId());
|
|
|
|
|
+ if (goods == null) return R.error("当前商品不存在");
|
|
|
if(fsStoreProduct == null) return R.error("店铺已下架商品,购买失败");
|
|
if(fsStoreProduct == null) return R.error("店铺已下架商品,购买失败");
|
|
|
if(fsStoreProduct.getIsShow() == 0 || goods.getStatus() == 0) return R.error("商品已下架,购买失败");
|
|
if(fsStoreProduct.getIsShow() == 0 || goods.getStatus() == 0) return R.error("商品已下架,购买失败");
|
|
|
- if(fsStoreProduct.getStock() < Integer.parseInt(liveOrder.getTotalNum()) || goods.getStock() < Integer.parseInt(liveOrder.getTotalNum())) return R.error("抱歉,这款商品已被抢光,暂时无库存~");
|
|
|
|
|
|
|
+// if(fsStoreProduct.getStock() < Integer.parseInt(liveOrder.getTotalNum()) || goods.getStock() < Integer.parseInt(liveOrder.getTotalNum())) return R.error("抱歉,这款商品已被抢光,暂时无库存~");
|
|
|
|
|
+//
|
|
|
|
|
+// String configJson = configService.selectConfigByKey("store.config");
|
|
|
|
|
+// if (org.apache.commons.lang3.StringUtils.isNotEmpty(configJson)) {
|
|
|
|
|
+// com.fs.store.config.StoreConfig config = com.alibaba.fastjson.JSON.parseObject(configJson, com.fs.store.config.StoreConfig.class);
|
|
|
|
|
+// if (config != null && Boolean.TRUE.equals(config.getCheckStock())) {
|
|
|
|
|
+// CompletableFuture<Boolean> completableFuture = stockDeductService.deductStockAsync(liveOrder.getProductId(), liveOrder.getLiveId(), Integer.parseInt(liveOrder.getTotalNum()), Long.parseLong(liveOrder.getUserId()));
|
|
|
|
|
+// try {
|
|
|
|
|
+// log.info("{}, 商品REDIS 库存扣减成功!", goods.getLiveId());
|
|
|
|
|
+// if (!completableFuture.get()) {
|
|
|
|
|
+// return R.error("抱歉,这款商品已被抢光,暂时无库存~");
|
|
|
|
|
+// }
|
|
|
|
|
+// log.info("{}, 商品REDIS 库存扣减成功!", goods.getLiveId());
|
|
|
|
|
+// } catch (InterruptedException e) {
|
|
|
|
|
+// log.error("高并发处理失败", e);
|
|
|
|
|
+// return R.error("订单创建失败:" + e.getMessage());
|
|
|
|
|
+// } catch (ExecutionException e) {
|
|
|
|
|
+// log.error("高并发处理失败", e);
|
|
|
|
|
+// return R.error("订单创建失败:" + e.getMessage());
|
|
|
|
|
+// }
|
|
|
|
|
+// if (goods.getStock() == null) return R.error("直播间商品库存不足");
|
|
|
|
|
+// }
|
|
|
|
|
+// }
|
|
|
|
|
+// FsStoreProductAttrValueScrm attrValue = fsStoreProductAttrValueMapper.selectFsStoreProductAttrValueById(liveOrder.getAttrValueId());
|
|
|
|
|
+
|
|
|
|
|
|
|
|
// 更改店铺库存
|
|
// 更改店铺库存
|
|
|
- fsStoreProduct.setStock(fsStoreProduct.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
|
|
|
|
|
- fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
|
|
|
|
|
|
|
+// fsStoreProduct.setStock(fsStoreProduct.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
|
|
|
|
|
+// fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
|
|
|
// 更新直播间库存
|
|
// 更新直播间库存
|
|
|
goods.setStock(goods.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
|
|
goods.setStock(goods.getStock()-Integer.parseInt(liveOrder.getTotalNum()));
|
|
|
|
|
+ goods.setSales(goods.getSales() + Integer.parseInt(liveOrder.getTotalNum()));
|
|
|
liveGoodsMapper.updateLiveGoods(goods);
|
|
liveGoodsMapper.updateLiveGoods(goods);
|
|
|
|
|
+// LiveGoods liveGoods = new LiveGoods();
|
|
|
|
|
+// liveGoods.setGoodsId(goods.getGoodsId());
|
|
|
|
|
+// liveGoods.setStock(goods.getStock() - Integer.parseInt(liveOrder.getTotalNum()));
|
|
|
|
|
+// liveGoods.setSales(goods.getSales() + Integer.parseInt(liveOrder.getTotalNum()));
|
|
|
|
|
+// log.info("商品库存修改添加队列:{}", goods.getGoodsId());
|
|
|
|
|
+// try {
|
|
|
|
|
+// boolean offered = liveGoodsQueue.offer(liveGoods, 5, TimeUnit.SECONDS);
|
|
|
|
|
+// if (!offered) {
|
|
|
|
|
+// log.error("liveGoodsStock 队列已满,无法添加日志: {}", JSON.toJSONString(liveGoods));
|
|
|
|
|
+// // 处理队列已满的情况,例如记录到失败队列或持久化存储
|
|
|
|
|
+// }
|
|
|
|
|
+// } catch (InterruptedException e) {
|
|
|
|
|
+// Thread.currentThread().interrupt();
|
|
|
|
|
+// log.error("插入 liveGoodsStock 队列时被中断: {}", e.getMessage(), e);
|
|
|
|
|
+// }
|
|
|
|
|
|
|
|
//判断是否是三种特定产品
|
|
//判断是否是三种特定产品
|
|
|
if (fsStoreProduct.getProductId() != null && (fsStoreProduct.getProductId().equals(3168L)
|
|
if (fsStoreProduct.getProductId() != null && (fsStoreProduct.getProductId().equals(3168L)
|
|
@@ -2673,6 +2836,8 @@ public class LiveOrderServiceImpl implements ILiveOrderService {
|
|
|
fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
|
|
fsStoreProductService.updateFsStoreProduct(fsStoreProduct);
|
|
|
goods.setStock(goods.getStock()+Long.parseLong(liveOrder.getTotalNum()));
|
|
goods.setStock(goods.getStock()+Long.parseLong(liveOrder.getTotalNum()));
|
|
|
liveGoodsMapper.updateLiveGoods(goods);
|
|
liveGoodsMapper.updateLiveGoods(goods);
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
this.refundCoupon(order);
|
|
this.refundCoupon(order);
|
|
|
|
|
|
|
|
return R.ok("操作成功");
|
|
return R.ok("操作成功");
|