微服务[学成在线] day19:分布式事务

2020-08-05 16:57:32 浏览数 (1)

? 知识点概览

为了方便后续回顾该项目时能够清晰的知道本章节讲了哪些内容,并且能够从该章节的笔记中得到一些帮助,所以在完成本章节的学习后在此对本章节所涉及到的知识点进行总结概述。

本章节为【学成在线】项目的 day19 的内容

  •  分析了分布式事务的使用场景、以及 2PCTCCMQ 等解决方案
  •  通过用户下单选课的案例来讲解了基于 Spring Task 以及 RabbitMQ 来实现 MQ 在分布式事务的整个流程。
  •  对 Mysql 本地事务有了更深刻的了解,例如在同一个事务当中,操作A操作B 需要同时都操作成功,数据才能真正的写入到数据库内,这样也就保证了数据的幂等性。一、订单与选课的需求分析

0x01 订单支付流程

学成在线的课程分为免费和收费两种。对于收费课程,用户需提交订单并完成支付方可在线学习。

提交订单及支付流程如下:

系统处理流程:

1、用户提交订单需要先登录系统

2、提交订单,订单信息保存到订单数据库

3、订单支付,调用微信支付接口完成支付

4、完成支付,微信支付系统通知学成在线支付结果

5、学成在线接收到支付结果通知,更新支付结果

用户操作流程

1、用户进入课程详情页面

2、点击“立即购买”,打开订单确认信息

3、点击“确认无误,提交订单”

订单提交成功,向订单数据库的 xc_orders 订单表保存一条记录,向 xc_orders_detail订单明细表保存一条或多条记录,向订单支付表插入一条记录。

4、订单提交成功自动进入订单支付页面

5、点击“微信支付”打开二维码

6、用手机扫码支付,支付完成,点击 “完成支付”

支付完成,收到微信支付系统的支付完成通知或请求微信查询支付已完成,更新学成在线订单支付表中的支付状态字段。

0x02 自动选课需求

支付成功即完成订单,订单完成之后系统需自动添加选课。

下图是微信支付、学成在线订单服务、学成在线学习服务交互图:

1、用户支付完成,微信支付系统会主动通知学成在线支付结果,学成在线也可主动请求微信支付查询订单的支付结果。最终得到支付结果后将订单支付结果保存到订单数据库中。

2、订单支付完成系统自动向选课表添加学生选课记录。

3、选课记录添加完成学习即可在线开始学习。

二、分布式事务

0x01 问题描述

根据上边的自动选课的需求,分析如下:

用户支付完成后,会将支付状态及订单状态保存在订单数据库中,由订单服务去维护订单数据库。而学生选课信息在学习中心数据库,由学习服务去维护学习中心数据库的信息。下图是系统结图:

如何实现两个分布式服务(订单服务、学习服务)共同完成一件事即订单支付成功自动添加学生选课的需求,这里的关键是如何保证两个分布式服务的事务的一致性。

尝试解决上边的需求,在订单服务中远程调用选课接口,伪代码如下:

代码语言:javascript复制
订单支付结果通知方法{
    更新支付表中支付状态为“成功”。
    远程调用选课接口添加选课记录。
}

上边的逻辑说明:

1、更新支付表状态为本地数据库操作。

2、远程调用选课接口为网络远程调用请求。

3、为保存事务上边两步操作由spring控制事务,当遇到 Exception 异常则 回滚 本地数据库操作。

问题如下:

1、如果更新支付表失败则抛出异常,不再执行远程调用,此设想没有问题。

2、如果更新支付表成功,网络远程调用超时,会拉长本地数据库事务时间,影响数据库性能。

3、如果更新支付表成功,远程调用添加选课成功(选课数据库commit成功),最后更新支付表时因为数据库意外宕机或其他原因导致无法访问数据库,导致commit时失败,此时出现操作不一致。

上边的几个问题涉及到分布式事务控制,下面我们带着这些问题,来继续了解一下什么是分布式事务。

0x02 什么是分布式事务

在了解分布式事务之前,我们来回顾一下什么是分布式系统。

1、什么是分布式系统?

部署在不同节点上的系统,通过网络交互来完成协同工作的系统。

比如:充值加积分的业务,用户在充值系统向自己的账户充钱,在积分系统中自己积分相应的增加。充值系统和积分系统是两个不同的系统,一次充值加积分的业务就需要这两个系统协同工作来完成。

2、什么是事务?

事务是指由一组操作组成的一个工作单元,这个工作单元具有以下几个特点

  • 原子性(atomicity)
  • 一致性(consistency)
  • 隔离性(isolation)
  • 持久性(durability)

原子性:

执行单元中的操作要么全部执行成功,要么全部失败。如果有一部分成功一部分失败那么成功的操作要全部回滚到执行前的状态。

一致性:

执行一次事务会使用数据从一个正确的状态转换到另一个正确的状态,执行前后 数据都是完整的。

隔离性

在该事务执行的过程中,任何数据的改变只存在于该事务之中,对外界没有影响,事务与事务之间是完全的隔离的。只有事务提交后数据才会真正的储存到数据库内,其它事务才可以查询到最新的数据。

