Spring Cloud Alibaba
- Seata 分布式事务框架
- Seata 分布式事务原理
- Seata Server 安装
- Seata 案例代码
- 测试框架搭建
- 测试分布式事务
- Dubbo Spring Cloud
- 是什么
- 功能完成度
- 框架的搭建
- 测试
- RocketMQ
- 下载 RocketMQ
- SpringCloud Stream
- 介绍
- 测试Demo环境搭建
- 添加事务监听
- Consumer项目的完善
承接上面的Nacos,Sentinel的学习, 现在开始学习Seata, Dubbo和RocketMQ
Seata 分布式事务框架
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 Seata 官网: http://seata.io/zh-cn/index.html
Seata 分布式事务原理
整体机制 (两阶段提交协议的演变)
- 一阶段: 业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
- 二阶段: 提交异步化,非常快速地完成。 回滚通过一阶段的回滚日志进行反向补偿。
官方文档: http://seata.io/zh-cn/docs/overview/what-is-seata.html
Seata Server 安装
代码语言:javascript复制下载地址:http://seata.io/zh-cn/blog/download.html
启动 Seata Server 进入 bin 目录中, 在 window 下启动 seata-server.bat, 在 linux 下启动 seata-server.bat
Seata 案例代码
测试框架搭建
项目架构
当前为止代码地址:https://gitee.com/TimePause/spring-cloud-alibaba-examples.git
四个微服务项目负责的业务
- business-service 下单服务,
- 调用 storage-service 删减库存,
- 调用 order-service 创建订单,
- 调用 account-service 扣除账户余额
项目搭建流程
- 如上图所示, 在根项目spring-cloud-alibaba-examples下创建父项目seata-examples, 修pom文件添加依赖
<dependencies>
<!--服务注册-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-nacos-discovery</artifactId>
</dependency>
<!--Seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!--web 项目的基础依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
- 创建子模块项目account-service,修改pom文件并添加依赖
- 创建子模块项目order-service,修改pom文件并添加依赖
- 创建子模块项目business-service,修改pom文件并添加依赖
- 创建子模块项目storage-service(对库存的扣减), 修改pom文件并添加依赖 以该微服务创建流程为例, 演示项目搭建步骤 <dependencies> <!--服务注册--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId> </dependency> <!--Seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> <!--web 项目的基础依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
- 导入数据库脚本到mysql
- 代码生成 a.在mysql中右侧的database中添加数据库连接, 输入数据库相关属性, 然后根据提示下载驱动
b.连接成功后便可在idea上面查看数据库的相关信息
c.安装 MyBatis插件(自动生成实体类以及mapper接口和xml映射文件)
d. 右击选择的表进行生成代码操作
e. 填写代码生成所在位置的信息
f.编写库存操作的接口和实现类
代码语言:javascript复制public interface StorageService {
/**
* 完成对商品库存扣减操作
* @param productNo
* @param count
*/
void deduct(String productNo, int count) ;
}
@Service
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageTblDao storageTblDao;
private static Logger logger = LoggerFactory.getLogger(StorageServiceImpl.class);
@Transactional
@Override
public void deduct(String productNo, int count) {
logger.info("开始扣减商品{}的库存, 数量为{}", productNo, count);
//1.查询库存
//StorageTbl storageTbl = storageTblDao.selectByPrimaryKey(Integer.parseInt(productNo));
StorageTblExample storageTblExample = new StorageTblExample();
storageTblExample.createCriteria().andCommodityCodeEqualTo(productNo);
List<StorageTbl> storageTbls = storageTblDao.selectByExample(storageTblExample);
StorageTbl storageTbl = storageTbls.get(0);
if (storageTbl==null){
throw new IllegalArgumentException("商品不存在");
}
//2.扣减操作(扣减后的金额)
int idleCount =storageTbl.getCount()-count;
if (idleCount<0){
throw new RuntimeException("存库不足!");
}
//3.设置商品库存
storageTbl.setCount(idleCount);
//4.保存到数据库中
storageTblDao.updateByPrimaryKeySelective(storageTbl);
logger.info("扣减库存商品{}成功, 剩余的库存为{}",productNo, idleCount);
}
}
g.启动类
代码语言:javascript复制@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("ah.szxy.mapper")
@RestController
public class StorageServiceApplication {
public static void main(String[] args) {
SpringApplication.run(StorageServiceApplication.class,args);
}
@Autowired
private StorageService storageService;
//使用外部接口暴露
@GetMapping("/deduct/{productNo}/{count}")
public ResponseEntity<Void> deduct(@PathVariable("productNo") String productNo, @PathVariable("count") Integer count){
storageService.deduct(productNo, count);
return ResponseEntity.ok().build();
}
}
h.pom文件
代码语言:javascript复制server:
port: 8093
spring:
application:
name: storage-service
cloud:
nacos:
discovery:
server-addr: 47.97.169.52:8848
alibaba:
seata:
tx-service-group: ${spring.application.name}
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
name: storageDataSource
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://119.45.189.14:3306/seata?useSSL=false&serverTimezone=UTC
username: root
password: root123
druid:
max-active: 20 # 最大连接数
min-idle: 2 # 最小活跃数
initial-size: 2 # 初始连接数
seata:
service:
vgroup-mapping:
storage-service: default
grouplist:
default: 127.0.0.1:8091 # seata默认端口
disable-global-transaction: false # 开启全局事务
enabled: true # 开启 seata
mybatis-plus:
mapper-locations: classpath:/mapper/*.xml
i.启动并访问测试即可
当前为止代码地址:https://gitee.com/TimePause/spring-cloud-alibaba-examples.git
测试分布式事务
- 如图所示, 启动所有项目, 待所有项目启动完毕后测试分布式事务
- 初始化三个表的数据 账户表
订单表
库存表
- 访问下单接口 注意在这里如果访问 SXT_USER_2用户就会报错, 因此我们可以分别测试下面两个url, 看看出现错误时是否回滚
http://localhost:8096/purchase/SXT_USER_2/HUAWEI_0001/1 //经测试,发生;额回滚
http://localhost:8096/purchase/SXT_USER_1/HUAWEI_0001/1 //未发生回滚
Dubbo Spring Cloud
是什么
Dubbo Spring Cloud 基于 Dubbo Spring Boot 2.7.1 和 Spring Cloud 2.x 开发,无论开发人 员是 Dubbo 用户还是 Spring Cloud 用户,都能轻松地驾驭,并以接近“零”成本的代价使应用向上迁移。 Dubbo Spring Cloud 致力于简化 Cloud Native 开发成本,提高研发效能以及提升应用性能等目的。 Dubbo Spring Cloud 首个 Preview Release,随同 Spring Cloud Alibaba 0.2.2.RELEASE 和 0.9.0.RELEASE 一同发布,分别对应 Spring Cloud Finchley 与 Greenwich(下文分别简称为 “F” 版 和 “G” 版)
功能完成度
由于 Dubbo Spring Cloud 构建在原生的 Spring Cloud 之上,其服务治理方面的能力可 认为是 Spring Cloud Plus,不仅完全覆盖 Spring Cloud 原生特性,而且提供更为稳定和成熟 的实现
框架的搭建
我们将搭建如图所示的项目框架
搭建的代码已分享至码云: https://gitee.com/TimePause/spring-cloud-alibaba-examples.git
测试
- 测试消费者项目能不能消费到提供者的项目, 访问 http://localhost:8080/rpc/csdn-timepause
- 启动多个服务提供者项目, 方式如下
修改启动类的名称, 添加指定端口的参数
启动这三个提供者项目, 并重启消费者项目
Nacos上面可以看到该项目的详细信息
- 重复访问步骤1的url, 可以看到消费者在随机的访问提供者 由此可以知道Dubbo SpringCloud对服务进行了负载均衡(自动), 且无需任何配置
RocketMQ
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、 高可靠的消息发布与订阅服务。 同时,广泛应用于多个领域,包括异步通信解耦、企业解决 方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
下载 RocketMQ
代码语言:javascript复制//这里我们选择 4.4.0 版本的原因在于,我们 spring cloud alibaba 版本为:2.2.0.RELEASE,它里面控制的 rocketMQ 的版是 4.4.0。
下载地址: http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
- 配置环境变量
变量名: ROCKETMQ_HOME
变量值: D:SoftWarerocket-mqrocketmq-all-4.4.0-bin-release //(RocketMq软件所在目录)
- 启动mqnamesrv.cmd 方法: win R 输入 cmd, 将mqnamesrv.cmd文件拖到cmd命令行中回车即可(退出只需将该命令行关闭即可)
- 启动mqbroker.cmd 方法: win R 输入 cmd, 将mqbroker.cmd文件拖到cmd命令行中, 然后输入 -n 主机名:端口号,然后回车即可(退出同上)
- 启动图形化界面 方法: win R 输入 cmd, 首先输入java -jar , 然后将rocketmq-console-ng-1.0.0.jar文件拖到cmd命令行中, 回车即可(RocketMQ和图形化软件会分享在底部)
- 访问图形化界面
输入
http://localhost:8080/
,页面右上角可以进行中英文切换
SpringCloud Stream
介绍
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。 Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe(发布订阅)、 consumer groups(消费者组)、partition(分区) 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding:
- Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。 举例说明: Kafka的实现KafkaMessageChannelBinder, RabbitMQ 的实现RabbitMessageChannelBinder 以及 RocketMQ 的实现RocketMQMessageChannelBinder
- Binding: 包括 Input Binding 和 Output Binding。 Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
测试Demo环境搭建
按照下面的结构图, 创建pom类型的父项目, 创建两个子模块项目, 添加对应的Maven依赖, 修改配置文件, 编写测试代码, 最后启动项目 项目地址: https://gitee.com/TimePause/spring-cloud-alibaba-examples.git
配置文件
代码语言:javascript复制logging.level.com.alibaba.cloud.stream.binder.rocketmq=DEBUG
#rocketmq 服务器 namerserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
#stream->bindings->output(input)
#output1
#发送消息的目的地址
spring.cloud.stream.bindings.output1.destination=test-topic
#消息的默认类型
spring.cloud.stream.bindings.output1.content-type=application/json
#生产者组
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
#消息的同步发送
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true
#output2 主要演示事务消息的发送
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
#发送的是事务消息
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup
#output3 用它演示消息的手动拉取
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group
spring.application.name=rocketmq-produce-example
server.port=28081
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
- 消息提供者项目启动完毕后, 可以测试 测试普通字符串消息
测试带tag的消息
测试发送对象消息
测试发送事务消息(half)=>需要创建事务消息监听后才能发送成功(稍后演示)
测试发送消息到pull 的目的地址,为了演示我们消息的手动拉取
添加事务监听
代码语言:javascript复制package ah.szxy.listener;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
@RocketMQTransactionListener(
txProducerGroup = "myTxProducerGroup" ,
corePoolSize = 2 ,
maximumPoolSize = 5
)
public class RockerMQLocalTransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
* 当我们发送半(half)消息成功后,mq 服务要求我们执行本地的事务,并且返回本地事务的执行结果
* RocketMQLocalTransactionState:
* COMMIT 提交->其他的消费者将收到该消息
* ROLLBACK 回滚->mq ->半消息删除
* UNKNOWN -> mq 会再次检查本地的事务->checkLocalTransaction
*
* @param message
* @param o
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String type = message.getHeaders().get("type").toString();
switch (type){
case "1" :
System.out.println("本地事务执行状态未知");
return RocketMQLocalTransactionState.UNKNOWN ;
case "2":
System.out.println("本地事务执行状态成功");
return RocketMQLocalTransactionState.COMMIT ;
case "3":
System.out.println("本地事务执行状态失败");
return RocketMQLocalTransactionState.ROLLBACK ;
}
return null ;
}
/**
* 当mq 收到我们的本地的事务为UNKNOWN ,它会再次来检查我们的本地事务状态,要求返回一个本地事务的状态
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("来检查了,本次我提交本地事务");
return RocketMQLocalTransactionState.COMMIT;
}
}
- 运行测试 测试提交成功的情况
测试回滚的情况
测试再次检查本地的事务的情况
Consumer项目的完善
点击查看项目地址
配置文件application.properties
代码语言:javascript复制#rocketmq nameserver的地址
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
#stream->bindings->input
#input1
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
#input2
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
spring.cloud.stream.bindings.input2.consumer.maxAttempts=1
#input3
spring.cloud.stream.bindings.input3.destination=test-topic
spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3
spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20
#input4
spring.cloud.stream.bindings.input4.destination=TransactionTopic
spring.cloud.stream.bindings.input4.content-type=text/plain
spring.cloud.stream.bindings.input4.group=transaction-group
spring.cloud.stream.bindings.input4.consumer.concurrency=5
#input5 手动消息的拉取
spring.cloud.stream.bindings.input5.destination=pull-topic
spring.cloud.stream.bindings.input5.content-type=text/plain
spring.cloud.stream.bindings.input5.group=pull-topic-group
spring.application.name=rocketmq-consume-example
server.port=28082
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
测试Consumer项目
- 测试input1(接收字符类型)
2. 测试input2(接收tag类型)
3.测试input3(接收对象类型)
4.测试input4(接收事务类型, 成功/失败/确认成功还是失败)
相关软件分享如下
链接:https://pan.baidu.com/s/1o0WausCDJ6PA4OIpaz8qYA 提取码:d7wr