RabbitMQ快速上手的学习案例使用一个高可用外卖系统的demo。
高可用外卖系统
高可用外卖系统需求分析
- 一个外卖后端系统,用户可以在线下单外卖
- 用户下单后,可以实时查询订单进度
- 系统可以承受短时间的大量并发请求
架构设计
使用微服务系统,组件之间充分解耦 使用消息中间件,解耦业务逻辑 使用数据库,持久化业务数据
什么是微服务架构
将应用程序构建为松耦合、可独立部署的一组服务 服务:一个单一的、可独立部署的软件组件,实现了一些有用的功能 松耦合:封装服务的实现细节,通过API调用
如何拆分微服务
根据系统操作进行微服务拆分 根据业务能力进行微服务拆分(推荐使用) 根据子域进行微服务拆分
根据业务能力进行微服务拆分
合理的交换机和队列设置
- 交换机数量不能过多,一般来说同一个业务,或者同一类业务使用同一个交换机
- 合理设置队列数量,一般来说一个微服务监听一个队列,或者一个微服务的一个业务监听一个队列
- 合理配置交换机类型,使用Topic模式时仔细设置绑定键
- 尽量使用自动化 配置将创建交换机/队列的操作固化在应用代码中,免去复杂的运维操作,高效且不易出错
业务流程时序图
接口需求
新建订单接口 查询订单接口 接口采用REST风格
微服务的数据库设计原则
每个微服务使用自己的数据库 不要使用共享数据库的方式进行通信 不要使用外键,对于数据量非常少的表慎用索引
food.sql
代码语言:javascript复制-- ----------------------------
-- Table structure for deliveryman
-- ----------------------------
DROP TABLE IF EXISTS `deliveryman`;
CREATE TABLE `deliveryman` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '骑手id',
`name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
`status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
`date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of deliveryman
-- ----------------------------
INSERT INTO `deliveryman` VALUES (1, 'wangxiaoer', 'AVALIABLE', '2020-06-10 20:30:17');
-- ----------------------------
-- Table structure for order_detail
-- ----------------------------
DROP TABLE IF EXISTS `order_detail`;
CREATE TABLE `order_detail` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '订单id',
`status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
`address` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '订单地址',
`account_id` int(11) NULL DEFAULT NULL COMMENT '用户id',
`product_id` int(11) NULL DEFAULT NULL COMMENT '产品id',
`deliveryman_id` int(11) NULL DEFAULT NULL COMMENT '骑手id',
`settlement_id` int(11) NULL DEFAULT NULL COMMENT '结算id',
`reward_id` int(11) NULL DEFAULT NULL COMMENT '积分奖励id',
`price` decimal(10, 2) NULL DEFAULT NULL COMMENT '价格',
`date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 27 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` VALUES (9, 'SETTLEMENT_CONFIRMED', '深圳', 12145, 2, 1, 2, NULL, 23.25, '2022-04-04 17:57:02');
INSERT INTO `order_detail` VALUES (10, 'ORDER_CREATED', '深圳', 12145, 2, 1, 3, 1, 23.25, '2022-04-05 23:57:19');
-- ----------------------------
-- Table structure for product
-- ----------------------------
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '产品id',
`name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
`price` decimal(9, 2) NULL DEFAULT NULL COMMENT '单价',
`restaurant_id` int(11) NULL DEFAULT NULL COMMENT '地址',
`status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
`date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of product
-- ----------------------------
INSERT INTO `product` VALUES (2, 'eqwe', 23.25, 1, 'AVALIABLE', '2020-05-06 19:19:04');
-- ----------------------------
-- Table structure for restaurant
-- ----------------------------
DROP TABLE IF EXISTS `restaurant`;
CREATE TABLE `restaurant` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '餐厅id',
`name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
`address` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '地址',
`status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
`settlement_id` int(11) NULL DEFAULT NULL COMMENT '结算id',
`date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of restaurant
-- ----------------------------
INSERT INTO `restaurant` VALUES (1, 'qeqwe', '2weqe', 'OPEN', 1, '2020-05-06 19:19:39');
-- ----------------------------
-- Table structure for reward
-- ----------------------------
DROP TABLE IF EXISTS `reward`;
CREATE TABLE `reward` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '奖励id',
`order_id` int(11) NULL DEFAULT NULL COMMENT '订单id',
`amount` decimal(9, 2) NULL DEFAULT NULL COMMENT '积分量',
`status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
`date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of reward
-- ----------------------------
INSERT INTO `reward` VALUES (1, 10, 23.25, 'SUCCESS', '2022-04-06 00:00:01');
-- ----------------------------
-- Table structure for settlement
-- ----------------------------
DROP TABLE IF EXISTS `settlement`;
CREATE TABLE `settlement` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '结算id',
`order_id` int(11) NULL DEFAULT NULL COMMENT '订单id',
`transaction_id` int(11) NULL DEFAULT NULL COMMENT '交易id',
`amount` decimal(9, 2) NULL DEFAULT NULL COMMENT '金额',
`status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
`date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of settlement
-- ----------------------------
INSERT INTO `settlement` VALUES (2, 9, 571087981, 23.25, 'SUCCESS', '2022-04-04 17:59:08');
原生RabbitMQ快速上手步骤
订单微服务搭建步骤:
- 目录结构
- 导入pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.kt</groupId>
<artifactId>food</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>food</name>
<description>food System</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
- 编写配置文件application.properties
#订单微服务配置类
server.port=8080
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT+8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
#Rabbitmq相关配置
rabbitmq.host=192.168.137.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
#本服务使用的交换机
rabbitmq.exchange=exchange.food
- 编写PO、VO、DTO等数据传输对象
OrderDetailPO.java(存数据库所用类型)
代码语言:javascript复制package cn.kt.food.orderservicemanager.po;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
/**
* @author tao
* @date 2022-03-22 21:36
* 概要:存数据库所用类型
*/
@Data
public class OrderDetailPO {
private Integer id;
private OrderStatusEnum status;
private String address;
private Integer accountId;
private Integer productId;
private Integer deliverymanId;
private Integer settlementId;
private Integer rewardId;
private BigDecimal price;
private Date date;
}
OrderCreateVO.java(前端传进来的数据)
代码语言:javascript复制package cn.kt.food.orderservicemanager.vo;
import lombok.Data;
/**
* @author tao
* @date 2022-03-22 21:25
* 概要: vo:前端传进来的数据
*/
@Data
public class OrderCreateVO {
/**
* 用户ID
*/
private Integer accountId;
/**
* 地址
*/
private String address;
/**
* 产品ID
*/
private Integer productId;
}
OrderMessageDTO.java(消息体,用于传输数据)
代码语言:javascript复制package cn.kt.food.orderservicemanager.dto;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import lombok.Data;
import java.math.BigDecimal;
/**
* @author tao
* @date 2022-03-22 21:27
* 概要:dto:消息体,用于传输数据
*/
@Data
public class OrderMessageDTO {
/**
* 订单ID
*/
private Integer orderId;
/**
* 订单状态
*/
private OrderStatusEnum orderStatus;
/**
* 价格
*/
private BigDecimal price;
/**
* 骑手ID
*/
private Integer deliverymanId;
/**
* 产品ID
*/
private Integer productId;
/**
* 用户ID
*/
private Integer accountId;
/**
* 结算ID
*/
private Integer settlementId;
/**
* 积分结算ID
*/
private Integer rewardId;
/**
* 积分奖励数量
*/
private BigDecimal rewardAmount;
/**
* 确认
*/
private Boolean confirmed;
}
- 编写订单状态枚举类OrderStatusEnum.java
package cn.kt.food.orderservicemanager.enums;
/**
* @author tao
* @date 2022-03-22 21:29
* 概要: 订单状态枚举
*/
public enum OrderStatusEnum {
/**
* 创建中
*/
ORDER_CREATING,
/**
* 餐厅已确认
*/
RESTAURANT_CONFIRMED,
/**
* 骑手确认
*/
DELIVERYMAN_CONFIRMED,
/**
* 已结算
*/
SETTLEMENT_CONFIRMED,
/**
* 订单已创建
*/
ORDER_CREATED,
/**
* 订单创建失败
*/
FAILED;
}
- 编写数据库dao层
- OrderDetailDao.java
package cn.kt.food.orderservicemanager.dao;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
/**
* @author tao
* @date 2022-03-22 21:39
* 概要:
*/
@Mapper
@Repository
public interface OrderDetailDao {
@Insert("INSERT INTO order_detail (status, address, account_id, product_id, deliveryman_id, settlement_id, "
"reward_id, price, date) VALUES(#{status}, #{address},#{accountId},#{productId},#{deliverymanId},"
"#{settlementId}, #{rewardId},#{price}, #{date})")
@Options(useGeneratedKeys = true, keyProperty = "id")
void insert(OrderDetailPO orderDetailPO);
@Update("update order_detail set status =#{status}, address =#{address}, account_id =#{accountId}, "
"product_id =#{productId}, deliveryman_id =#{deliverymanId}, settlement_id =#{settlementId}, "
"reward_id =#{rewardId}, price =#{price}, date =#{date} where id=#{id}")
void update(OrderDetailPO orderDetailPO);
@Select("SELECT id,status,address,account_id accountId, product_id productId,deliveryman_id deliverymanId,"
"settlement_id settlementId,reward_id rewardId,price, date FROM order_detail WHERE id = #{id}")
OrderDetailPO selectOrder(Integer id);
}
- 编写处理用户订单的业务请求service OrderService.java
package cn.kt.food.orderservicemanager.service;
import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import cn.kt.food.orderservicemanager.vo.OrderCreateVO;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 21:13
* 概要: 处理用户关于订单的业务请求
*/
@Slf4j
@Service
public class OrderService {
@Autowired
private OrderDetailDao orderDetailDao;
@Autowired
RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.host}")
public String host;
@Value("${rabbitmq.exchange}")
public String exchangeName;
private ObjectMapper objectMapper = new ObjectMapper();
// 创建订单
public void createOrder(OrderCreateVO orderCreateVO) throws IOException, TimeoutException {
log.info("createOrder:orderCreateVO:{}", orderCreateVO);
OrderDetailPO orderPO = new OrderDetailPO();
orderPO.setAddress(orderCreateVO.getAddress());
orderPO.setAccountId(orderCreateVO.getAccountId());
orderPO.setProductId(orderCreateVO.getProductId());
orderPO.setStatus(OrderStatusEnum.ORDER_CREATING);
orderPO.setDate(new Date());
// 会返回数据库自动生成的数据
orderDetailDao.insert(orderPO);
OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
orderMessageDTO.setOrderId(orderPO.getId());
orderMessageDTO.setProductId(orderPO.getProductId());
orderMessageDTO.setAccountId(orderCreateVO.getAccountId());
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
// 创建订单之后给restaurant发消息
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 配置channel,开启确认模式
channel.confirmSelect();
//单条同步确认机制
if (channel.waitForConfirms()) {
log.info("RabbitMQ confirm success");
} else {
log.info("RabbitMQ confirm failed");
}
// 异步同步确认机制
ConfirmListener confirmListener = new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
log.info("Ack deliveryTag:{},mutiple:{}", l, b);
// 消息发送成功
}
@Override
public void handleNack(long l, boolean b) throws IOException {
log.info("Nack deliveryTag:{},mutiple:{}", l, b);
// 消息发送失败
}
};
channel.addConfirmListener(confirmListener);
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
//(exchange,routingKey,消息特殊参数,消息体本身(字节))
// channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
// 设置单条消息的过期时间(时间到期后消息会被消费)
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
/*for (int i = 0; i < 50; i ) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
}*/
// 发送多条消息
/*for (int i = 0; i < 10; i ) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
}
Thread.sleep(10000);*/
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 编写消息处理相关业务逻辑service OrderMessageService.java
package cn.kt.food.orderservicemanager.service;
import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 21:15
* 概要:消息处理相关业务逻辑
*/
@Slf4j
@Service
public class OrderMessageService {
@Value("${rabbitmq.host}")
public String host;
@Value("${rabbitmq.exchange}")
public String exchangeName;
@Autowired
private OrderDetailDao orderDetailDao;
ObjectMapper objectMapper = new ObjectMapper();
/**
* 声明消息队列、交换机、绑定、消息的处理
* (异步线程调用这个方法,且异步线程不能退出,注册完消费者之后sleep,需要设置线程池)
*/
@Async
public void handleMessage() throws IOException, TimeoutException, InterruptedException {
log.info("start linstening message");
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
/*---------------------restaurant微服务(声明)---------------------*/
// order交换机
channel.exchangeDeclare(
"exchange.order.restaurant", //交换机名称
BuiltinExchangeType.DIRECT, //交换机类型
true, //是否持久化
false, //是否交换机长时间不使用删除
null); //是否交换机长时间不使用删除
// 订单队列
channel.queueDeclare(
"queue.order", //队列名称
true, //是否持久化
false, // 队列是否独占(独占只允许一个应用连接)
false, //是否交换机长时间不使用删除
null); //是否交换机长时间不使用删除
// 队列绑定交换机
channel.queueBind(
"queue.order", //队列名称
"exchange.order.restaurant", //交换机名称
"key.order"); //路由键,用来指示消息的路由转发,相当于快递的地址
/*---------------------deliveryman微服务---------------------*/
// 骑手交换机
channel.exchangeDeclare(
"exchange.order.deliveryman",
BuiltinExchangeType.DIRECT,
true,
false,
null);
channel.queueBind(
"queue.order",
"exchange.order.deliveryman",
"key.order");
/*---------------------settlement微服务---------------------*/
// 结算交换机
channel.exchangeDeclare(
"exchange.order.settlement",
BuiltinExchangeType.FANOUT,
true,
false,
null);
channel.queueBind(
"queue.order",
"exchange.settlement.order",
"key.order");
/*---------------------reward微服务---------------------*/
// 积分交换机
channel.exchangeDeclare(
"exchange.order.reward",
BuiltinExchangeType.TOPIC,
true,
false,
null);
channel.queueBind(
"queue.order",
"exchange.order.reward",
"key.order");// 降级使用,没有使用到TOPIC的特性
/**
* 监听订单状态
* (队列,是不是ACK,回调函数,消费者标签)
*/
channel.basicConsume("queue.order", true, deliverCallback, consumerTag -> {
});
while (true) {
Thread.sleep(100000);
}
}
}
DeliverCallback deliverCallback = (consumerTag, message) -> {
String messageBody = new String(message.getBody());
log.info("deliverCallback:messageBody:{}", messageBody);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
try {
// 将消息体反序列化成DTO
OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
OrderMessageDTO.class);
// 读取数据库中的PO
OrderDetailPO orderPO = orderDetailDao.selectOrder(orderMessageDTO.getOrderId());
switch (orderPO.getStatus()) {
case ORDER_CREATING:
// 修改订单状态
if (orderMessageDTO.getConfirmed() && null != orderMessageDTO.getPrice()) {
orderPO.setStatus(OrderStatusEnum.RESTAURANT_CONFIRMED);
orderPO.setPrice(orderMessageDTO.getPrice());
orderDetailDao.update(orderPO);
// 订单状态更新后给骑手发消息
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.deliveryman",
"key.deliveryman",
null,
messageToSend.getBytes());
}
} else {
orderPO.setStatus(OrderStatusEnum.FAILED);
orderDetailDao.update(orderPO);
}
break;
case RESTAURANT_CONFIRMED:
if (null != orderMessageDTO.getDeliverymanId()) {
orderPO.setStatus(OrderStatusEnum.DELIVERYMAN_CONFIRMED);
orderPO.setDeliverymanId(orderMessageDTO.getDeliverymanId());
orderDetailDao.update(orderPO);
// 发消息
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.settlement",
"key.settlement",
null,
messageToSend.getBytes());
}
} else {
orderPO.setStatus(OrderStatusEnum.FAILED);
orderDetailDao.update(orderPO);
}
break;
case DELIVERYMAN_CONFIRMED:
if (null != orderMessageDTO.getSettlementId()) {
orderPO.setStatus(OrderStatusEnum.SETTLEMENT_CONFIRMED);
orderPO.setSettlementId(orderMessageDTO.getSettlementId());
orderDetailDao.update(orderPO);
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.reward", "key.reward", null, messageToSend.getBytes());
}
} else {
orderPO.setStatus(OrderStatusEnum.FAILED);
orderDetailDao.update(orderPO);
}
break;
case SETTLEMENT_CONFIRMED: // 订单创建完成
if (null != orderMessageDTO.getRewardId()) {
orderPO.setStatus(OrderStatusEnum.ORDER_CREATED);
orderPO.setRewardId(orderMessageDTO.getRewardId());
orderDetailDao.update(orderPO);
} else {
orderPO.setStatus(OrderStatusEnum.FAILED);
orderDetailDao.update(orderPO);
}
break;
}
} catch (JsonProcessingException | TimeoutException e) {
e.printStackTrace();
}
};
}
- 编写接口controller OrderController.java
package cn.kt.food.orderservicemanager.controller;
import cn.kt.food.orderservicemanager.service.OrderService;
import cn.kt.food.orderservicemanager.vo.OrderCreateVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 22:12
* 概要:
*/
@Slf4j
@RestController
@RequestMapping("api/v1")
public class OrderController {
@Autowired
OrderService orderService;
@PostMapping("/orders")
public void createOrder(@RequestBody OrderCreateVO orderCreateDTO) throws IOException, TimeoutException {
log.info("createOrder:orderCreateDTO:{}", orderCreateDTO);
orderService.createOrder(orderCreateDTO);
}
}
- 线程池配置类和自动监听配置 线程池配置类:AsyncTaskConfig.java
package cn.kt.food.orderservicemanager.config;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @author tao
* @date 2022-03-24 22:44
* 概要: 线程池配置类
*/
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
// ThredPoolTaskExcutor的处理流程
// 当池子大小小于corePoolSize,就新建线程,并处理请求
// 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
// 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
// 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
@Override
@Bean
public Executor getAsyncExecutor() {
// 起一个线程池
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//设置核心线程数
threadPool.setCorePoolSize(10);
//设置最大线程数
threadPool.setMaxPoolSize(100);
//线程池所使用的缓冲队列
threadPool.setQueueCapacity(10);
//等待任务在关机时完成--表明等待所有线程执行完
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
threadPool.setAwaitTerminationSeconds(60);
// 线程名称前缀
threadPool.setThreadNamePrefix("Rabbit-Async-");
// 初始化线程
threadPool.initialize();
return threadPool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
RabbitMQ需要自动执行并且实时监听,因此需要配置自动执行OrderMessageService中handleMessage方法 RabbitConfig.java
代码语言:javascript复制package cn.kt.food.orderservicemanager.config;
import cn.kt.food.orderservicemanager.service.OrderMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 22:58
* 概要: 自动执行OrderMessageService中handleMessage方法(配置了RabbitMQ的交换机等)
*/
@Slf4j
@Configuration
public class RabbitConfig {
@Autowired
OrderMessageService orderMessageService;
//配置类中的@Autowired方法会被自动调用
@Autowired
public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
orderMessageService.handleMessage();
}
}
订单微服务和RabbitMQ的创建大致如上,因此也还有:商家微服务、骑手微服务、结算微服务、积分微服务。 其功能是在订单的每个阶段处理相应的业务逻辑,其中在每个微服务的消息通讯时使用RabbitMQ进行消息的路由和转发,套路和订单微服务差不多一致。
注:其余微服务和总代码放在文章末尾
RabbitMQ使用总结
- 新建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setHost("localhost");
- Channel处理相关配置和使用basicPublish发送消息 注意:channel.basicPublish(exchange,routingKey,消息特殊参数,消息体本身(字节)) RabbitMQ发送的消息体本身是字节
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
// 业务逻辑
// 发送消息处理
ObjectMapper objectMapper = new ObjectMapper();
String messageToSend = objectMapper.writeValueAsString("需要发送的消息");
//(exchange,routingKey,消息特殊参数,消息体本身(字节))
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
} catch (InterruptedException e) {
e.printStackTrace();
}
- 配置RabbitMQ的Exchange和queue
// 声明交换机
channel.exchangeDeclare(
"exchange.name",
BuiltinExchangeType.DIRECT,
true,
false,
null);
// 声明消息队列
channel.queueDeclare(
"queue.name",
true,
false,
false,
null);
// 队列绑定交换机
channel.queueBind(
"queue.name",
"exchange.name",
"key.name");
- 使用basicConsume消费消息
@Async
public void handleMessage() {
/**
* 监听订单状态
* (队列,是不是ACK,回调函数,消费者标签)
*/
channel.basicConsume("queue.name", true, deliverCallback, consumerTag -> {
});
}
- 定义回调函数 收到消息后进入的回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
//业务逻辑
};
- 配置线程池
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
// ThredPoolTaskExcutor的处理流程
// 当池子大小小于corePoolSize,就新建线程,并处理请求
// 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
// 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
// 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
@Override
@Bean
public Executor getAsyncExecutor() {
// 起一个线程池
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//设置核心线程数
threadPool.setCorePoolSize(10);
//设置最大线程数
threadPool.setMaxPoolSize(100);
//线程池所使用的缓冲队列
threadPool.setQueueCapacity(10);
//等待任务在关机时完成--表明等待所有线程执行完
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
threadPool.setAwaitTerminationSeconds(60);
// 线程名称前缀
threadPool.setThreadNamePrefix("Rabbit-Async-");
// 初始化线程
threadPool.initialize();
return threadPool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
- 使用线程池启动basicConsume
//配置类中的@Autowired方法会被自动调用
@Autowired
public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
orderMessageService.handleMessage();
}
使用原生RabbitMQ项目中的不足之处
消息真的发出去了吗?
消息发送后,发送端不知道RabbitMQ是否真的收到了消息 若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常 需要使用RabbitMQ发送端确认机制,确认消息发送
消息真被路由了吗?
消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃 消息丢弃后,订单处理流程停止,业务异常 需要使用RabbitMQ消息返回机制,确认消息被正确路由
消费端处理的过来吗?
业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定
消费端处理异常怎么办?
默认情况下,消费端接收消息时,消息会被自动确认(ACK) 消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况 需要使用RabbitMQ消费端确认机制,确认消息被正确处理
队列爆满怎么办?
默认情况下,消息进入队列,会永远存在,直到被消费 大量堆积的消息会给RabbitMQ产生很大的压力 需要使用RabbitMQ消息过期时间,防止消息大量积压
如何转移过期消息?
消息被设置了过期时间,过期后会直接被丢弃 直接被丢弃的消息,无法对系统运行异常发出警报 需要使用RabbitMQ死信队列,收集过期消息,以供分析
不足之处总结
目前项目急需引入的RabbitMQ新特性: 发送端确认机制 消费端确认机制 消息返回机制 消息过期机制 消费端限流机制 死信队列
解决这些不足之处需要用到RabbitMQ的高级特性。
实际开发中经验及小结
- 使用线程池:对于频繁创建与销毁的线程,必须使用线程池,否则极易线程溢出,造成“线程爆炸”
- POJO类单一职责 a. 各种POJO数据结构必须单一职责,混用会导致代码混乱 b. PO/DO: (Persistent Object/Data Object)持久对象 c. DTO:(Data Transfer Object)数据传输对象 d. BO:(Business Object)业务对象 e. vo: (View Object)显示层对象
源代码:
https://gitee.com/KT1205529635/rabbit-mq/tree/master/food_master_1