持久性:

事务完成后对数据的改变会永久性的存储起来,即使发生断电宕机数据依然在。

0x03 CAP理论

如何进行分布式事务控制?CAP 理论是分布式事务处理的理论基础,了解了 CAP 理论有助于我们研究分布式事务的处理方案。

CAP 理论是:分布式系统在设计时只能在 一致性 (Consistency)可用性(Availability)分区容忍性(PartitionTolerance) 中满足两种,无法兼顾三种。

我们通过下面的内容来进一步的了解 CAP 理论:

  • 一致性(Consistency): 服务A、B、C三个结点都存储了用户数据, 三个结点的数据需要保持同一时刻数据一致性。
  • 可用性(Availability): 服务A、B、C三个结点,其中一个结点宕机不影响整个集群对外提供服务,如果只有服务A结点,当服务A宕机整个系统将无法提供服务,增加服务B、C是为了保证系统的可用性。
  • 分区容忍性(Partition Tolerance): 分区容忍性就是允许系统通过网络协同工作,分区容忍性要解决由于网络分区导致数据的不完整及无法访问等问题。 分布式系统不可避免的出现了多个系统通过网络协同工作的场景,节点之间难免会出现 网络中断网延延迟 等现象,这种现象一旦出现就导致数据被分散在不同的节点上,这就是网络分区。

分布式系统能否兼顾C、A、P?

在保证 分区容忍性 的前提下 一致性可用性 无法兼顾,如果要提高系统的可用性就要增加多个结点,如果要保证数据的一致性就要实现每个结点的数据一致,结点越多可用性越好,但是数据一致性越差。所以,在进行分布式系统设计时,同时满足 “一致性”、“可用性” 和 “分区容性” 三者是几乎不能的。

CAP有哪些组合方式?

  • CA:放弃分区容忍性,加 强一致性 和可用性,关系数据库按照 CA 进行设计。
  • AP:放弃一致性,加强可用性和分区容忍性,追求 最终一致性,很多 NoSQL 数据库按照AP进行设计。 这里放弃一致性是指放弃强一致性,强一致性就是写入成功立刻要查询出最新数据。追求最终一致性是指允许暂时的数据不一致,只要最终在用户接受的时间内数据一致可。
  • CP:放弃可用性,加强一致性和分区容忍性,一些 强一致性 要求的系统按 CP 进行设计,比如 跨行转账一次转账 请求要等待双方银行系统都完成整个事务才算完成。 由于网络问题的存在CP系统可能会出现待等待超时,如果没有处理超时问题则整理系统会出现阻塞

总结:

在分布式系统设计中 AP 的应用较多,即 “保证分区容忍性” 和 “可用性 ”,牺牲数据的强一致性(写入操作后其他节点立即同步数据),而使用AP的方式来保证数据最终一致性。

比如:订单退款,今日退款成功,明日账户到账,只要在预定的用户可以接受的时间内退款事务走完即可。

0x04 解决方案

两阶段提交协议(2PC)

为解决分布式系统的数据一致性问题,出现了两阶段提交协议(2 Phase Commitment Protocol),两阶段提交由 协调者参与者 组成,共经过 "两个阶段" 和 "三个操作",部分关系数据库如 OracleMySQL 支持两阶段提交协议,本节讲解关系数据库两阶段提交协议。

2PC协议流程图:

1)第一阶段:准备阶段(prepare)

协调者通知参与者准备提交订单,参与者开始投票。

协调者完成准备工作向协调者回应Yes。

2)第二阶段:提交(commit)/回滚(rollback)阶段

协调者根据参与者的投票结果发起最终的提交指令。

如果有参与者没有准备好则发起回滚指令。

一个下单减库存的例子:

1、应用程序连接两个数据源。

2、应用程序通过事务协调器向两个库发起 prepare,两个数据库收到消息分别执行本地事务(记录日志),但不提交,如果执行成功则回复 yes,否则回复 no

3、事务协调器收到回复,只要有一方回复 no 则分别向参与者发起回滚事务,参与者开始回滚事务。

4、事务协调器收到回复,全部回复 yes,此时向参与者发起提交事务。如果参与者有一方提交事务失败则由事务协调器发起回滚事务。

优点:

实现强一致性,部分关系数据库支持(Oracle、MySQL等)。

缺点:

整个事务的执行需要由协调者在多个节点之间去协调,增加了事务的执行时间,性能低下。

解决方案有:springboot Atomikos or Bitronix

更详细的资料可以参考:https://en.wikipedia.org/wiki/Two-phase_commit_protocol

事务补偿(TCC)

TCC 事务补偿是基于 2PC 实现的业务层事务控制方案,它是 TryConfirmCancel 三个单词的首字母,含义如下:

1、Try 检查及预留业务资源,完成提交事务前的检查,并预留好资源。

2、Confirm 确定执行业务操作,对 try 阶段预留的资源正式执行。

3、Cancel 取消执行业务操作,对 try 阶段预留的资源释放。

下边用一个下单减库存的业务为例来说明:

