RabbitMQ+redis+Redisson分布式锁+seata实现订单服务的流程分析
目录
- 引言
- 订单流程(只展示重要的内容,具体可以到源码查看)
- 订单确认
- 防止订单重复提交
- 订单提交
- Lua脚本
- 校验商品
- 3. 锁定库存方法
- 扩展:解锁库存
- 扩展:扣减库存
- 4. 生成订单
- 创建订单
- 订单超时关单延时队列
- 订单超时未支付系统自动取消监听器
- 订单支付
- 代码摘自
引言
订单服务涉及许多方面,分布式事务,分布式锁,例如订单超时未支付要取消订单,订单如何防止重复提交,如何防止超卖、这里都会使用到。
- 开启分布式事务可以保证跨多个服务的数据操作的一致性和完整性,
- 使用分布式锁可以确保在同一时间只有一个操作能够成功执行,避免并发引起的问题。
订单流程(只展示重要的内容,具体可以到源码查编程客栈看)
订单确认
public OrderConfirmVO confirmOrder(Long skuId) { Long memberId = SecurityUtils.getMemberId(); // 解决子线程无法获取HttpServletRequest请求对象中数据的问题,子线程指getOrderItemsFuture,getMemberAddressFuture,generateOrderTokenFuture //feign远程调用会被拦截提取attributes并转发 RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); RequestContextHolder.setRequestAttributes(attributes, true); // 获取订单商品 //使用了 CompletableFuture 来实现异步执行获取订单项的操作 CompletableFuture<List<OrderItemDTO>> getOrderItemsFuture = CompletableFuture.supplyAsync( () -> this.getOrderItems(skuId, memberId), threadPoolExecutor) .exceptionally(ex -> { log.error("Failed to get order items: {}", ex.toString()); return null; }); // 用户收货地址 CompletableFuture<List<MemberAddressDTO>> getMemberAddressFuture = CompletableFuture.supplyAsync(() -> { Result<List<MemberAddressDTO>> getMemberAddressResult = memberFeignClient.listMemberAddresses(memberId); if (Result.isSuccess(getMemberAddressResult)) { return getMemberAddressResult.getData(); } return null; }, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to get addresses for memberId {} : {}", memberId, ex.toString()); return null; }); // 生成唯一令牌,防止重复提交(原理:提交会消耗令牌,令牌被消耗无法再次提交) CompletableFuture<String> generateOrderTokenFuture = CompletableFuture.supplyAsync(() -> { String orderToken = this.generateTradeNo(memberId); RedisTemplate.opsForValue().set(OrderConstants.ORDER_TOKEN_PREFIX + orderToken, orderToken); return orderToken; }, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to generate order token ."); return null; }); //CompletableFuture.allOf 方法,可以等待所有 CompletableFuture 对象都完成再进行后续操作, // 确保获取和设置属性的操作都能够成功执行。这样可以避免程序出现异常, CompletableFuture.allOf(getOrderItemsFuture, getMemberAddressFuture, generateOrderTokenFuture).join(); OrderConfirmVO orderConfirmVO = new OrderConfirmVO(); orderConfirmVO.setOrderItems(getOrderItemsFuture.join()); orderConfirmVO.setAddresses(getMemberAddressFuture.join()); orderConfirmVO.setOrderToken(generateOrderTokenFuture.join()); log.info("Order confirm response for skuId {}: {}", skuId, orderConfirmVO); return orderConfirmVO; }
防止订单重复提交
通过生成唯一令牌解决,方法:
根据自定义方法generateTradeNo生成订单号,将时间戳+3位随机数+5位id(由会员id组成,不够5位补0,超过5位保留后5位)组成订单号
private String generateTradeNo(Long memberId) { //当 memberId 的位数小于 5 位时,使用 0 来填充位数不足的部分,如果 memberId 已经是 5 位数或更长,则不进行填充 String userIdFilledzero = String.format("%05d", memberId); //超出五位的保留后五位 String fiveDigitsUserId = userIdFilledZero.substring(userIdFilledZero.length() - 5); // 在前面加上wxo(wx order)等前缀是为了人工可以快速分辨订单号是下单还是退款、来自哪家支付机构等 // 将时间戳+3位随机数+五位id组成商户订单号,规则参考自<a href="https://tech.meituan.com/2016/11/18/dianping-order-db-sharding.html" rel="external nofollow" >大众点评</a> return System.currentTimeMillis() + RandomUtil.randomNumbers(3) + fiveDigitsUserId; }
根据订单防重提交令牌缓存键前缀+订单号作为key存入redis,值为订单号。
订单提交的时候,会通过lua脚本验证redis是否含有该key,有则返回0,通过断言阻止重复提交,下面订单提交方法的代码含有该部分。
扩展:CompletableFuture 来实现异步执行获取订单项的操作,可以提高响应速度,减少阻塞时间,同时通过CompletableFuture可以更方便地处理异常,每个异步操作可以独立地捕获和处理异常,避免异常传递给上层调用者,提高了系统的健壮性和容错性。CompletableFuture对于处理大量并发请求的场景非常重要,可以提升系统的性能和用户体验。
涉及的自定义线程池(相关解释直接在代码注释上了)
/** * 自定义订单线程池 * */ @Configuration @Slf4j public class ThreadPoolConfig { @Bean public ThreadPoolExecutor threadPoolExecutor() { int cpuCoreSize = Runtime.getRuntime().availableProcessors();//使用 Java 获取可用的 CPU 核心数 log.info("当前CPU核心数:{}", cpuCoreSize); /* * 计算密集型: 核心线程数=CPU核心 +1 √ * I/O密集型: 核心线程数=2*CPU核心 +1 */ int corePoolSize = cpuCoreSize + 1; return new ThreadPoolExecutor( //核心线程数 corePoolSize, //最大线程数 2 * corePoolSize, //线程空闲(存活)时间;当线程数超过核心线程数,并且空闲时间超过指定时间时,多余的线程会被销毁 30, TimeUnit.SECONDS, //任务队列(选取数组阻塞队列)特点:固定容量,公平性,先进先出 new ArrayblockingQueue<>(1000), //线程工厂 new NamedThreadFactory("order") // 订单线程 ); } }
订单提交
@GlobalTransactional public String submitOrder(OrderSubmitForm submitForm) { log.info("订单提交参数:{}", jsONUtil.toJsonStr(submitForm)); String orderToken = submitForm.getOrderToken(); // 1. 判断订单是否重复提交(LUA脚本保证获取和删除的原子性,成功返回1,否则返回0) //KEYS[1]指OrderConstants.ORDER_TOKEN_PREFIX + orderToken,ARGV[1]指orderToken String lockAcquireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //这一行代码使用RedisTempl编程ate的execute方法执行Lua脚本。通过new DefaultRedisScript<>创建一个Redis脚本对象,并指定脚本字符串和返回结果的类型(Long)。 // 然后使用Collections.singletonList来创建包含锁键名和订单令牌参数的列表,传递给execute方法。 //执行 Redis 脚本后,结果会赋值给 lockAcquired 变量,它的类型是 Long。 // lockAcquired这个值表示获取锁的结果,如果成功获取并删除了锁,则为 1;如果获取锁失败,则为 0。 Long lockAcquired = this.redisTemplate.execute( new DefaultRedisScript<>(lockAcquireScript, Long.class), Collections.singletonList(OrderConstants.ORDER_TOKEN_PREFIX + orderToken), orderToken ); Assert.isTrue(lockAcquired != null && lockAcquired.equals(1L), "订单重复提交,请刷新页面后重试"); // 2. 订单商品校验 (PS:校验进入订单确认页面到提交过程商品(价格、上架状态)变化) List<OrderSubmitForm.OrderItem> orderItems = submitForm.getOrderItems(); List<Long> skuIds = orderItems.stream() .map(OrderSubmitForm.OrderItem::getSkuId) .collect(Collectors.toList()); List<SkuInfoDTO> skuList = skuFeignClient.getSkuInfoList(skuIds); for (OrderSubmitForm.OrderItem item : orderItems) { SkuInfoDTO skuInfo = skuList.stream().filter(sku -> sku.getId().equals(item.getSkuId())) .findFirst() .orElse(null); Assert.isTrue(skuInfo != null, "商品({})已下架或删除"); //如果调用对象小于被比较对象,compareTo 方法返回一个负整数。 // 如果调用对象大于被比较对象,compareTo 方法返回一个正整数。 Assert.isTrue(item.getPrice().compareTo(skuInfo.getPrice()) == 0, "商品({})价格发生变动,请刷新页面", item.getSkuName()); } // 3. 校验库存并锁定库存 List<LockedSkuDTO> lockedSkuList = orderItems.stream() .map(item -> new LockedSkuDTO(item.getSkuId(), item.getQuantity(), item.getSkuSn())) .collect(Collectors.toList()); boolean lockStockResult = skuFeignClient.lockStock(orderToken, lockedSkuList); Assert.isTrue(lockStockResult, "订单提交失败:锁定商品库存失败!"); // 4. 生成订单 boolean result = this.saveOrder(submitForm); log.info("order ({}) create result:{}", orderToken, result); return orderToken; }
前提:此处涉及seata分布式事务和Redisson实现的分布式锁
lua脚本
KEYS[1] 指OrderConstants.ORDER_TOKEN_PREFIX + orderToken
ARGV[1] 指orderToken
判断是否含有这个锁key,有就删除这个锁并返回1L,没有就返回0
通过断言判断lockAcquired是否成功获取并删除了锁,不成功会报错
校验商品
- 根据传入的表单对象拿到订单商品集合,然后通过stream拿到订单商品id集合
- 通过Feign远程调用得到对应订单id集合的库存商品集合
- 遍历订单商品集合,通过stream过滤拿到id对应的商品,判断商品是否为null,以及是否跟库存商品价格不一致
- 订单商品集合通过stream返回用于锁定库存的对象集合,传入对象的有库存商品id(Skuid),订单商品数量(Quantity),订单商品编号(skuSn)
3. 锁定库存方法
@Transactional public boolean lockStock(String orderToken, List<LockedSkuDTO> lockedSkuList) { Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkuList), "订单({})未包含任何商品", orderToken); // 校验库存数量是否足够以及锁定库存 for (LockedSkuDTO lockedSku : lockedSkuList) { Long skuId = lockedSku.getSkuId(); //商品分布式锁缓存键前缀:ProductConstants.SKU_LOCK_PREFIX //每次getLock都会返回一个独立的分布式锁对象,但它们共享一个锁资源。 RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + skuId); // 构建商品锁对象 try { //共享一个锁资源意味着多个分布式锁对象共同使用同一个锁来实现互斥访问。 //虽然每个分布式锁对象是独立创建的,但它们会使用相同的锁资源来进行加锁和释放锁的操作。 lock.lock();//锁定操作 Integer quantity = lockedSku.getQuantity(); // 订单的商品数量 // 库存足够 boolean lockResult = this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("locked_stock = locked_stock + " + quantity) // 修改锁定商品数 .eq(PmsSku::getId, lockedSku.getSkuId()) //通过 apply 方法添加动态 SQL 条件,确保 stock 减去 locked_stock 的值大于等于给定的 quantity。 //使用了占位符 {0} 来引用 quantity,0表示第一个传入的值。 .apply("stock - locked_stock >= {0}", quantity) // 剩余商品数 ≥ 订单商品数 ); Assert.isTrue(lockResult, "商品({})库存不足", lockedSku.getSkuSn()); } finally { if (lock.isLocked()) { lock.unlock(); } } } // 锁定的商品缓存至 Redis (后续使用:1.取消订单解锁库存;2:支付订单扣减库存) redisTemplate.opsForValue().set(ProductConstants.LOCKED_SKUS_PREFIX + orderToken, lockedSkuList); return true; }
- 断言判断锁定库存对象集合是否为null
- 遍历锁定库存对象集合,拿到库存商品id,通过商品分布式锁缓存键前缀+库存商品id创建 分布式锁,然后进行加锁操作(具体细节看代码注释就明白了)通过更新语句来更新锁 定商品数以及判断剩余商品数是否大于订单商品数。
- try语句最后通过判断分布式锁是否被锁定,是就释放锁。
- 将商品分布式锁缓存键前缀+订单号(orderToken)作为键,锁定库存集合作为值存入redis,以便后续取消订单时解锁库存和支付订单时扣减库存。
扩展:解锁库存
/** *解锁库存 *<P> *订单超时未支付,释放锁定的商品库存 */ public boolean unlockStock(String orderSn) { //锁定库存对象集合:lockedSkus List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); log.info("释放订单({})锁定的商品库存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus)); // 库存已释放 if (CollectionUtil.isEmpty(lockedSkus)) { return true; } // 遍历恢复锁定的商品库存 for (LockedSkuDTO lockedSku : lockedSkus) { RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId()); // 获取商品分布式锁 try { lock.lock(); this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("locked_stock = lojavascriptcked_stock - " + lockedSku.getQuantity()) .eq(PmsSku::getId, lockedSku.getSkuId()) ); } finally { //判断当前分布式锁是否已被锁定,通过这个判断可以防止非法释放锁等潜在问题 if (lock.isLocked()) { lock.unlock(); } } } // 移除 redis 订单锁定的商品 redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); return true; }
- 通过redis获取锁定库存对象集合,判断集合是否为空,空则表示库存已释放,直接返回true
- 不为空就进行遍历锁定库存对象集合(lockedSkus),获取分布式锁对象,进行加锁操作,然后 执行更新语句,扣减锁定商品数(即恢复原来的锁定商品数),最后释放锁
- 释放库存完后就可以移除redis订单锁定的商品,这样再执行释放库存的时候就直接返回true
扩展:扣减库存
/** * 扣减库存 * <p> * 订单支付扣减商品库存和释放锁定库存 * * @param orderSn 订单编号 * @return ture/false */ public boolean deductStock(String orderSn) { // 获取订单提交时锁定的商品 List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); log.info("订单({})支付成功,扣减订单商品库存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus)); Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkus), "扣减商品库存失败:订单({})未包含商品"); for (LockedSkuDTO lockedSku : lockedSkus) { RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId()); // 获取商品分布式锁 try { lock.lock(); this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("stock = stock - " + lockedSku.getQuantity()) .setSql("locked_stock = locked_stock - " + lockedSku.getQuantity()) .eq(PmsSku::getId, lockedSku.getSkuId()) ); } finally { if (lock.isLocked()) { lock.unlock(); } } } // 移除订单锁定的商品 redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); return true; }
原理跟释放库存基本一致,更新语句发生变化
更新语句:将商品库存数进行扣减,扣减数量为订单商品数,然后锁定商品数也进行扣 减,扣减数量为订单商品数
4. 生成订单
通过saveOrder创建订单,返回result,打印日志。
返回订单号
创建订单
/** * 创建订单 * * @param submitForm 订单提交表单对象 * @return */ private boolean saveOrder(OrderSubmitForm submitForm) { //创建订单详情表(OmsOrder)对象 OmsOrder order = orderConverter.form2Entity(submitForm); //设置待支付状态 order.setStatus(OrderStatusEnum.UNPAID.getValue()); //设置订单会员id order.setMemberId(SecurityUtils.getMemberId()); //设置订单来源(0代表PC订单,1代表APP订单) order.setSource(submitForm.getOrderSource().getValue()); //保存到数据库 boolean result = this.save(order); Long orderId = order.getId(); if (result) { // 保存订单明细 List<OmsOrderItem> orderItemEntities = orderItemConverter.item2Entity(submitForm.getOrderItems()); orderItemEntities.forEach(item -> item.setOrderId(orderId)); orderItemService.saveBATch(orderItemEntities); // 订单超时未支付取消 //这行代码使用 RabbitMQ 的 Java 客户端库来发送一条消息到 order.exchange 交换器, // 该消息会被路由到 order.close.delay 队列中。消息的内容是 submitForm.getOrderToken() 方法的返回结果。 rabbitTemplate.convertAndSend("order.exchange", "order.close.delay", submitForm.getOrderToken()); } return result; }
普通的保存到数据的操作就不做解释了,主要看订单超时未支付取消的功能。
订单超时未支付取消功能通过rabbitMQ实现
订单超时关单延时队列
@Component @Slf4j public class OrderRabbitConfig { // 普通延迟队列 private static final String ORDER_CLOSE_DELAY_QUEUE = "order.close.delay.queue"; private static final String ORDER_EXCHANGE = "order.exchange"; private static final String ORDER_CLOSE_DELAY_ROUTING_KEY = "order.close.delay"; // 死信关单队列 private static final String ORDER_ClOSE_QUEUE = "order.close.queue"; private static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange"; private static final String ORDER_ClOSE_ROUTING_KEY = "order.close"; /** * 定义交换机 */ @Bean public Exchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false); } /** * 死信交换机 */ @Bean public Exchange orderDlxExchange() { return new DirectExchange(ORDER_DLX_EXCHANGE, true, false); } /** * 延时队列 */ @Bean public Queue orderDelayQueue() { // 延时队列的消息过期了,会自动触发消息的转发,根据routingKey发送到指定的exchange中,exchange路由到死信队列 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", ORDER_DLX_EXCHANGE); args.put("x-dead-letter-routing-key", ORDER_ClOSE_ROUTING_KEY); // 死信路由Key args.put("x-message-ttl", 10 * 1000L); // 单位毫秒,10s用于测试 return new Queue(ORDER_CLOSE_DELAY_QUEUE, true, false, false, args); } /** * 延时队列绑定交换机 */ @Bean public Binding orderDelayQueueBinding() { return new Binding(ORDER_CLOSE_DELAY_QUEUE, Binding.DestinationType.QUEUE, ORDER_EXCHANGE, ORDER_CLOSE_DELAY_ROUTING_KEY, null); } /** * 关单队列 */ @Bean public Queue orderCloseQueue() { log.info("死信队列(order.close.queue)创建"); return nephpw Queue(ORDER_ClOSE_QUEUE, true, false, false); } /** * 关单队列绑定死信交换机 */ @Bean public Binding orderCloseQueueBinding() { return new Binding(ORDER_ClOSE_QUEUE, Binding.DestinationType.QUEUE, ORDER_DLX_EXCHANGE, ORDER_ClOSE_ROUTING_KEY, null); } }
- 绑定死信交换机后,超过10s就会路由到死信队列(order.close.queue)中。
订单超时未支付系统自动取消监听器
/** * 订单超时未支付系统自动取消监听器 * */ @Component @RequiredArgsConstructor @Slf4j public class OrderCloseListener { private final OrderService orderService; private final RabbitTemplate rabbitTemplate; @RabbitListener(queues = "order.close.queue") public void closeorder(String orderSn, Message messhttp://www.devze.comage, Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 消息序号(消息队列中的位置) log.info("订单({})超时未支付,系统自动关闭订单", orderSn); try { boolean closeOrderResult = orderService.closeOrder(orderSn); log.info("关单结果:{}", closeOrderResult); if (closeOrderResult) { // 关单成功:释放库存 //发送订单号 rabbitTemplate.convertAndSend("stock.exchange", "stock.unlock", orderSn); } else { // 关单失败:订单已被关闭,手动ACK确认并从队列移除消息 channel.basicAck(deliveryTag, false); // false: 不批量确认,仅确认当前单个消息 } } catch (Exception e) { // 关单异常:拒绝消息并重新入队 try { channel.basicReject(deliveryTag, true); // true: 重新放回队列 // channel.basicReject(deliveryTag, false); // false: 直接丢弃消息 (TODO 定时任务补偿) } catch (IOException ex) { log.error("订单({})关闭失败,原因:{}", orderSn, ex.getMessage()); } } } }
监听死信关单队列,拿到消息序号deliveryTag(便于后续手动ACK和重新入队的操作)
进行关单操作orderService.closeOrder,主要修改订单详情表的状态从待支付改为已关闭。
public boolean closeOrder(String orderSn) { return this.update(new LambdaUpdateWrapper<OmsOrder>() .eq(OmsOrder::getOrderSn, orderSn) .eq(OmsOrder::getStatus, OrderStatusEnum.UNPAID.getValue()) .set(OmsOrder::getStatus, OrderStatusEnum.CANCELED.getValue()) ); }
判断关单是否成功,成功就释放库存,发送订单号给释放库存的交换机,路由键为stock.unlock。
/** *库存释放监听器 */ @Component @Slf4j @RequiredArgsConstructor public class StockReleaseListener { private final SkuService skuService; private static final String STOCK_UNLOCK_QUEUE = "stock.unlock.queue"; private static final String STOCK_EXCHANGE = "stock.exchange"; private static final String STOCK_UNLOCK_ROUTING_KEY = "stock.unlock"; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = STOCK_UNLOCK_QUEUE,durable = "true"), exchange = @Exchange(value = STOCK_EXCHANGE), key = {STOCK_UNLOCK_ROUTING_KEY} ), ackMode = "MANUAL")//手动ACK @RabbitHandler public void UnlockStock(String orderSn, Message message, Channel channel){ log.info("订单{}取消释放库存",orderSn); long deliverTag=message.getMessageProperties().getDeliveryTag(); try{ skuService.unlockStock(orderSn); channel.basicAck(deliverTag,false); }catch (Exception e){ try { channel.basicAck(deliverTag,true); }catch (IOException ex){ log.error("订单{}关闭失败,原因:{}",orderSn,ex.getMessage()); } } } }
如果收到释放库存的消息,就会执行释放库存的方法unlockStock,上面锁定库存方法那节已解释。
订单支付
@GlobalTransactional public <T> T payOrder(OrderPaymentForm paymentForm) { String orderSn = paymentForm.getOrderSn(); OmsOrder order = this.getOne(new LambdaQueryWrapper<OmsOrder>().eq(OmsOrder::getOrderSn, orderSn)); Assert.isTrue(order != null, "订单不存在"); Assert.isTrue(OrderStatusEnum.UNPAID.getValue().equals(order.getStatus()), "订单不可支付,请检查订单状态"); RLock lock = redissonClient.getLock(OrderConstants.ORDER_LOCK_PREFIX + order.getOrderSn()); try { lock.lock(); T result; switch (paymentForm.getPaymentMethod()) { case WX_JSAPI: result = (T) wxJsapiPay(paymentForm.getAppId(), order.getOrderSn(), order.getPaymentAmount()); break; default: result = (T) balancePay(order); break; } return result; } finally { //释放锁 if (lock.isLocked()) { lock.unlock(); } } }
代码摘自
youlai-mall: 基于 Spring Boot 3、Spring Cloud & Alibaba 2022、SAS OAuth2 、vue3、Element-Plus、uni-app 构建的开源全栈商城。 (gitee.com)
到此这篇关于RabbitMQ+redis+Redisson分布式锁+seata实现订单服务的文章就介绍到这了,更多相关Redisson分布式锁订单服务内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论