RabbitMQ学习笔记(二)——RabbitMQ快速上手

2022-09-26 18:59:39 浏览数 (1)

RabbitMQ快速上手的学习案例使用一个高可用外卖系统的demo。

高可用外卖系统

高可用外卖系统需求分析

  1. 一个外卖后端系统,用户可以在线下单外卖
  2. 用户下单后,可以实时查询订单进度
  3. 系统可以承受短时间的大量并发请求

架构设计

使用微服务系统,组件之间充分解耦 使用消息中间件,解耦业务逻辑 使用数据库,持久化业务数据

什么是微服务架构

将应用程序构建为松耦合、可独立部署的一组服务 服务:一个单一的、可独立部署的软件组件,实现了一些有用的功能 松耦合:封装服务的实现细节,通过API调用

如何拆分微服务

根据系统操作进行微服务拆分 根据业务能力进行微服务拆分(推荐使用) 根据子域进行微服务拆分

根据业务能力进行微服务拆分

合理的交换机和队列设置

  • 交换机数量不能过多,一般来说同一个业务,或者同一类业务使用同一个交换机
  • 合理设置队列数量,一般来说一个微服务监听一个队列,或者一个微服务的一个业务监听一个队列
  • 合理配置交换机类型,使用Topic模式时仔细设置绑定键
  • 尽量使用自动化 配置将创建交换机/队列的操作固化在应用代码中,免去复杂的运维操作,高效且不易出错

业务流程时序图

接口需求

新建订单接口 查询订单接口 接口采用REST风格

微服务的数据库设计原则

每个微服务使用自己的数据库 不要使用共享数据库的方式进行通信 不要使用外键,对于数据量非常少的表慎用索引

food.sql

代码语言:javascript复制
-- ----------------------------
-- Table structure for deliveryman
-- ----------------------------
DROP TABLE IF EXISTS `deliveryman`;
CREATE TABLE `deliveryman`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '骑手id',
  `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of deliveryman
-- ----------------------------
INSERT INTO `deliveryman` VALUES (1, 'wangxiaoer', 'AVALIABLE', '2020-06-10 20:30:17');

-- ----------------------------
-- Table structure for order_detail
-- ----------------------------
DROP TABLE IF EXISTS `order_detail`;
CREATE TABLE `order_detail`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '订单id',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `address` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '订单地址',
  `account_id` int(11) NULL DEFAULT NULL COMMENT '用户id',
  `product_id` int(11) NULL DEFAULT NULL COMMENT '产品id',
  `deliveryman_id` int(11) NULL DEFAULT NULL COMMENT '骑手id',
  `settlement_id` int(11) NULL DEFAULT NULL COMMENT '结算id',
  `reward_id` int(11) NULL DEFAULT NULL COMMENT '积分奖励id',
  `price` decimal(10, 2) NULL DEFAULT NULL COMMENT '价格',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 27 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` VALUES (9, 'SETTLEMENT_CONFIRMED', '深圳', 12145, 2, 1, 2, NULL, 23.25, '2022-04-04 17:57:02');
INSERT INTO `order_detail` VALUES (10, 'ORDER_CREATED', '深圳', 12145, 2, 1, 3, 1, 23.25, '2022-04-05 23:57:19');

-- ----------------------------
-- Table structure for product
-- ----------------------------
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '产品id',
  `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
  `price` decimal(9, 2) NULL DEFAULT NULL COMMENT '单价',
  `restaurant_id` int(11) NULL DEFAULT NULL COMMENT '地址',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of product
-- ----------------------------
INSERT INTO `product` VALUES (2, 'eqwe', 23.25, 1, 'AVALIABLE', '2020-05-06 19:19:04');

-- ----------------------------
-- Table structure for restaurant
-- ----------------------------
DROP TABLE IF EXISTS `restaurant`;
CREATE TABLE `restaurant`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '餐厅id',
  `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
  `address` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '地址',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `settlement_id` int(11) NULL DEFAULT NULL COMMENT '结算id',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of restaurant
-- ----------------------------
INSERT INTO `restaurant` VALUES (1, 'qeqwe', '2weqe', 'OPEN', 1, '2020-05-06 19:19:39');

-- ----------------------------
-- Table structure for reward
-- ----------------------------
DROP TABLE IF EXISTS `reward`;
CREATE TABLE `reward`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '奖励id',
  `order_id` int(11) NULL DEFAULT NULL COMMENT '订单id',
  `amount` decimal(9, 2) NULL DEFAULT NULL COMMENT '积分量',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of reward