1、Try

  • 下单业务由 订单服务库存服务 协同完成,在 try 阶段 订单服务 和 库存服务 完成检查和预留资源。
  • 订单服务检查当前是否满足提交订单的条件(比如:当前存在未完成订单的不允许提交新订单)。
  • 库存服务检查当前是否有充足的库存,并锁定资源。

2、Confirm

  • 订单服务 和 库存服务 成功完成Try后开始正式执行资源操作。
  • 订单服务向订单写一条订单信息。
  • 库存服务减去库存。

3、Cancel

  • 如果订单服务和库存服务有一方出现失败则全部取消操作。
  • 订单服务需要删除新增的订单信息。
  • 库存服务将减去的库存再还原。

优点:最终保证数据的一致性,在业务层实现事务控制,灵活性好。

缺点:开发成本高,每个事务操作每个参与者都需要实现 try/confirm/cancel 三个接口。

注意:TCC 的 try/confirm/cancel 接口都要实现幂等性,在try、confirm、cancel失败后要不断重试 。

什么是幂等性?幂等性是指 同一个操作无论请求多少次,其结果都相同。

幂等操作实现方式有:

1、操作之前在业务方法进行判断如果执行过了就不再执行。

2、缓存所有请求和处理的结果,已经处理的请求则直接返回结果。

消息队列实现最终一致性

本方案是将分布式事务拆分成多个本地事务来完成,并且由消息队列异步协调完成,如下图: 下边以下单减少库存为例来说明:

1、订单服务和库存服务完成检查和预留资源。

2、订单服务在本地事务中完成 “添加订单表记录” 和添加 “减少库存任务消息”。

3、由定时任务根据消息表的记录发送给 MQ 通知库存服务执行减库存操作。

4、库存服务执行减少库存,并且记录执行消息状态(为避免重复执行消息,在执行减库存之前查询是否执行过此 消息。

这里注意,每个消息通知都会带有一个消息id,用于通过本地事务校验该消息的状态

5、库存服务向 MQ 发送完成减少库存的消息。

6、订单服务接收到完成库存减少的消息后,删除原来添加的 “减少库存任务消息”。

实现最终事务一致要求:

预留资源成功理论上要求正式执行成功,如果执行失败会进行重试,要求业务执行方法实现幂等。

也就是说,如果订单服务在指定时间内如果没有收到库存服务的 “库存减少成功” 的消息,那么订单服务会再次尝试发送这个 “减少库存” 的消息到 MQ 并且由 MQ 继续尝试去通知库存服务去减少库存,直到订单服务收到 “库存减少成功” 的消息。

优点 :

MQ 按异步的方式协调完成事务,性能较高。

不用实现 try/confirm/cancel 接口,开发成本比 TCC 低。

缺点:

此方式基于关系数据库本地事务来实现,会出现频繁读写数据库记录,浪费数据库资源,另外对于高并发操作不是 最佳方案。

0x05 自动添加选课方案

搭建环境

根据自动选课需求,为了更好的分析解决方案,这里搭建订单工程及数据库。

1、创建订单工程

导入资料下的 xc-service-manage-order 工程。

2、创建订单数据库

创建订单数据库 xc_order(MySQL)导入 xc_order.sql

xc_orders:订单主表

xc_orders_details:订单明细表,记录订单的明细信息

xc_orders_pay:订单支付表记录订单的支付状态

xc_order 数据库导入 xc_order_task.sql

待处理任务表:

在任务表中包括了交换机的名称、路由 key 等信息为了是将任务的处理做成一个通用的功能。

考虑分布式系统并发读取任务处理任务的情况发生项目使用乐观锁的方式解决并发问题。

已完成任务表 :

3、创建选课数据库

创建 xc_learning 数据库,导入xc_learning.sqlxc_learning_course 为学生选课表。

导入 xc_task_his.sqlxc_task_his 为历史任务表 。

解决方案

本项目综合考虑选择基于消息的分布式事务解决方案,解决方案如下图:

1、支付成功后,订单服务向本地数据库更新订单状态,并向消息表写入“添加选课消息”,通过本地数据库保证订单状态和添加选课消息的事务。如果 更新订单状态 或者 消息写入 这两个操作其中一个失败了,那么这个事务都不会被执行,这样就保证了事务的一致性。

2、使用 Spring Task 定时任务扫描消息表,取出 “添加选课任务“ 的消息并发向MQ,由 MQ 通知学习服务添加选课。

3、学习服务接收到添加选课的消息,先查询本地数据库的历史消息表是否存在消息,存在则说明已经添加选课,则向 MQ 发送一个选课成功的消息,让订单服务知道该订单已经选课成功,否则向本地数据库添加选课,并向历史消息表添加选课消息,并发送选课成功的消息到 MQ。这里选课表和历史消息表在同一个数据库,通过本地事务保证一致性。

4、订单服务接收到完成选课的消息后,删除订单数据库中消息表的 “添加选课消息”,为保证后期对账将消息表的消息先添加到历史消息表再删除消息,表示此消息已经完成。

三、Spring Task定时任务

0x01 需求分析

根据分布式事务的研究结果,订单服务需要定时扫描任务表向 MQ 发送任务。本节研究定时任务处理的方案,并实现定时任务对任务表进行扫描,并向 MQ 发送消息。

实现定时任务的方案有以下几种:

1、使用jdk的 TimerTimerTask 实现

可以实现简单的间隔执行任务,无法实现按日历去调度执行任务。

2、使用 Quartz 实现

Quartz 是一个异步任务调度框架,功能丰富,可以实现按日历调度。

3、使用 Spring Task 实现

Spring 3.0 后提供 Spring Task 实现任务调度,支持按日历调度,相比 Quartz 功能稍简单,但是在开发基本够用,支持注解编程方式。

本项目使用 Spring Task 实现任务调度。

0x02 Spring Task 串行任务

1、编写任务类

在 Spring boot 启动类上添加注解:@EnableScheduling

新建任务测试类 TestTask,编写测试方法如下:

代码语言:javascript复制
@Component
public class ChooseCourseTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChooseCourseTask.class);
    // @Scheduled(fixedRate = 5000) //上次执行开始时间后5秒执行
    // @Scheduled(fixedDelay = 5000) //上次执行完毕后的5秒执行
    // @Scheduled(initialDelay=3000, fixedRate=5000) //第一次延迟3秒,以后每隔5秒执行一次
    @Scheduled(cron="0/3 * * * * *")//每隔3秒执行一次
    public void task1(){
        LOGGER.info("===============测试定时任务1开始===============");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } 
        LOGGER.info("===============测试定时任务1结束===============");
    }
}

