基于Spring的RabbitTemplate实现消息事务

基于Spring的RabbitTemplate实现消息事务

分布式系统中常见一种情况,就是数据库操作成功之后发送MQ消息。

分布式消息常见问题

数据库操作之后发送MQ消息通常会遇到一些问题,理论上消息和事务要同时成功才算一个完整的事务,那到底该把发送MQ放到数据库事务之外还是数据库事务之内?

下面分析下可能存在的问题:

  1. 消息放到数据库事务之内
    1. 事务处理异常,回滚事务——ok
    2. 消息发送异常,回滚事务——ok
    3. 消息发送成功提交事务——ok
    4. 消息发送成功提交失败——不ok,不好处理,一般MQ也不能撤销消息,而且消费端可能已经在处理了
  2. 消息放到数据库事务之外
    1. 事务处理异常,回滚事务——ok,不用发消息
    2. 事务处理成功,发送消息成功——ok
    3. 事务处理成功,消息发送失败——不ok,消息丢失
  3. 在2的基础上增加本地消息表,放到同一个数据库,业务操作完成之后把需要发送的MQ消息插入本地消息表中
    1. 事务处理异常,回滚事务——ok,不用发消息,消息表也回滚
    2. 事务处理成功,发送消息成功
      1. 更新消息表状态成功——ok
      2. 更新消息表状态失败——ok(定时任务补偿)
    3. 事务处理成功,消息发送失败——ok(定时任务补偿)
    4. 通过定时扫描失败消息重新发送MQ
      1. 重发消息需保证幂等性——ok

分布式消息事务处理

常见的处理逻辑是本地消息表+消息重试补偿

基于Spring的RabbitTemplate实现消息事务

RabbitTemplate配置和使用

我们使用RabbitMQ作为消息队列,因此我们可以使用spring-rabbit帮助实现mq发送(前提是已经安装了RabbitMQ了)。

      <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring-amqp.version}</version>
      </dependency>

application.yml配置:

spring:
  rabbitmq:
    virtual-host: /
    username: appws
    password: xxxxxx
    addresses: 10.181.57.239:5672
    publisherConfirms: true

然后就可以注入RabbitTemplate了,代码片段如下:

// 消息队列配置
public static final String TEST_EXCHANGE = "test.exchange";
public static final String TEST_QUEUE = "test.queue";
public static final String TEST_ROUTEKEY = "test.routekey";
@Bean
public Exchange testExchange() {
    return new TopicExchange(TEST_EXCHANGE);
}
@Bean
public Queue testQueue() {
    return new Queue(TEST_QUEUE, true);
}
@Bean
public Binding testBinding(@Qualifier("testQueue") Queue queue, @Qualifier("testExchange") Exchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(TEST_ROUTEKEY).noargs();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void test(){
    // 一些DB操作
    rabbitTemplate.convertAndSend(TEST_EXCHANGE, TEST_ROUTEKEY, param);
    // 其他DB操作等
}

拦截器处理RabbitTemplate事务

​ 从上面的代码片段可以看出,业务方法使用了@Transactional注解使用了事务之后,rabbitTemplate.convertAndSend方法并没有专门放到事务之外,这个时候一旦有异常,可能造成消息发送成功,但是事务异常回滚的问题。要解决这个问题,需要把rabbitTemplate.convertAndSend移动到事务之外,但是通常都配置的声明式事务,不能简单的把代码移动到外面,这个需要利用Spring事务的一个特性TransactionSynchronization,注册一个同步钩子,自动把相关代码放到事务完成之后执行,我们使用拦截器拦截rabbitTemplate.convertAndSend方法,实现不用修改现有代码自动把发送MQ消息逻辑移到事务之外:

RabbitTemplateTransactionInterceptor.java代码详情:

@Aspect
@Order(50)
@Component
public class RabbitTemplateTransactionInterceptor {
    /**
     * 日志
     */
    private static final Logger logger = LoggerFactory.getLogger(RabbitTemplateTransactionInterceptor.class);
    /**
     * 代理convertAndSend方法够用
     */
    @Pointcut("execution(* org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(String, String, Object))")
    public void convertAndSend() {
        // noop
    }
    @Around("convertAndSend()")
    public void aroundMethod(ProceedingJoinPoint joinPoint) throws Throwable {
        Object[] args = joinPoint.getArgs();
        if (TransactionSynchronizationManager.isSynchronizationActive()
                && TransactionSynchronizationManager.isActualTransactionActive() // 事务开启判断
                && args.length == 3) {
            logger.info("拦截RabbitTemplate发送:{}", args);
            // 注册同步器
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() { // 事务提交之后执行
                    try {
                        joinPoint.proceed();
                    } catch (Throwable throwable) {
                        throw new RuntimeException(throwable);
                    }
                }
            });
        } else { // 没有开启事务或者参数不正确就直接执行,不处理
            joinPoint.proceed();
        }
    }
}

注:定时扫描和消息重试在另外的逻辑中。

给TA打赏
共{{data.count}}人
人已打赏
运维

Picgo图床从Github转到Gitee

2024-11-19 10:36:29

运维

从Java8升级到Java11

2024-11-19 10:36:37

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索