-- ----------------------------
INSERT INTO `reward` VALUES (1, 10, 23.25, 'SUCCESS', '2022-04-06 00:00:01');

-- ----------------------------
-- Table structure for settlement
-- ----------------------------
DROP TABLE IF EXISTS `settlement`;
CREATE TABLE `settlement`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '结算id',
  `order_id` int(11) NULL DEFAULT NULL COMMENT '订单id',
  `transaction_id` int(11) NULL DEFAULT NULL COMMENT '交易id',
  `amount` decimal(9, 2) NULL DEFAULT NULL COMMENT '金额',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of settlement
-- ----------------------------
INSERT INTO `settlement` VALUES (2, 9, 571087981, 23.25, 'SUCCESS', '2022-04-04 17:59:08');

原生RabbitMQ快速上手步骤

订单微服务搭建步骤:

  1. 目录结构
  1. 导入pom.xml
代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.4</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.kt</groupId>
<artifactId>food</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>food</name>
<description>food System</description>
<properties>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
 
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.6</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
 
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>
 

  1. 编写配置文件application.properties
代码语言:javascript复制
#订单微服务配置类
server.port=8080
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT+8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
#Rabbitmq相关配置
rabbitmq.host=192.168.137.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
#本服务使用的交换机
rabbitmq.exchange=exchange.food
 

  1. 编写PO、VO、DTO等数据传输对象

OrderDetailPO.java(存数据库所用类型)

代码语言:javascript复制
package cn.kt.food.orderservicemanager.po;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
/**
 * @author tao
 * @date 2022-03-22 21:36
 * 概要:存数据库所用类型
 */

@Data
public class OrderDetailPO {
    private Integer id;
    private OrderStatusEnum status;
    private String address;
    private Integer accountId;
    private Integer productId;
    private Integer deliverymanId;
    private Integer settlementId;
    private Integer rewardId;
    private BigDecimal price;
    private Date date;
}

OrderCreateVO.java(前端传进来的数据)

代码语言:javascript复制
package cn.kt.food.orderservicemanager.vo;
import lombok.Data;

/**
 * @author tao
 * @date 2022-03-22 21:25
 * 概要:  vo:前端传进来的数据
 */
@Data
public class OrderCreateVO {
    /**
     * 用户ID
     */
    private Integer accountId;

    /**
     * 地址
     */
    private String address;

    /**
     * 产品ID
     */
    private Integer productId;
}

OrderMessageDTO.java(消息体,用于传输数据)

代码语言:javascript复制
package cn.kt.food.orderservicemanager.dto;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import lombok.Data;
import java.math.BigDecimal;

/**
 * @author tao
 * @date 2022-03-22 21:27
 * 概要:dto:消息体,用于传输数据
 */
@Data
public class OrderMessageDTO {

    /**
     * 订单ID
     */
    private Integer orderId;

    /**
     * 订单状态
     */
    private OrderStatusEnum orderStatus;

    /**
     * 价格
     */
    private BigDecimal price;

    /**
     * 骑手ID
     */
    private Integer deliverymanId;

    /**
     * 产品ID
     */
    private Integer productId;

    /**
     * 用户ID
     */
    private Integer accountId;

    /**
     * 结算ID
     */
    private Integer settlementId;

    /**
     * 积分结算ID
     */
    private Integer rewardId;

    /**
     * 积分奖励数量
     */
    private BigDecimal rewardAmount;

    /**
     * 确认
     */
    private Boolean confirmed;
}
  1. 编写订单状态枚举类OrderStatusEnum.java
代码语言:javascript复制
package cn.kt.food.orderservicemanager.enums;
/**
* @author tao
* @date 2022-03-22 21:29
* 概要:  订单状态枚举
*/
public enum OrderStatusEnum {
 
/**
 * 创建中
 */
ORDER_CREATING,
 
/**
 * 餐厅已确认
 */
 
RESTAURANT_CONFIRMED,
 
/**
 * 骑手确认
 */
DELIVERYMAN_CONFIRMED,
 
/**
 * 已结算
 */
SETTLEMENT_CONFIRMED,
 
/**
 * 订单已创建
 */
ORDER_CREATED,
 
/**
 * 订单创建失败
 */
FAILED;
}
 

  1. 编写数据库dao层
  2. OrderDetailDao.java