测试:

1、测试 fixedRatefixedDelay 的区别

2、测试并观察串行执行的特点

2、cron表达式

cron表达式包括6部分:

秒(0~59) 分钟(0~59) 小时(0~23) 月中的天(1~31) 月(1~12) 周中的天(填写MON,TUE,WED,THU,FRI,SAT,SUN,或数字1~7 1表示MON,依次类推)

特殊字符介绍:

  • “/” 字符表示指定数值的增量
  • “*” 字符表示所有可能的值
  • “-” 字符表示区间范围
  • "," 字符表示列举
  • “?” 字符仅被用于月中的天和周中的天两个子表达式,表示不指定值

例子:

  • 0/3 * * * * * 每隔 3 秒执行
  • 0 0/5 * * * * 每隔 5 分钟执行
  • 0 0 0 * * * 表示每天 0 点执行
  • 0 0 12 ? * WEN 每周三 12 点执行
  • 0 15 10 ? * MON-FRI 每月的周一到周五10点15分执行
  • 0 15 10 ? * MON,FRI 每月的周一和周五10点 15分执行

3、串行任务测试

参考 task1 方法的的定义方法,再定义 task2 方法,此时共用两个任务方法。

代码语言:javascript复制
@Scheduled(fixedRate = 3000) //上次执行开始时间后3秒执行
public void task2(){
    LOGGER.info("===============测试定时任务2开始===============");
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } 
    LOGGER.info("===============测试定时任务2结束===============");
}

通过测试发现,两个任务方法由一个线程串行执行,task1 方法执行完成 task2 再执行。

0x03 Spring Task 并行任务

1、需求分析

在项目通常是需要多个不同的任务并行去执行。

本节实现 Spring Task 并行执行任务的方法。

2、配置异步任务

创建异步任务配置类,需要配置线程池实现多线程调度任务。

代码语言:javascript复制
package com.xuecheng.order.config;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executor;

@Configuration
@EnableScheduling
public class AsyncTaskConfig implements SchedulingConfigurer, AsyncConfigurer {
    //线程池线程数量
    private int corePoolSize = 5;
    @Bean
    public ThreadPoolTaskScheduler taskScheduler()
    {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.initialize();//初始化线程池
        scheduler.setPoolSize(corePoolSize);//线程池容量
        return scheduler;
    }
    @Override
    public Executor getAsyncExecutor() {
        Executor executor = taskScheduler();
        return executor;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
    }
}

@EnableScheduling 添加到此配置类上,SpringBoot 启动类上不用再添加 @EnableScheduling

3、测试

通过测试发现两个任务由不同的线程在并行执行,互不影响。

四、订单服务定时发送消息

0x01 需求分析

定时任务发送消息流程如下:

1、每隔1分钟扫描一次任务表。

1、定时任务扫描 task 表,一次取出多个任务,取出 超过1分钟 未处理的任务

2、考虑订单服务可能集群部署,为避免重复发送任务使用 乐观锁 的方式每次从任务列表取出要处理的任务

3、任务发送完毕更新任务发送时间

关于任务表的添加:

正常的流程是订单支付成功后,更新订单支付状态并向任务表写入 “添加选课任务”。

目前订单支付功能没有开发,所以我们暂时采用手动向任务表添加任务。

0x02 RabbitMQ配置

RabbitMQ 声明两个队列:添加选课、完成选课,交换机使用路由模式,代码如下:

