开发者

@Transactional用法详解与事务避坑指南

目录
  • 前言
  • 一、Java的@Transactional解释
  • 二、踩坑
  • 二、状态统一入口:推送
  • 三、MQ消费
  • 四、问题与解决
    • 解决方式
  • 五、总结
    • 需求:业务单位需要观测订单动态,所以要在订单的各个状态节点上传状态至状态池系统
    • 方案:又是Easy的需求,不就是在每个订单状态改变时触发上传状态,为了不影响订单正常流转http://www.devze.com,增加一个消息中间件MQ

    前言

    在Java中,@Transactional是一个非常重要的注解,用于声明事务管理的行为。

    它可以被应用在类级别或方法级别上,并且提供了多种选项来控制事务的传播行为、隔离级别、超时设置和回滚条件等。

    一、Java的@Transactional解释

    @Transactional 注解在 Java 中用于声明式事务管理,它可以应用于接口、接口方法、类以及类的方法上。在默认配置下,Spring 中的 @Transactional 注解会使得方法在事务的上下文中执行,其事务边界默认为方法开始执行时开始,方法正常结束时提交事务,方法执行过程中抛出异常时回滚事务。

    但是,你可以通过在 @Transactional 注解中设置 propagation 属性来改变事务的传播行为,例如可以设置为Propagation.MANDATORY,这意味着该方法必须在一个已经存在的事务中执行,否则就会抛出异常。

    另外,你可以通过设置 phase 属性来改变事务的提交时机,例如可以设置为Phase.AFTER_COMPLETION,这样事务就会在整个事务完成后提交,不论事务是正常结束还是异常结束。

    以下是一个使用 @Transactional 注解的示例,其中 propagation 设置为Propagation.REQUIRED(默认值),表示如果当前存在事务,则加入该事务;如果不存在,则创建一个新的事务。

    impoFHprPrt org.springframework.transaction.annotation.Transactional;
    import org.springframework.stereotype.Service;
     
    @Service
    public class MyService {
     
        @Transactional
        public void someTransactionalMethod() {
            // 方法执行的代码
        }
    }
    

    二、踩坑

    接到的需求中:每个订单状态改变时触发上传状态,为了不影响订单正常流转,增加一个消息中间件MQ

    流程图如下:

    @Transactional用法详解与事务避坑指南

    二、状态统一入口:推送

        private SendResult sendStatusToMq(SendStatusPoolMsgVo sendStatusPoolMsgVo) {
            String msg = jsON.toJSONString(sendStatusPoolMsgVo, SerializerFeature.PrettyFormat,
                    SerializerFeature.WriteMapNullValue);
            LOGGER.info("sendStatusPoolMsgToMq,发送的消息{}", msg);
            Message<String> message = MessageBuilder.withPayload(msg).build();
            SendResult sendResult = new SendResult();
            try {
                sendResult = rocketMQTemplate.syncSend(statusPoolTopic, message, 3000, 2);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("sendStatusToMq err", e);
                TaskMqModel taskMqModel = new TaskMqModel();
                taskMqModel.setTopic(statusPoolTopic);
                taskMqModel.setMessageType(MessageTypeConstant.MQ_SUPPLE_STATUSPOOL);
                taskMqModel.setDisposeFlag(RequestConstant.REQUEST_ORIGINAL);
                taskMqModel.setRawMessage(msg);
                taskMqModhttp://www.devze.comel.setCreateTime(new Date());
                taskMqService.save(taskMqModel);
            }
            return sendResult;
        }
    

    我这里做了个状态统一入口,同时为了避免推送MQ服务失败,做了个异常捕获保存推送消息,方便重推。

    三、MQ消费

    // ... 其他业务逻辑 ...
            // 获取订单信息
            String orderNo = msgVo.getOrderNo();
            orderModel orderModel = orderService.getOrderByOrderNo(orderNo);
            //如果是客户下单再去待处理订单查询
            if (Objects.isNull(fmOrderModel)) {
                LOGGER.err("未查到信息,初始化失败!");
                return;
            }
    //
    

    然后我的日志全是:

    [2024-08-8 08:36:50.161][ERROR][ConsumeMessageThread_2] 未查到信息,初始化失败!
    [2024-08-8 08:37:57.015][ERROR][ConsumeMessageThread_2] 未查到编程客栈信息,初始化失败!
    [2024-08-8 08:42:45.536][ERROR][ConsumeMessageThread_2] 未查到信息,初始化失败!
    

    我查看数据库是完全能查到的,有点奇怪❗❗❗❗❓❓❓❓

    四、问题与解决

    在Java中,@Transactional 注解通常用于声明方法执行的事务边界,确保方法内的一系列操作要么全部成功,要么全部失败。它通常与Spring框架的事务管理器一起使用,以提供声明式的事务管理。

    如果你的订单数据保存操作使用了 @Transactional,并且在保存数据之后立即推送消息到消息队列(MQ),那么消费者可能查不到数据的原因可能与事务的隔离级别有关。

    下面是我搜到的一些可能导致该问题的原因:

    1. 事务隔离级别:如果你的数据库事务使用了较高的隔离级别(如可重复读或串行化),那么在事务提交之前,其他事务(包括MQ消费者)可能看不到该事务所做的更改。
    2. 事务传播行为:如果MQ消费者也运行在Spring管理的事务中,并且事务的传播行为设置为不支持当前事务(例如,Propagation.NOT_SUPPORTED),那么消费者可能会在不同的事务上下文中运行,从而看不到未提交的更改。
    3. 延迟提交:如果事务在消息发送之后才提交,MQ消费者可能会在事务提交之前读取数据库,因此看不到新数据。
    4. MQ消息延迟:如果消息队列本身存在延迟,消费者可能会在数据还未写入数据库时就接收到消息。
    5. 数据库缓存:某些数据库实现可能会缓存数据,这可能导致即使数据已经写入,消费者也无法立即看到更新。
    6. MQ消费者处理逻辑:如果MQ消费者在处理消息时没有正确地处理事务或数据库查询,也可能导致查不到数据。

    解决方式

    在Spring框架中,我通过实现TransactionSynchronization接口来在事务提交后执行回调操作。

    以下是实现事务提交回调发送消息的方法:

    @Component
    public class TransactionalMessageSender implements TransactionSynchronization {
    
        @Autowired
        private SomeMessageService messageService; // 消息发送服务
    
        @Override
        public void beforeCommit(boolean readOnly) {
            // 这里可以执行一些操作,但事务还未提交
        }
    
      android  @Override
        public void beforeCompletion() {
            // 事务即将提交,可以在这里准备发送消息
        }
    
        @Override
        public void afterCommit() {
            // 事务已经提交,可以在这里发送消息
            sendResult = rocketMQTemplate.syncSend(statusPoolTopic, message, 3000, 2);
        }
    
        @Override
        public void afterCompletion(TransactionStatus status) {
            // 事务已经完成,无论是提交还是回滚
            if (status == TransactionStatus.COMMITTED) {
                // 这里可以执行一些事务提交后的清理工作
            }
        }
    }
    

    五、总结

    在 Spring 框架中使用 @Transactional 注解时,事务管理器会在事务边界内管理数据库操作。

    当你在一个事务中执行数据库操作后立即发送消息到消息队列(MQ),可能会遇到一个问题:MQ 消费者在接收到消息并尝试查询数据库时,发现数据库中并没有预期的数据。

    这个问题的根本原因在于事务的隔离级别和事务提交的时间点。

    当事务尚未提交时,其他事务(包括 MQ 消费者的查询操作)是无法看到该事务内的修改的。

    即使你在事务中已经执行了数据库操作(如插入或更新),这些修改对其他事务来说仍然是不可见的,直到当前事务提交。

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