SpringBoot整合RabbitMQ实现可靠事件

2021-11-23 12:40:59 浏览数 (1)

分布式事务

我们知道在单数据库系统中,实现数据的一致性,通过数据库的事务来处理比较简单。在微服务或分布式系统中,各个独立的服务都会有自己的数据库,而不是在同一个数据库中,所以当一组事务(如商品交易中,商品的库存、用户的账户资金和交易记录等)的处理是分布在不同数据库中的,分布式事务就是为了解决在多个数据库节点中保证这些数据的一致性。

分布式事务里有个BASE理论,在分布式数据库中,存在强一致性和弱一致性。

强一致性的好处是,对于开发者来说比较友好,数据始终可以读取到最新值,但这种方式需要复杂的协议,并且需要牺牲很多的性能。

弱一致性,对于开发者来说相对没有那么友好,无法保证读取的值是最新的,但是不需要引入复杂的协议,也不需要牺牲很多的性能。

弱一致性是当今企业采用的主流方案,它并不能保证所有数据的实时一致性,所以有时候实时读取数据是不可信的。它只是在正常的流程中,加入了提供修复数据的可能性,从而减少数据不一致的可能性,大大降低数据不一致的可能性。

什么时候使用分布式事务

对于像电商中用户隐私信息、商品信息、交易记录以及资金等数据,这些具备价值的核心数据,关系到用户隐私和财产的内容,应该考虑使用分布式事务来保证一致性。

但对于用户评价、自身装饰和其他一些非重要的个性化信息,可以采用非事务的处理。因为一个正常的系统出现不一致的情况是小概率事件,而非大概率事件,对于一些小概率的数据丢失,一般来说是允许的。之所以这样选择,主要基于两点,一个是开发者的开发难度;另一个是用户的体验,过多的分布式事务会造成性能的不断丢失

弱一致性分布式事务解决方案有如下几种:

  • 状态表
  • RabbitMQ可靠事件
  • 最大尝试
  • TCC模式

幂等性

在分布式事务中,各个访问操作的接口,都需要保证幂等性。

所谓幂等性,是指在HTTP协议中,一次和多次请求某一个资源,对于资源本身应该具有同样的结果,也就是其执行任意多次时,对资源本身所产生的影响,与执行一次时的相同。

实现方式有以下几种:

  • 唯一索引 -- 防止新增脏数据
  • token机制 -- 防止页面重复提交
  • 悲观锁 -- 获取数据的时候加锁(锁表或锁行)
  • 乐观锁 -- 基于版本号version实现, 在更新数据那一刻校验数据
  • 分布式锁 -- redis(jedis、redisson)或zookeeper实现
  • 状态机 -- 状态变更, 更新数据时判断状态undefined※说明:如何实现接口的幂等性,可以分篇在接口的幂等性文章里解说。

状态表实现分布式事务

这里拿电商的商品交易为例,讲述下思路:

  1. 需要商品数据库:商品表、商品交易明细表;资金数据库:用户账户表、账户交易明细表
  2. 主要流程包括:undefined商品表减商品库存、undefined商品交易明细表中添加新的交易记录、undefined用户账户表中扣减用户账户表的资金、undefined资金交易明细表中记录账户交易明细表
  3. 需要准备一个状态表,用redis的Hset数据类型比较合适
  4. 这里假设相关的明细记录表中,有4个状态:undefined1--准备交易,undefined2--交易成功,undefined3--被冲正,undefined4--冲正记录

交易流程

流程说明

在商品服务中,商品减库存后,记录商品交易明细,如果没有异常,就将商品交易记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。

商品服务通过RESTFUL调用资金服务,如果成功,就将账户交易明细表的记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。

最后,读取Redis相关的所有状态位,确定是否所有的操作都为“1—准备提交”状态,如果是,则更新产品服务的记录状态为“2—提交成功”,然后发起资金服务调用,将对应的记录(可通过业务流水号关联)的状态也更新为“2—提交成功”,这样就完成了整个交易。