代码语言:javascript复制
package cn.kt.food.orderservicemanager.dao;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
/**
* @author tao
* @date 2022-03-22 21:39
* 概要:
*/
@Mapper
@Repository
public interface OrderDetailDao {
 
@Insert("INSERT INTO order_detail (status, address, account_id, product_id, deliveryman_id, settlement_id, "  
        "reward_id, price, date) VALUES(#{status}, #{address},#{accountId},#{productId},#{deliverymanId},"  
        "#{settlementId}, #{rewardId},#{price}, #{date})")
@Options(useGeneratedKeys = true, keyProperty = "id")
void insert(OrderDetailPO orderDetailPO);
 
@Update("update order_detail set status =#{status}, address =#{address}, account_id =#{accountId}, "  
        "product_id =#{productId}, deliveryman_id =#{deliverymanId}, settlement_id =#{settlementId}, "  
        "reward_id =#{rewardId}, price =#{price}, date =#{date} where id=#{id}")
void update(OrderDetailPO orderDetailPO);
 
@Select("SELECT id,status,address,account_id accountId, product_id productId,deliveryman_id deliverymanId,"  
        "settlement_id settlementId,reward_id rewardId,price, date FROM order_detail WHERE id = #{id}")
OrderDetailPO selectOrder(Integer id);
}

  1. 编写处理用户订单的业务请求service OrderService.java
代码语言:javascript复制
package cn.kt.food.orderservicemanager.service;
import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import cn.kt.food.orderservicemanager.vo.OrderCreateVO;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 21:13
* 概要:  处理用户关于订单的业务请求
*/
@Slf4j
@Service
public class OrderService {
@Autowired
private OrderDetailDao orderDetailDao;
@Autowired
RabbitTemplate rabbitTemplate;
 
@Value("${rabbitmq.host}")
public String host;
@Value("${rabbitmq.exchange}")
public String exchangeName;
 
private ObjectMapper objectMapper = new ObjectMapper();
 
// 创建订单
public void createOrder(OrderCreateVO orderCreateVO) throws IOException, TimeoutException {
    log.info("createOrder:orderCreateVO:{}", orderCreateVO);
    OrderDetailPO orderPO = new OrderDetailPO();
    orderPO.setAddress(orderCreateVO.getAddress());
    orderPO.setAccountId(orderCreateVO.getAccountId());
    orderPO.setProductId(orderCreateVO.getProductId());
    orderPO.setStatus(OrderStatusEnum.ORDER_CREATING);
    orderPO.setDate(new Date());
    // 会返回数据库自动生成的数据
    orderDetailDao.insert(orderPO);
 
    OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
    orderMessageDTO.setOrderId(orderPO.getId());
    orderMessageDTO.setProductId(orderPO.getProductId());
    orderMessageDTO.setAccountId(orderCreateVO.getAccountId());
 
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(host);
 
    // 创建订单之后给restaurant发消息
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        // 配置channel,开启确认模式
        channel.confirmSelect();
 
        //单条同步确认机制
        if (channel.waitForConfirms()) {
            log.info("RabbitMQ confirm success");
        } else {
 
            log.info("RabbitMQ confirm failed");
        }
 
        // 异步同步确认机制
        ConfirmListener confirmListener = new ConfirmListener() {
            @Override
            public void handleAck(long l, boolean b) throws IOException {
                log.info("Ack deliveryTag:{},mutiple:{}", l, b);
                // 消息发送成功
            }
 
            @Override
            public void handleNack(long l, boolean b) throws IOException {
                log.info("Nack deliveryTag:{},mutiple:{}", l, b);
                // 消息发送失败
            }
        };
        channel.addConfirmListener(confirmListener);
 
        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
 
        //(exchange,routingKey,消息特殊参数,消息体本身(字节))
        // channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
 
        // 设置单条消息的过期时间(时间到期后消息会被消费)
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
        channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
        /*for (int i = 0; i < 50; i  ) {
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
            log.info("message sent");
        }*/
 
        // 发送多条消息
        /*for (int i = 0; i < 10; i  ) {
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
            log.info("message sent");
        }
        Thread.sleep(10000);*/
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
}