代码语言:javascript复制
package com.xuecheng.order.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    //添加选课任务交换机
    public static final String EX_LEARNING_ADDCHOOSECOURSE = "ex_learning_addchoosecourse";


    //添加选课消息队列,用于发送添加选课消息
    public static final String XC_LEARNING_ADDCHOOSECOURSE = "xc_learning_addchoosecourse";

    //完成添加选课消息队列,用于接收完成选课的消息
    public static final String XC_LEARNING_FINISHADDCHOOSECOURSE = "xc_learning_finishaddchoosecourse";

    //添加选课路由key,用于发送添加选课的消息
    public static final String XC_LEARNING_ADDCHOOSECOURSE_KEY = "addchoosecourse";

    //完成添加选课路由key
    public static final String XC_LEARNING_FINISHADDCHOOSECOURSE_KEY = "finishaddchoosecourse";

    /**
     * 交换机配置,用于发送添加选课消息以及接收完成添加选课的消息
     * @return the exchange
     */
    @Bean(EX_LEARNING_ADDCHOOSECOURSE)
    public Exchange EX_DECLARE() {
        return ExchangeBuilder.directExchange(EX_LEARNING_ADDCHOOSECOURSE).durable(true).build();
    }

    /**
     * 声明用于接收 "完成添加选课" 消息的队列
     * @return Queue
     */
    @Bean(XC_LEARNING_FINISHADDCHOOSECOURSE)
    public Queue QUEUE_XC_LEARNING_FINISHADDCHOOSECOURSE() {
        Queue queue = new Queue(XC_LEARNING_FINISHADDCHOOSECOURSE);
        return queue;
    }

    /**
     * 声明用于发送 "添加选课" 消息的队列
     * @return Queue
     */
    @Bean(XC_LEARNING_ADDCHOOSECOURSE)
    public Queue QUEUE_XC_LEARNING_ADDCHOOSECOURSE() {
        Queue queue = new Queue(XC_LEARNING_ADDCHOOSECOURSE);
        return queue;
    }


    /**
     * 将 "完成添加选课消息" 的队列绑定到交换机
     * @param queue 完成添加选课消息队列
     * @param exchange 交换机
     * @return BindingBuilder
     */
    @Bean
    public Binding BINDING_QUEUE_FINISHADDCHOOSECOURSE(
        @Qualifier(XC_LEARNING_FINISHADDCHOOSECOURSE) Queue queue,
        @Qualifier(EX_LEARNING_ADDCHOOSECOURSE) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(XC_LEARNING_FINISHADDCHOOSECOURSE_KEY).noargs();
    }


    /**
     * 将 "添加选课" 消息的队列绑定到交换机
     * @param queue 添加选课消息队列
     * @param exchange 交换机
     * @return BindingBuilder
     */
    @Bean
    public Binding BINDING_QUEUE_ADDCHOOSECOURSE(
        @Qualifier(XC_LEARNING_ADDCHOOSECOURSE) Queue queue,
        @Qualifier(EX_LEARNING_ADDCHOOSECOURSE) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(XC_LEARNING_ADDCHOOSECOURSE_KEY).noargs();
    }

}

0x03 查询前N条任务

Dao

dao下创建一个 XcTaskRepository 自定义方法如下:

代码语言:javascript复制
package com.xuecheng.order.dao;

import com.xuecheng.framework.domain.task.XcTask;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;

import java.util.Date;

public interface XcTaskRepository extends JpaRepository<XcTask, String> {
    //取出指定时间之前的记录
    Page<XcTask> findByUpdateTimeBefore(Pageable pageable, Date updateTime);
}

findByUpdateTimeBefore 方法为 JPA 自带的一个组合方法,可以根据传入的分页参数以及时间查询,查询到该时间之前的数据。

Service

代码语言:javascript复制
@Service
public class TaskServiceImpl implements TaskService {
    @Autowired
    XcTaskRepository xcTaskRepository;
    /**
     * 查询任务列表的实现
     * @param n 查询数量
     * @param updateTime 上次更新时间
     * @return
     */
    @Override
    public List<XcTask> findTaskList(int n, Date updateTime) {
        Pageable pageable = new PageRequest(0, n);
        Page<XcTask> byUpdateTimeBefore = xcTaskRepository.findByUpdateTimeBefore(pageable, updateTime);
        List<XcTask> content = byUpdateTimeBefore.getContent();
        return content;
    }
}

在 Service 中我们调用刚才定义的 findByUpdateTimeBefore 来查询指定时间之前的数据

编写定时任务

编写任务类,每分钟执行任务,启动订单工程,观察定时发送消息日志,观察 rabbitMQ 队列中是否有消息,代码如下:

代码语言:javascript复制
@Autowired
TaskService taskService;

//每隔1分钟扫描消息表,向mq发送消息
@Scheduled(cron = "0/60 * * * * *")
public void sendChoosecourseTask(){
    //取出当前时间1分钟之前的时间
    Calendar calendar =new GregorianCalendar();
    calendar.setTime(new Date());
    calendar.add(GregorianCalendar.MINUTE,-1);
    Date time = calendar.getTime();
    List<XcTask> taskList = taskService.findTaskList(10, time);
    System.out.println(taskList);
}

