史上最细最强大的RocketMQ实现分布式事务解决方案教程|Java 开发实战(上)

2024-01-18 22:48:02 浏览数 (1)

该篇内容作为使用消息队列中间件RocketMQ实现分布式事务的上篇,叙述分布式事务的相关原理以及如何安装部署自己的RocketMQ

前言

最近,出现了一个流行词"躺平"。

不管是大佬,还是网红,都在疯狂地讨论这个词背后的那些零零碎碎。

那么,我们在文章的开头,也说一下这个词,在我这的思考。

先说下这个词的概念,维基百科给的解释:年轻人出于对国内压抑的工作文化的失望,与其跟随社会期望坚持奋斗,不如选择“躺平”的处事态度。

不管什么时候,不管什么样的工作环境,都存在这竞争。内卷化如此严重的今天,我们该抱着怎么样的态度是生活呢?

我想躺平是一个选择,但是不免是充满了颓靡。

我想的是,你实现了你的最初的梦想了么?或许是给家人幸福,或许是买自己想买的东西,或许是见识人生中的风景。 在我看来,只要你没有实现,那么,我们就需要进一步去努力,不为别的,只为最初的梦想。人活一世,不过百年,如果不留下点什么,是不是比较遗憾。我们还是需要争取自己的未来。

或许,很多人觉得,如此内卷,怎么有未来。是的,每个人都会有这样的思考,但是,你能改变现实么?不能!你的躺平,只会在内卷的现实中,让你变成了别人的尾巴。从而,躺平,对与你来说,也做不到。

但是,又是否不要命的刺激内卷?不,我想,人要知足,但是要有小目标。这才是活的精彩的前提。

好了,回到今天的技术文章!

我们还是要进步,还是要学习知识,知识不管什么时候,都是有用的。

分布式系统架构中,少不了遇到分布式事务的实现。或许这样那样的实现,今天我们,来说一下使用消息队列中间件RocketMQ,实现分布式事务。

分布式事务定义

分布式事务就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

分布式事务理论支撑

CAP

CAP定理,又被叫作布鲁尔定理。对于设计分布式系统来说(不仅仅是分布式事务)的架构师来说,CAP就是你的入门理论。以下摘自 维基百科,辅助你理解C A P。

①一致性:对于客户端的每次读操作,要么读到的是最新的数据,要么读取失败。换句话说,一致性是站在分布式系统的角度,对访问本系统的客户端的一种承诺:要么我给您返回一个错误,要么我给你返回绝对一致的最新数据,不难看出,其强调的是数据正确。

②可用性:任何客户端的请求都能得到响应数据,不会出现响应错误。换句话说,可用性是站在分布式系统的角度,对访问本系统的客户的另一种承诺:我一定会给您返回数据,不会给你返回错误,但不保证数据最新,强调的是不出错。

③分区容忍性:由于分布式系统通过网络进行通信,网络是不可靠的。当任意数量的消息丢失或延迟到达时,系统仍会继续提供服务,不会挂掉。换句话说,分区容忍性是站在分布式系统的角度,对访问本系统的客户端的再一种承诺:我会一直运行,不管我的内部出现何种数据同步问题,强调的是不挂掉。

BASE

BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。是对CAP中AP的一个扩展

基本可用:分布式系统在出现故障时,允许损失部分可用功能,保证核心功能可用。 软状态:允许系统中存在中间状态,这个状态不影响系统可用性,这里指的是CAP中的不一致。 最终一致:最终一致是指经过一段时间后,所有节点数据都将会达到一致。

BASE解决了CAP中理论没有网络延迟,在BASE中用软状态和最终一致,保证了延迟后的一致性。BASE和 ACID 是相反的,它完全不同于ACID的强一致性模型,而是通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终达到一致状态。

分布式事务解决方案

目前,分布式事务有很多的解决方案。相应的专门开源中间件也有,例如Seata。

今天的主人公RocketMQ事务和Seata解决的都是分布式事务问题,区别在于Seata是CAP理论,而RocketMQ方案是BASE理论 也就是最终一致性

那么,我们就完整的走一遍RocketMQ的实现分布式事务方案。