  1. 编写消息处理相关业务逻辑service OrderMessageService.java
代码语言:javascript复制
package cn.kt.food.orderservicemanager.service;
import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 21:15
* 概要:消息处理相关业务逻辑
*/
@Slf4j
@Service
public class OrderMessageService {
@Value("${rabbitmq.host}")
public String host;
@Value("${rabbitmq.exchange}")
public String exchangeName;
 
@Autowired
private OrderDetailDao orderDetailDao;
ObjectMapper objectMapper = new ObjectMapper();
/**
 * 声明消息队列、交换机、绑定、消息的处理
 * (异步线程调用这个方法,且异步线程不能退出,注册完消费者之后sleep,需要设置线程池)
 */
@Async
public void handleMessage() throws IOException, TimeoutException, InterruptedException {
    log.info("start linstening message");
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(host);
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
 
        /*---------------------restaurant微服务(声明)---------------------*/
        // order交换机
        channel.exchangeDeclare(
                "exchange.order.restaurant",    //交换机名称
                BuiltinExchangeType.DIRECT,     //交换机类型
                true,   //是否持久化
                false,  //是否交换机长时间不使用删除
                null);  //是否交换机长时间不使用删除
 
        // 订单队列
        channel.queueDeclare(
                "queue.order",  //队列名称
                true,   //是否持久化
                false,  // 队列是否独占(独占只允许一个应用连接)
                false,  //是否交换机长时间不使用删除
                null);  //是否交换机长时间不使用删除
 
        // 队列绑定交换机
        channel.queueBind(
                "queue.order",  //队列名称
                "exchange.order.restaurant",    //交换机名称
                "key.order");   //路由键,用来指示消息的路由转发,相当于快递的地址
 
        /*---------------------deliveryman微服务---------------------*/
        // 骑手交换机
        channel.exchangeDeclare(
                "exchange.order.deliveryman",
                BuiltinExchangeType.DIRECT,
                true,
                false,
                null);
 
        channel.queueBind(
                "queue.order",
                "exchange.order.deliveryman",
                "key.order");
 
        /*---------------------settlement微服务---------------------*/
        // 结算交换机
        channel.exchangeDeclare(
                "exchange.order.settlement",
                BuiltinExchangeType.FANOUT,
                true,
                false,
                null);
 
        channel.queueBind(
                "queue.order",
                "exchange.settlement.order",
                "key.order");
 
        /*---------------------reward微服务---------------------*/
        // 积分交换机
        channel.exchangeDeclare(
                "exchange.order.reward",
                BuiltinExchangeType.TOPIC,
                true,
                false,
                null);
 
        channel.queueBind(
                "queue.order",
                "exchange.order.reward",
                "key.order");// 降级使用,没有使用到TOPIC的特性
 
        /**
         * 监听订单状态
         * (队列,是不是ACK,回调函数,消费者标签)
         */
        channel.basicConsume("queue.order", true, deliverCallback, consumerTag -> {
        });
        while (true) {
            Thread.sleep(100000);
        }
    }
}
 
DeliverCallback deliverCallback = (consumerTag, message) -> {
    String messageBody = new String(message.getBody());
    log.info("deliverCallback:messageBody:{}", messageBody);
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(host);
    try {
        // 将消息体反序列化成DTO
        OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
                OrderMessageDTO.class);
        // 读取数据库中的PO
        OrderDetailPO orderPO = orderDetailDao.selectOrder(orderMessageDTO.getOrderId());
        switch (orderPO.getStatus()) {
            case ORDER_CREATING:
                // 修改订单状态
                if (orderMessageDTO.getConfirmed() && null != orderMessageDTO.getPrice()) {
                    orderPO.setStatus(OrderStatusEnum.RESTAURANT_CONFIRMED);
                    orderPO.setPrice(orderMessageDTO.getPrice());
                    orderDetailDao.update(orderPO);
                    // 订单状态更新后给骑手发消息
                    try (Connection connection = connectionFactory.newConnection();
                         Channel channel = connection.createChannel()) {
                        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                        channel.basicPublish("exchange.order.deliveryman",
                                "key.deliveryman",
                                null,
                                messageToSend.getBytes());
                    }
                } else {
                    orderPO.setStatus(OrderStatusEnum.FAILED);
                    orderDetailDao.update(orderPO);
                }
                break;
            case RESTAURANT_CONFIRMED:
                if (null != orderMessageDTO.getDeliverymanId()) {
                    orderPO.setStatus(OrderStatusEnum.DELIVERYMAN_CONFIRMED);
                    orderPO.setDeliverymanId(orderMessageDTO.getDeliverymanId());
                    orderDetailDao.update(orderPO);
                    // 发消息
                    try (Connection connection = connectionFactory.newConnection();
                         Channel channel = connection.createChannel()) {
                        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                        channel.basicPublish("exchange.order.settlement",
                                "key.settlement",
                                null,
                                messageToSend.getBytes());
                    }
                } else {
                    orderPO.setStatus(OrderStatusEnum.FAILED);
                    orderDetailDao.update(orderPO);
                }
                break;
            case DELIVERYMAN_CONFIRMED:
                if (null != orderMessageDTO.getSettlementId()) {
                    orderPO.setStatus(OrderStatusEnum.SETTLEMENT_CONFIRMED);
                    orderPO.setSettlementId(orderMessageDTO.getSettlementId());
                    orderDetailDao.update(orderPO);
                    try (Connection connection = connectionFactory.newConnection();
                         Channel channel = connection.createChannel()) {
                        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                        channel.basicPublish("exchange.order.reward", "key.reward", null, messageToSend.getBytes());
                    }
                } else {
                    orderPO.setStatus(OrderStatusEnum.FAILED);
                    orderDetailDao.update(orderPO);
                }
                break;
            case SETTLEMENT_CONFIRMED:  // 订单创建完成
                if (null != orderMessageDTO.getRewardId()) {
                    orderPO.setStatus(OrderStatusEnum.ORDER_CREATED);
                    orderPO.setRewardId(orderMessageDTO.getRewardId());
                    orderDetailDao.update(orderPO);
                } else {
                    orderPO.setStatus(OrderStatusEnum.FAILED);
                    orderDetailDao.update(orderPO);
                }
                break;
        }
 
    } catch (JsonProcessingException | TimeoutException e) {
        e.printStackTrace();
    }
};
}
 

  1. 编写接口controller OrderController.java
