数据库事务提交后才发送MQ消息解决方案

2023-11-03 08:33:09 浏览数 (2)

项目场景:

在项目开发中常常会遇到在一个有数据库操作的方法中,发送MQ消息,如果这种情况消息队列效率比较快,就会出现数据库事务还没提交,消息队列已经执行业务,导致不一致问题。举个应用场景,我们提交一个订单,将流水号放在MQ里,MQ监听到后就会查询订单去做其它业务,如果这时候数据库事务还没提交,也就是没生成订单流水,MQ监听到消息就去执行业务,查询订单,肯定会出现业务不一致问题

问题描述

最近遇到一个业务场景,类似于下单过程,场景是用户注册消息,注册成功后,会发送MQ消息,MQ监听到消息后,会查询用户的信息,如何再做其它业务,但是遇到一个问题,就是mq消费消息的速度是快于数据库事务提交的,就是我们用户注册的信息还没写入数据库,mq已经提前消费了,所以会导致查询不到用户注册的信息

大致的代码:

代码语言:javascript复制
@Transactional(rollbackFor = Exception.class)
public void register(){
     User user = User.builder()
                .name("管理员")
                .email("123456@qq.com")
                .build();
        userMapper.insert(user);
    
    // 发送消息给MQ
    sendMQMessage();
}

原因分析

MQ消息消费快于事务提交

解决方案

对于这种情况,下面给出两种处理方法,一种是借助于Spring框架提供的TransactionSynchronizationManager来控制,另外一种方法是借助于Spring框架提供的@TransactionalEventListener来控制事务

  • TransactionSynchronizationManager控制事务
代码语言:javascript复制
@Transactional(rollbackFor = Exception.class)
public void register() {
    
    User user = User.builder()
                .name("管理员")
                .email("123456@qq.com")
                .build();
    userMapper.insert(user);
    
    
    // after transaction commit
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            // 发送消息给MQ
    		sendMQMessage();
        }
    });

}

测试一下,通过日志可以看出事务已经提交了,如何发送mq,mq监听到消息,就会去读取用户信息,是可以获取到的

  • @TransactionalEventListener控制事务

如果借助Spring框架提供的事件监听机制来实现,就需要用到@TransactionalEventListener监听器,下面给出例子

创建一个Event,主要来做参数传送

代码语言:javascript复制
package com.example.eventlistener.event;

import org.springframework.context.ApplicationEvent;


public class SendMsgEvent extends ApplicationEvent {

    private Long userId;

    private String userName;

    public SendMsgEvent(Object source){
        super(source);
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }
}

创建一个监听器,注意要加上@Component,组件类才能被Spring容器管理

代码语言:javascript复制
package com.example.eventlistener.listener;


import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONUtil;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

import javax.annotation.Resource;

@Component
@Slf4j
public class SendMsgListener {

    @Resource
    private UserMapper userMapper;

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT , classes = SendMsgEvent.class)
    public void sendMsg(SendMsgEvent sendMsgEvent) {
        log.info("sendMsg: {}" , JSONUtil.toJsonStr(sendMsgEvent));

         // 发送消息给MQ
    	sendMQMessage();
    }
}

业务类实现业务:

代码语言:javascript复制
package com.example.eventlistener.service.impl;

import cn.hutool.http.HttpRequest;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.event.UserRegisterEvent;
import com.example.eventlistener.mapper.UserMapper;
import com.example.eventlistener.model.User;
import com.example.eventlistener.service.IUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.annotation.Resource;

@Service
@Slf4j
public class UserServiceImpl implements ApplicationEventPublisherAware , IUserService {

    private ApplicationEventPublisher applicationEventPublisher;

    @Resource
    private UserMapper userMapper;


    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void sendMsgAfterRegisterByEvent() {
        User user = doRegister();

        // after transaction commit
        SendMsgEvent sendMsgEvent = new SendMsgEvent(this);
        sendMsgEvent.setUserId(user.getId());
        sendMsgEvent.setUserName(user.getName());
        applicationEventPublisher.publishEvent(sendMsgEvent);

    }
    
    private User doRegister() {
        User user = User.builder()
                .name("管理员")
                .email("123456@qq.com")
                .build();
        userMapper.insert(user);
        log.info("save user info");
        return user;
    }


}

经过测试,也可以实现同样的效果,控制数据库的事务提交后,才执行发送MQ消息

补充: 如果执行出现java.lang.IllegalStateException: Transaction synchronization is not active,说明没加事务控制,加上@Transactional(rollbackFor = Exception.class)即可

0 人点赞