测试

成功从 task 表中拿到了任务数据

0x04 定时发送任务

Dao

XcTaskRepository 中添加更新任务方法 updateTaskTime,使用 JPA 提供的 @Query 注解来实现自定义SQL语句来实现更新操作

代码语言:javascript复制
public interface XcTaskRepository extends JpaRepository<XcTask, String> {
    //取出指定时间之前的记录
    Page<XcTask> findByUpdateTimeBefore(Pageable pageable, Date updateTime);

    //更细任务处理的时间
    @Modifying  //更新操作需要使用该注解
    @Query("update XcTask t set t.updateTime = :updateTimeParam where t.id = :idParam")  //自定义更新语句
    int updateTaskTime(@Param("idParam") String id, @Param("updateTimeParam")Date updateTime);
}

Service

添加发送消息方法, 使用 RabbitTemplate 提供的 convertAndSend 来实现发送消息到指定 MQ 交换机上,并且交换机根据提供的 routekey 转发到自定的消息队列,具体代码如下

代码语言:javascript复制
@Service
public class TaskServiceImpl implements TaskService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 发送添加选课消息的实现
     * @param xcTask 消息内容
     * @param ex 交换机
     * @param routingKey 路由key
     */
    @Transactional
    @Override
    public void publishChooseMsg(XcTask xcTask, String ex, String routingKey) {
        //查询任务是否存在
        Optional<XcTask> byId = xcTaskRepository.findById(xcTask.getId());
        if(byId.isPresent()){
            xcTask = byId.get();
            //发送消息到MQ
            rabbitTemplate.convertAndSend(ex, routingKey, xcTask);
            //更细当前任务的时间
            xcTaskRepository.updateTaskTime(xcTask.getId(), new Date());
        }
    }
}

编写任务类

编写任务类,每分钟执行任务,启动订单工程,观察定时发送消息日志,观察 rabbitMQ 队列中是否有消息,代码如下

代码语言:javascript复制
package com.xuecheng.order.mq;
@Component
public class ChooseCourseTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChooseCourseTask.class);
    @Autowired
    TaskService taskService;
    //每隔1分钟扫描消息表,向mq发送消息
    @Scheduled(cron = "0/3 * * * * *")  //这里我们为了方便测试,时间改为了3秒一次
    public void sendChoosecourseTask(){
        //取出当前时间1分钟之前的时间
        Calendar calendar =new GregorianCalendar();
        calendar.setTime(new Date());
        calendar.add(GregorianCalendar.MINUTE,-1);
        Date time = calendar.getTime();
        List<XcTask> taskList = taskService.findTaskList(10, time);
        for(XcTask xcTask: taskList){
            String taskId = xcTask.getId();
            //发送消息到MQ
            taskService.publishChooseMsg(xcTask, xcTask.getMqExchange(), xcTask.getMqRoutingkey());
            LOGGER.info("send choose course task id:{}",taskId);
        }
    }
}

测试

在测试之前注意检查以下 task 表的消息中的 交换机名称、路由key是否配置正确。

我们在发布的方法中打个断点

这时已经将消息发送到了 RabbitMQ 中,我们到 RabbitMQ 的控制台中查看消息是否提交成功

消息成功发送到了 MQ

0x05 乐观锁取任务

服务将来会集群部署,为了避免任务在 1分钟 内重复执行,这里使用 乐观锁,实现思路如下:

例如 服务A服务B 都需要扫描 task 表里面的任务信息,他们各自手里都携带着一个 version 标识,默认值都为1,并且 task 表里面的 任务X 也会带有一个 version 字段,默认值也为1,如果A先达到数据库,A 在查询任务X 的时候会携带他手里的 version 标识进行匹配查询,如果能匹配到任务,证明A是第一个取出该任务的,取出该任务信息后,A会将该任务在数据库里面的version标识修改为2,在之后到达 task表进行查询的服务B也会以同样的方式匹配 version 为1的任务,这个时候 服务B 就不会再匹配到 version 值为 2 的 任务X

1、在 Dao 中增加校验当前版本及 任务id 的匹配方法