代码语言:javascript复制
package cn.kt.food.orderservicemanager.controller;
import cn.kt.food.orderservicemanager.service.OrderService;
import cn.kt.food.orderservicemanager.vo.OrderCreateVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 22:12
* 概要:
*/
@Slf4j
@RestController
@RequestMapping("api/v1")
public class OrderController {
@Autowired
OrderService orderService;
 
@PostMapping("/orders")
public void createOrder(@RequestBody OrderCreateVO orderCreateDTO) throws IOException, TimeoutException {
    log.info("createOrder:orderCreateDTO:{}", orderCreateDTO);
    orderService.createOrder(orderCreateDTO);
}
}
 

  1. 线程池配置类和自动监听配置 线程池配置类:AsyncTaskConfig.java
代码语言:javascript复制
package cn.kt.food.orderservicemanager.config;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @author tao
* @date 2022-03-24 22:44
* 概要:  线程池配置类
*/
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
 
// ThredPoolTaskExcutor的处理流程
// 当池子大小小于corePoolSize,就新建线程,并处理请求
// 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
// 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
// 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
 
@Override
@Bean
public Executor getAsyncExecutor() {
    // 起一个线程池
    ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
    //设置核心线程数
    threadPool.setCorePoolSize(10);
    //设置最大线程数
    threadPool.setMaxPoolSize(100);
    //线程池所使用的缓冲队列
    threadPool.setQueueCapacity(10);
    //等待任务在关机时完成--表明等待所有线程执行完
    threadPool.setWaitForTasksToCompleteOnShutdown(true);
    // 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
    threadPool.setAwaitTerminationSeconds(60);
    //  线程名称前缀
    threadPool.setThreadNamePrefix("Rabbit-Async-");
    // 初始化线程
    threadPool.initialize();
    return threadPool;
}
 
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return null;
}
}
 

RabbitMQ需要自动执行并且实时监听,因此需要配置自动执行OrderMessageService中handleMessage方法 RabbitConfig.java

代码语言:javascript复制
package cn.kt.food.orderservicemanager.config;
import cn.kt.food.orderservicemanager.service.OrderMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author tao
 * @date 2022-03-24 22:58
 * 概要:  自动执行OrderMessageService中handleMessage方法(配置了RabbitMQ的交换机等)
 */
@Slf4j
@Configuration
public class RabbitConfig {
    @Autowired
    OrderMessageService orderMessageService;
    //配置类中的@Autowired方法会被自动调用
    @Autowired
    public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
        orderMessageService.handleMessage();
    }
}

订单微服务和RabbitMQ的创建大致如上,因此也还有:商家微服务、骑手微服务、结算微服务、积分微服务。 其功能是在订单的每个阶段处理相应的业务逻辑,其中在每个微服务的消息通讯时使用RabbitMQ进行消息的路由和转发,套路和订单微服务差不多一致。

注:其余微服务和总代码放在文章末尾

RabbitMQ使用总结

  1. 新建ConnectionFactory
代码语言:javascript复制
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setHost("localhost");
 

  1. Channel处理相关配置和使用basicPublish发送消息 注意:channel.basicPublish(exchange,routingKey,消息特殊参数,消息体本身(字节)) RabbitMQ发送的消息体本身是字节