如果不全部为“1—准备提交”状态,则发起各库的冲正交易,冲掉原有的记录,并且归还商品库存和账户金额。发起冲正交易,把原明细记录状态更新为3--

被冲正,并往明细表中添加对应的新记录,状态为4--冲正记录

RabbitMQ可靠事件

使用RabbitMQ等消息队列中间件的可靠事件,来实现分布式事务,这里结合SpringBoot

前面有介绍过SpringBoot整合多数据库的文章,这里可以用到,具体参考《[Spring

Boot学习:MyBatis配置Druid多数据源](https://www.jianshu.com/p/00a2ee8747fd)》,切换数据源使用@DataSource注解,如下

代码语言:txt复制
@DataSource(value = DataSourceType.MASTER) //切换到商品数据库
代码语言:txt复制
@DataSource(value = DataSourceType.SLAVE) //切换到账户数据库

在此基础上我们加入RabbitMQ实现分布式事务功能

  1. 在pom.xml文件中加入依赖
代码语言:txt复制
        <dependency>
代码语言:txt复制
            <groupId>org.springframework.boot</groupId>
代码语言:txt复制
            <artifactId>spring-boot-starter-amqp</artifactId>
代码语言:txt复制
        </dependency>
  1. yml配置文件中,关于RabbitMQ的配置如下:
代码语言:txt复制
# Spring 配置
代码语言:txt复制
spring:
代码语言:txt复制
  rabbitmq:
代码语言:txt复制
    host: localhost
代码语言:txt复制
    port: 5672
代码语言:txt复制
    username: admin
代码语言:txt复制
    password: 123456
代码语言:txt复制
    #使用发布者确认模式,发布消息者会得到一个“消息是否被服务提供者接收”的确认消息
代码语言:txt复制
    publisher-confirms: true
代码语言:txt复制
#RabbitMQ 队列名称配置
代码语言:txt复制
rabbitmq:
代码语言:txt复制
  queue:
代码语言:txt复制
    fund: fund

3.创建RabbitMQ配置文件RabbitConfig.java

代码语言:txt复制
package com.zhlab.demo.config;
代码语言:txt复制
import org.springframework.amqp.core.Queue;
代码语言:txt复制
import org.springframework.beans.factory.annotation.Value;
代码语言:txt复制
import org.springframework.context.annotation.Bean;
代码语言:txt复制
import org.springframework.context.annotation.Configuration;
代码语言:txt复制
/**
代码语言:txt复制
 * @ClassName RabbitConfig
 * @Description //RabbitMQ消息队列配置
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 上午 11:10
 **/
@Configuration
public class RabbitConfig {
代码语言:txt复制
    // 读取配置属性
代码语言:txt复制
    @Value("${rabbitmq.queue.fund}")
代码语言:txt复制
    private String fundQueueName = null;
代码语言:txt复制
    // 创建RabbitMQ消息队列
代码语言:txt复制
    @Bean(name="fundQueue")
代码语言:txt复制
    public Queue createFundQueue() {
代码语言:txt复制
        return new Queue(fundQueueName);
代码语言:txt复制
    }
代码语言:txt复制
}
  1. 创建数据传输对象FundParams.java
代码语言:txt复制
package com.zhlab.demo.model;
代码语言:txt复制
import java.io.Serializable;
代码语言:txt复制
/**
代码语言:txt复制
 * @ClassName FundParams
 * @Description //FundParams
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 上午 11:30
 **/
public class FundParams implements Serializable {
    // 序列化版本号
    public static final long serialVersionUID = 989878441231256478L;
    private Long xid; // 业务流水号
    private Long userId; // 用户编号
    private Double amount; // 交易金额
代码语言:txt复制
    public FundParams() {
代码语言:txt复制
    }
代码语言:txt复制
    public FundParams(Long xid, Long userId, Double amount) {
代码语言:txt复制
        this.xid = xid;
代码语言:txt复制
        this.userId = userId;
代码语言:txt复制
        this.amount = amount;
代码语言:txt复制
    }
代码语言:txt复制
    public Long getXid() {
代码语言:txt复制
        return xid;
代码语言:txt复制
    }
代码语言:txt复制
    public void setXid(Long xid) {
代码语言:txt复制
        this.xid = xid;
代码语言:txt复制
    }
代码语言:txt复制
    public Long getUserId() {
代码语言:txt复制
        return userId;
代码语言:txt复制
    }
代码语言:txt复制
    public void setUserId(Long userId) {
代码语言:txt复制
        this.userId = userId;
代码语言:txt复制
    }
代码语言:txt复制
    public Double getAmount() {
代码语言:txt复制
        return amount;
代码语言:txt复制
    }
代码语言:txt复制
    public void setAmount(Double amount) {
代码语言:txt复制
        this.amount = amount;
代码语言:txt复制
    }
代码语言:txt复制
}
  1. 创建商品服务 业务逻辑PurchaseService.java
代码语言:txt复制
package com.zhlab.demo.service.goods;
代码语言:txt复制
import com.zhlab.demo.db.DataSourceType;
代码语言:txt复制
import com.zhlab.demo.db.annotation.DataSource;
代码语言:txt复制
import com.zhlab.demo.model.FundParams;
代码语言:txt复制
import com.zhlab.demo.utils.SnowFlakeUtil;
代码语言:txt复制
import org.springframework.amqp.rabbit.connection.CorrelationData;
代码语言:txt复制
import org.springframework.amqp.rabbit.core.RabbitTemplate;
代码语言:txt复制
import org.springframework.beans.factory.annotation.Autowired;
代码语言:txt复制
import org.springframework.beans.factory.annotation.Value;
代码语言:txt复制
import org.springframework.stereotype.Service;
代码语言:txt复制
/**
代码语言:txt复制
 * @ClassName PurchaseService
 * @Description //商品 业务逻辑
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 上午 11:24
 **/
@Service
public class PurchaseService implements RabbitTemplate.ConfirmCallback {
    //实现RabbitTemplate.ConfirmCallback接口
    //需要实现它定义的confirm方法,这样它便可以作为一个发布者检测消息是否被消费者所接收的确认类
代码语言:txt复制
    // SnowFlake算法生成ID
代码语言:txt复制
    SnowFlakeUtil worker = new SnowFlakeUtil(003);
代码语言:txt复制
    // RabbitMQ模板
代码语言:txt复制
    @Autowired
代码语言:txt复制
    private RabbitTemplate rabbitTemplate;
代码语言:txt复制
    // 读取配置属性
代码语言:txt复制
    @Value("${rabbitmq.queue.fund}")
代码语言:txt复制
    private String fundQueueName;
代码语言:txt复制
    // 购买业务方法
代码语言:txt复制
    @DataSource(value = DataSourceType.MASTER) //切换到商品数据库
代码语言:txt复制
    public Long purchase(Long productId, Long userId, Double amount) {
代码语言:txt复制
        rabbitTemplate.setConfirmCallback(this);//设置了回调类为当前类
代码语言:txt复制
        // SnowFlake算法生成序列号,用户跨服务的关联,这里用本地自定义方法,可以借助Leaf TinyID等分布式ID生成服务中间件
代码语言:txt复制
        Long xid = worker.nextId();
代码语言:txt复制
        // 传递给消费者的参数
代码语言:txt复制
        FundParams params = new FundParams(xid, userId, amount);
代码语言:txt复制
        // 发送消息给资金服务做扣款
代码语言:txt复制
        this.rabbitTemplate.convertAndSend(fundQueueName, params); // ④
代码语言:txt复制
        System.out.println("执行产品服务逻辑");
代码语言:txt复制
        return xid;
代码语言:txt复制
    }
代码语言:txt复制
    /**
代码语言:txt复制
     * 确认回调,会异步执行
     * @param correlationData --相关数据
     * @param ack -- 是否被消费
     * @param cause -- 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        /*
         * ack代表是否成功。
         * 如果投递消息失败,就会先停滞1秒,然后尝试进行冲正交易,冲掉原有交易,这样就可以使得数据平整
         */
        if (ack){ // 消息投递成功
            System.out.println("执行交易成功");
        } else { // 消息投递失败
            try {
                // 停滞1秒(稍微等待可能没有完成的正常流程),然后发起冲正交易
                Thread.sleep(1000);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            System.out.println("尝试产品减库存冲正交易。");
            System.out.println("尝试账户扣减冲正交易。");
            //在confirm方法中,如果参数ack为false,则说明消息传递失败,就要尝试执行冲正交易,把数据还原回来
            System.out.println(cause); // 打印消息投递失败的原因
        }
    }
}
  1. 创建账户服务业务逻辑AccountService.java
代码语言:txt复制
package com.zhlab.demo.service.fund;
代码语言:txt复制
import com.zhlab.demo.db.DataSourceType;
代码语言:txt复制
import com.zhlab.demo.db.annotation.DataSource;
代码语言:txt复制
import com.zhlab.demo.model.FundParams;
代码语言:txt复制
import org.springframework.amqp.rabbit.annotation.RabbitListener;
代码语言:txt复制
import org.springframework.stereotype.Service;
代码语言:txt复制
/**
代码语言:txt复制
 * @ClassName AccountService
 * @Description //账户 业务逻辑
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 上午 11:25
 **/
@Service
public class AccountService {
代码语言:txt复制
    /* 消息监听,取YAML文件配置的队列名
代码语言:txt复制
     *因为消息被消费,所以触发PurchaseService类的confirm方法
代码语言:txt复制
     *spring.rabbitmq.listener.simple.acknowledge-mode = manual
代码语言:txt复制
     *如果配置为手动,这里就需要手动确认消息,默认为自动的
代码语言:txt复制
     *自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。
代码语言:txt复制
     *这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失
代码语言:txt复制
     *手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),
代码语言:txt复制
     *RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。
代码语言:txt复制
     *这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况
代码语言:txt复制
     *主要方法:basicAck、basicNack、basicReject根据具体业务情况使用,配合redis做幂等检验
代码语言:txt复制
    */
代码语言:txt复制
    @RabbitListener(queues = "${rabbitmq.queue.fund}")
代码语言:txt复制
    @DataSource(value = DataSourceType.SLAVE) //切换到账户数据库
代码语言:txt复制
    public void dealAccount(FundParams params) {
代码语言:txt复制
        //TODO具体业务逻辑需自己实现
代码语言:txt复制
        System.out.println("扣减账户金额逻辑......");
代码语言:txt复制
    }
代码语言:txt复制
}

7.写个测试接口来测试一下,创建MqController.java

代码语言:txt复制
package com.zhlab.demo.controller;
代码语言:txt复制
import com.zhlab.demo.service.goods.PurchaseService;
代码语言:txt复制
import org.springframework.beans.factory.annotation.Autowired;
代码语言:txt复制
import org.springframework.web.bind.annotation.GetMapping;
代码语言:txt复制
import org.springframework.web.bind.annotation.RequestMapping;
代码语言:txt复制
import org.springframework.web.bind.annotation.RestController;
代码语言:txt复制
/**
代码语言:txt复制
 * @ClassName MqController
 * @Description //RabbitMQ可靠消息 接口测试
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 下午 2:25
 **/
@RestController
@RequestMapping("/mq")
public class MqController {
    @Autowired
    private PurchaseService purchaseService;
代码语言:txt复制
    @GetMapping("/test")
代码语言:txt复制
    public String testMq() {
代码语言:txt复制
        return purchaseService.purchase(1L, 1L, 200.0)   "";
代码语言:txt复制
    }
代码语言:txt复制
}

以上就是基于RabbitMQ可靠消息 实现的分布式事务处理,逻辑和说明都在注释里了。

※说明:这样的确认方式,只是保证了事件的有效传递,但是不能保证消费类能够没有异常或者错误发生,当消费类有异常或错误发生时,数据依旧会存在不一致的情况。这样的方式,只是保证了消息传递的有效性,降低了不一致的可能性,从而大大降低了后续需要运维和业务人员处理的不一致数据的数量

TCC补偿事务

TCC代表的是

  • try(尝试)
  • confirm(确认)
  • cancel(取消)

在TCC事务中,要求任何一个服务逻辑都有3个接口,它们对应的就是尝试(try)方法、确认(confirm)方法和取消(cancel)方法。

TCC事务模型

TCC事务的一致性可达99.99%,是一种较为成熟的方案,因此在目前有着较为广泛的应用。

继续通过上面的商品交易流程来解析这个模型:

  1. 一阶段undefined商品表减库存,商品交易明细表记录商品交易明细,并且将对应记录状态设置为“1—准备提交”。undefined调用账户服务,用户账户表扣减账户资金,账户交易明细表记录交易明细,并且将对应记录状态设置为“1—准备提交”undefined在一阶段的调用中,如果没有发生异常,就可以执行正常二阶段进行提交了
  2. 正常二阶段undefined商品服务 更新对应记录的状态为“2—提交成功”,使得数据生效undefined调用账户服务,使得对应的记录状态也为“2—提交成功”,这样正常的提交就完成了undefined如果在一阶段发生异常,需要取消操作,可以执行异常二阶段
  3. 异常二阶段undefined商品服务执行冲正交易,冲掉原有的产品交易,将库存归还给商品表undefined调用账户服务,发起冲正交易,冲掉原有的资金交易,将资金归还到账户里

注意,这些提交和退出机制在TCC中,都需要开发者对接口作幂等性处理

TCC事务机制,也并不能保证所有的数据都是完全一致的,它只是提供了一个可以修复的机制,来降低不一致的情况,从而大大降低后续维护数据的代价。TCC事务也会带来两个较大的麻烦:第一个是,原本的一个方法实现,现在需要拆分为3个方法,代价较大;第二个是,需要开发者自已实现提交和取消方法的幂等性

总结

使用分布式事务,并不是很容易的事情,甚至有些方法还相当复杂

在互联网中,并不是所有的数据都需要使用分布式事务,所以首先要考虑的是:在什么时候使用分布式事务。即使需要使用分布式事务,有时候也并非需要实时实现数据的一致性,因为可以在后续通过一定的手段来完成。例如电商网站,对买家来说,需要的是快速响应,但对商家来说,就未必需要得到实时数据了,过段时间得到数据也是可以的,而这段时间就可以考虑进行数据补偿了。无论我们如何使用分布式事务,也无法使数据完全达到百分之百的一致性,

因此一般金融和电商企业会通过对账等形式来完成最终一致性的操作。

在分布式事务的选择中,都会采用弱一致性代替强一致性,相对来说,弱一致性更加灵活,更方便我们开发。从网站的角度来说,弱一致性可以获得更佳的性能,提升用户的体验,这是互联网应用需要首先考虑的要素。

拓展---电商中的高并发和分布式事务

电商网站中高并发是常见的,高并发是针对用户而言的,比如抢购中,用户只希望短时间内快速抢到商品,而商家对于交易信息可以延迟处理得到。

这就是意味着,对于用户交易部分,要尽可能通过分布式事务进行保证,但而对于商户数据部分,实时性要求相对不是那么高,可以过段时间通过后续手段来补偿修复,从而缩小分布式事务的范围。

确定需要分布式事务的范围

这里可以看出使用分布式事务的主要是请求数据,保证这个过程可以提高数据可靠性。对于商户数据,不需要使用分布式事务,这样可以提升性能,使抢购进行得更快,满足买家的需求,但是这也会引发数据的丢失。

为了解决这个问题,后续可以通过和请求数据进行对比来修复数据,使数据达到一致

,这个过程可以在高并发过后(一般高并发都是时间段性的,如性价比高的产品发布点、购物节开始时间段)进行,这样商户最终也可以得到可靠的数据,只是不是实时的,但是这并不影响商户和用户的业务。

0 人点赞