RocketMQ事务流程

Producer 即MQ发送方

1、Producer 发送事务消息 Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。

Producer 发送 ”业务封装的消息“ 到MQ Server。

2、MQ Server回应消息发送成功 MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

3、Producer 执行本地事务 Producer 端执行业务代码逻辑,通过本地数据库事务控制。

Producer 执行添加用户操作。

4、消息投递 若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”业务封装的消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;

若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”业务封装的消息“ 这条消息,下游自热就无法消费了。

MQ订阅方(下游服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。

5、事务回查 如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

RocketMQ安装部署

要是使用,就需要先安装对应的消息队列服务。本文主要是针对没有容器化环境的,实现快速安装部署。首先需要准备好服务器环境,然后开始后续的安装部署。

下载安装

• 下载安装 RocketMQ 5.1.4 ,截止当前为最新版

部署

上传安装至/usr/local/src 目录

执行解压、安装目录指定

代码语言:sh复制
cd /usr/local/src
unzip rocketmq-all-5.1.4-bin-release.zip
mv rocketmq-all-5.1.4-bin-release ../rocketmq-5.1.4

启动NameServer

代码语言:sh复制
cd ../rocketmq-5.1.4
nohup sh bin/mqnamesrv &

查看启动

代码语言:sh复制
tail -f ~/logs/rocketmqlogs/namesrv.log

修改Broker运行配置

代码语言:sh复制
vim bin/runbroker.sh
#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 开发环境修改参数配置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

启动Broker

代码语言:sh复制
nohup sh bin/mqbroker -n localhost:9876 &

查看启动

代码语言:sh复制
tail -f ~/logs/rocketmqlogs/broker.log 

防火墙开启端口

代码语言:sh复制
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --reload

如果有错误,需要手动创建映射文件目录

代码语言:sh复制
cd  /root/store
mkdir commitlog consumequeue

测试消息

消息发送

代码语言:sh复制
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

消息接收

代码语言:sh复制
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

复制代码

退出运行

关闭NameServer

代码语言:sh复制
sh bin/mqshutdown namesrv

关闭Broker

代码语言:sh复制
sh bin/mqshutdown broker

RocketMQ控制台安装

安装完RocketMQ之后,图形化界面能够帮助我们更好的管理消息队列。

下载地址

下载源码,完成编译打包

代码语言:sh复制
mvn clean package -Dmaven.test.skip=true

上传至目录/usr/local/src

创建对应的目录放置RocketMQ 控制台程序rocketmq-dashboard-1.0.1-SNAPSHOT.jar和其配置文件application.yml

代码语言:sh复制
mkdir /app/service/rocketmq-dashboard/
mkdir /app/service/rocketmq-dashboard/logs

编写启动脚本内容

代码语言:sh复制
nohup java -jar -Dspring.config.location=/app/service/rocketmq-dashboard/application.yml /app/service/rocketmq-dashboard/rocketmq-dashboard-1.0.1-SNAPSHOT.jar >/app/service/rocketmq-dashboard/logs/rocketmq-dashboard.log 2>&1 &

开放防火墙端口(根据服务器实际情况,开放相关端口

代码语言:sh复制
firewall-cmd --zone=public --add-port=8080/tcp --permanent
firewall-cmd --reload

测试访问

示例业务场景

本文,模拟提供两个服务应用,作为消息队列的服务提供者、服务消费者,演示功能使用

bank1 银行扣款服务

bank2 银行加款服务

场景:转账 A用户给B用户转账 A-100 B 100

bank1

  1. 提供对外API
  2. 发起扣款请求
  3. 发送消息给MQ
  4. MQ收到消息返回确认
  5. bank1执行本地扣款业务事务并提交

MQ

mq收到确认bank1提交后解锁消息允许消费

bank2

  1. 监听MQ
  2. 消费消息
  3. 执行本地加款业务

总结

本文作为实战上篇,从理论到安装部署、调试,完整的讲述了最新版本RocketMQ的使用,明天将完成本文下篇,敬请期待!!

我正在参与2024腾讯技术创作特训营第五期有奖征文,快来和我瓜分大奖!

0 人点赞