代码语言:javascript复制
try (Connection connection = connectionFactory.newConnection();
       Channel channel = connection.createChannel()) {
       // 业务逻辑
 
       // 发送消息处理
       ObjectMapper objectMapper = new ObjectMapper();
       String messageToSend = objectMapper.writeValueAsString("需要发送的消息");
       //(exchange,routingKey,消息特殊参数,消息体本身(字节))
       channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
 

  1. 配置RabbitMQ的Exchange和queue
代码语言:javascript复制
// 声明交换机
channel.exchangeDeclare(
 "exchange.name",
 BuiltinExchangeType.DIRECT,
 true,
 false,
 null);
// 声明消息队列
channel.queueDeclare(
 "queue.name",
 true,
 false,
 false,
 null);
// 队列绑定交换机
channel.queueBind(
 "queue.name",
 "exchange.name",
 "key.name");
 

  1. 使用basicConsume消费消息
代码语言:javascript复制
@Async
public void handleMessage() {
  /**
    * 监听订单状态
    * (队列,是不是ACK,回调函数,消费者标签)
   */
 channel.basicConsume("queue.name", true, deliverCallback, consumerTag -> {
 });
}
 

  1. 定义回调函数 收到消息后进入的回调函数
代码语言:javascript复制
DeliverCallback deliverCallback = (consumerTag, message) -> {
//业务逻辑
};
 

  1. 配置线程池
代码语言:javascript复制
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
// ThredPoolTaskExcutor的处理流程
// 当池子大小小于corePoolSize,就新建线程,并处理请求
// 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
// 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
// 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
 
@Override
@Bean
public Executor getAsyncExecutor() {
    // 起一个线程池
    ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
    //设置核心线程数
    threadPool.setCorePoolSize(10);
    //设置最大线程数
    threadPool.setMaxPoolSize(100);
    //线程池所使用的缓冲队列
    threadPool.setQueueCapacity(10);
    //等待任务在关机时完成--表明等待所有线程执行完
    threadPool.setWaitForTasksToCompleteOnShutdown(true);
    // 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
    threadPool.setAwaitTerminationSeconds(60);
    //  线程名称前缀
    threadPool.setThreadNamePrefix("Rabbit-Async-");
    // 初始化线程
    threadPool.initialize();
    return threadPool;
}
 
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return null;
}
}
 

  1. 使用线程池启动basicConsume
代码语言:javascript复制
//配置类中的@Autowired方法会被自动调用
@Autowired
public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
    orderMessageService.handleMessage();
}
 

使用原生RabbitMQ项目中的不足之处

消息真的发出去了吗?

消息发送后,发送端不知道RabbitMQ是否真的收到了消息 若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常 需要使用RabbitMQ发送端确认机制,确认消息发送

消息真被路由了吗?

消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃 消息丢弃后,订单处理流程停止,业务异常 需要使用RabbitMQ消息返回机制,确认消息被正确路由

消费端处理的过来吗?

业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

消费端处理异常怎么办?

默认情况下,消费端接收消息时,消息会被自动确认(ACK) 消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况 需要使用RabbitMQ消费端确认机制,确认消息被正确处理

队列爆满怎么办?

默认情况下,消息进入队列,会永远存在,直到被消费 大量堆积的消息会给RabbitMQ产生很大的压力 需要使用RabbitMQ消息过期时间,防止消息大量积压

如何转移过期消息?

消息被设置了过期时间,过期后会直接被丢弃 直接被丢弃的消息,无法对系统运行异常发出警报 需要使用RabbitMQ死信队列,收集过期消息,以供分析

不足之处总结

目前项目急需引入的RabbitMQ新特性: 发送端确认机制 消费端确认机制 消息返回机制 消息过期机制 消费端限流机制 死信队列

解决这些不足之处需要用到RabbitMQ的高级特性。

实际开发中经验及小结

  1. 使用线程池:对于频繁创建与销毁的线程,必须使用线程池,否则极易线程溢出,造成“线程爆炸”
  2. POJO类单一职责 a. 各种POJO数据结构必须单一职责,混用会导致代码混乱 b. PO/DO: (Persistent Object/Data Object)持久对象 c. DTO:(Data Transfer Object)数据传输对象 d. BO:(Business Object)业务对象 e. vo: (View Object)显示层对象

源代码:

https://gitee.com/KT1205529635/rabbit-mq/tree/master/food_master_1

0 人点赞