代码语言:javascript复制
public interface XcTaskRepository extends JpaRepository<XcTask, String> {
    //使用乐观锁方式校验任务id和版本号是否匹配,匹配则版本号加1
    @Modifying
    @Query("update XcTask t set t.version = :version 1 where t.id = :id and t.version =
           :version")
           public int updateTaskVersion(@Param(value = "id") String id,@Param(value = "version") int version);
           ...

2、在 service 中增加方法,使用乐观锁方法校验任务

代码语言:javascript复制
@Transactional
public int getTask(String taskId,int version){
    int i = xcTaskRepository.updateTaskVersion(taskId, version);
    return i;
}

3、执行任务类中修改

代码语言:javascript复制
...
    //任务id
    String taskId = xcTask.getId();
    //版本号
    Integer version = xcTask.getVersion();
    //调用乐观锁方法校验任务是否可以执行
    if(taskService.getTask(taskId, version)>0){
        //发送选课消息
        taskService.publish(xcTask, xcTask.getMqExchange(),xcTask.getMqRoutingkey());
        LOGGER.info("send choose course task id:{}",taskId);
	}
...

测试

测试的预期结果:使用乐观锁保证同一分钟内同一个任务只能被获取一次

启动两个实例进行测试,测试流程如下

1、配置两个端口不同的实例

实例A

实例B

修改配置文件中的端口,有限从启动配置中获取

代码语言:javascript复制
server:
  port: ${PORT:31500}

2、先启动实例A,在定时任务发送消息到MQ设置一个断点

在遍历查询结果之前,设置断点后同时启动两个实例

观察这两个实例都是否查询到了数据,并且达到断点的位置

3、测试乐观锁

实例A通过单步调试,运行到执行 getTask 之后,再将实例B也运行到 执行了 getTask之后。

在实例A执行完后,获取到了任务并且将当前任务的 version 值进行 1

而实例B则没有匹配到该任务

五、自动添加选课功能开发

0x01 学习服务添加选课

需求分析

学习服务接收 MQ 发送添加选课消息,执行添加 选 课操作。

添加选课成功向学生选课表插入记录、向历史任务表插入记录、并向 MQ 发送“完成选课”消息。

RabbitMQ配置

学习服务监听 MQ 的添加选课队列,并且声明完成选课队列,配置代码与 订单服务RabbitMQ 配置相同。

Dao

学生选课Dao

代码语言:javascript复制
public interface XcLearningCourseRepository extends JpaRepository<XcLearningCourse, String> {
    //根据用户和课程查询选课记录,用于判断是否添加选课
    XcLearningCourse findXcLearningCourseByUserIdAndCourseId(String userId, String courseId);
}

历史任务Dao:

代码语言:javascript复制
public interface XcTaskHisRepository extends JpaRepository<XcTaskHis,String> {
    
}

Service

1、添加选课方法

xc_learning_course 添加记录,为保证不重复添加选课,先查询历史任务表,如果从历史任务表查询不到任务说明此任务还没有处理,此时则添加选课并添加历史任务。

在学习服务中编码如下代码:

代码语言:javascript复制
//完成选课
@Transactional
public ResponseResult addChooseCourse(String userId, String courseId,String valid,Date startTime,Date endTime,XcTask xcTask){
        if (StringUtils.isEmpty(courseId)) {
            ExceptionCast.cast(LearningCode.CHOOSECOURSE_COURSEID_ISNULL);
        }
        if (StringUtils.isEmpty(userId)) {
            ExceptionCast.cast(LearningCode.CHOOSECOURSE_USERID_ISNULL);
        }
        if(xcTask == null || StringUtils.isEmpty(xcTask.getId())){
            ExceptionCast.cast(LearningCode.CHOOSECOURSE_TASKID_ISNULL);
        }
    //查询历史任务
    Optional<XcTaskHis> optional = xcTaskHisRepository.findById(xcTask.getId());
    if(optional.isPresent()){
        return new ResponseResult(CommonCode.SUCCESS);
    } 
    XcLearningCourse xcLearningCourse = xcLearningCourseRepository.findXcLearningCourseByUserIdAndCourseId(userId, courseId);
    if (xcLearningCourse == null) {//没有选课记录则添加
        xcLearningCourse = new XcLearningCourse();
        xcLearningCourse.setUserId(userId);
        xcLearningCourse.setCourseId(courseId);
        xcLearningCourse.setValid(valid);
        xcLearningCourse.setStartTime(startTime);
        xcLearningCourse.setEndTime(endTime);
        xcLearningCourse.setStatus("501001");
        xcLearningCourseRepository.save(xcLearningCourse);
    } else {//有选课记录则更新日期
        xcLearningCourse.setValid(valid);
        xcLearningCourse.setStartTime(startTime);
        xcLearningCourse.setEndTime(endTime);
        xcLearningCourse.setStatus("501001");
        xcLearningCourseRepository.save(xcLearningCourse);
    }
    //向历史任务表播入记录
    Optional<XcTaskHis> optional = xcTaskHisRepository.findById(xcTask.getId());
    if(!optional.isPresent()){
        //添加历史任务
        XcTaskHis xcTaskHis = new XcTaskHis();
        BeanUtils.copyProperties(xcTask,xcTaskHis);
        xcTaskHisRepository.save(xcTaskHis);
    }
    return new ResponseResult(CommonCode.SUCCESS);
}

接收添加选课消息

接收到添加选课的消息调用添加选课方法完成添加选课,并发送完成选课消息。

com.xuecheng.learning.mq 包下添加 ChooseCourseTask

代码语言:javascript复制
@Component
public class ChooseCourseTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChooseCourseTask.class);
    @Autowired
    LearningService learningService;
    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
    * 接收选课任务
    */
    @RabbitListener(queues = {RabbitMQConfig.XC_LEARNING_ADDCHOOSECOURSE})
    public void receiveChoosecourseTask(XcTask xcTask,Message message,Channel channel) throws
        IOException {
        LOGGER.info("receive choose course task,taskId:{}",xcTask.getId());
        //接收到 的消息id
        String id = xcTask.getId();
        //添加选课
        try {
            String requestBody = xcTask.getRequestBody();
            Map map = JSON.parseObject(requestBody, Map.class);
            String userId = (String) map.get("userId");
            String courseId = (String) map.get("courseId");
            String valid = (String) map.get("valid");
            Date startTime = null;
            Date endTime = null;
            SimpleDateFormat dateFormat = new SimpleDateFormat("YYYY‐MM‐dd HH:mm:ss");
            if(map.get("startTime")!=null){
                startTime =dateFormat.parse((String) map.get("startTime"));
            } 
            if(map.get("endTime")!=null){
                endTime =dateFormat.parse((String) map.get("endTime"));
            } 
            //添加选课
            ResponseResult addcourse = learningService.addcourse(
                userId, courseId,valid,startTime, endTime,xcTask);
            //选课成功发送响应消息
            if(addcourse.isSuccess()){
                //发送响应消息
                rabbitTemplate.convertAndSend(
                    RabbitMQConfig.EX_LEARNING_ADDCHOOSECOURSE, 
                    RabbitMQConfig.XC_LEARNING_FINISHADDCHOOSECOURSE_KEY, 
                    xcTask );
                LOGGER.info("send finish choose course taskId:{}",id);
            }
        } catch (Exception e) {
            e.printStackTrace();
            LOGGER.error("send finish choose course taskId:{}", id);
        }
    }
}

测试

测试接收选课消息,并且返回选课成功的消息到 MQ

订单服务发送 “添加选课” 的消息到MQ

学习服务接从MQ中收到 “添加选课” 的消息,进行添加或者更新选课记录,再发送一个完成选课的消息到MQ

再到 RabbitMQ 的控制台查看消息是否发送成功

0x02 订单服务结束任务

需求分析

订单服务接收 MQ 完成选课的消息,将任务从当前任务表删除,将完成的任务添加到完成任务表。

Dao

配置 XcTaskHisRepository

代码语言:javascript复制
package com.xuecheng.order.dao;

import com.xuecheng.framework.domain.task.XcTaskHis;
import org.springframework.data.jpa.repository.JpaRepository;

public interface XcTaskHisRepository extends JpaRepository<XcTaskHis, String> {

}

Service

代码语言:javascript复制
//删除任务
@Transactional
public void finishTask(String taskId){
    Optional<XcTask> taskOptional = xcTaskRepository.findById(taskId);
    if(taskOptional.isPresent()){
        XcTask xcTask = taskOptional.get();
        xcTask.setDeleteTime(new Date());
        XcTaskHis xcTaskHis = new XcTaskHis();
        BeanUtils.copyProperties(xcTask, xcTaskHis);
        //保存任务到 task_hit 表内
        xcTaskHisRepository.save(xcTaskHis);
        //删除当前任务
        xcTaskRepository.delete(xcTask);
    }
}

接收完成选课消息

在 com.xuecheng.manage_order.mq 包下 ChooseCourseTask 类中添加 receiveChoosecourseTask,接收完成课任务消息并进行处理。

代码语言:javascript复制
/**
* 接收选课响应结果
*/
@RabbitListener(queues = {RabbitMQConfig.xc_learning_finishaddchoosecourse})
public void receiveFinishChoosecourseTask(XcTask task,Message message, Channel channel) throws
    IOException {
    LOGGER.info("receiveChoosecourseTask...{}",task.getId());
    //接收到 的消息id
    String id = task.getId();
    //删除任务,添加历史任务
    taskService.finishTask(id);
}

0x03 集成测试

测试流程

1、添加任务数据

向 xc_task 表添加一行数据,模拟用户支付成功后向 xc_task 表写入添加选课的任务

SQL 语句如下

代码语言:javascript复制
INSERT INTO `xc_task` VALUES ('4028858162959ce5016295b604ba0000', '2018-04-05 20:09:17', '2020-06-05 17:15:51', '2020-06-05 17:40:04', 'add_choosecourse', 'ex_learning_addchoosecourse', 'addchoosecourse', '{"courseId":"4028e58161bcf7f40161bcf8b77c0000,","userId":"49"}', NULL, '10201', NULL);
2、配置断点

为了能够清楚的看到整个处理流程,我们在各个处理模块下设置断点。

在订单服务的 ChooseCourseTask 下配置断点,分别是发送 “添加选课” 消息到MQ的模块 和 接收 “选课完成” 消息的模块

在学习服务的接收选课任务方法下设置断点

3、启动服务进行测试

同时启动 订单服务学习服务

订单服务扫描 xc_task 表中的任务,并且发送 “添加选课” 的消息到 MQ

学习服务通过监听 MQ的消息,接收到了订单服务送过来的 “添加选课” 消息

完成选课添加后,发送响应的消息到MQ

订单服务接收到 “选课完成” 的消息,删除 xc_task 表中的任务,并且添加任务记录到 xc_task_his 表中

学习服务收到重复的完成消息,但由于 xc_task 中该任务的已经被删除,所以步任何的操作,如下图

0 人点赞