分布式--RabbitMQ入门

2022-09-19 15:41:25 浏览数 (1)

一、简介

1. AMQP

分布式项目中,模块与模块之间的通信可以使用RPC框架,如Dubbo,但RPC中调用方模块获取到被调用方的结果是同步的,争对一些只需要异步调用的方法,如日志存储、发送消息等,RPC就显得效率低下了,AMQP协议的推出就是用来解决进程之间的异步消息通信

AMQP

从设计上来说,AMQP就是一个发布订阅者模式,整体可以看作一个流,核心是中间的管道,即消息队列 有了AMQP,发布者只需要关注发布消息,订阅者只需要关注订阅消息,而流的速度、总承载量等都交由AMQP管理,从而做到异步调用、削峰填谷、服务解耦

2. RabbitMQ

RabbitMQ也是实现了AMQP的一种消息中间件,由Erlang编写,由于Erlang语言对并发的特性,RabbitMQ相对于其他MQ(kafka、RocketMQ等),延迟最低

二、安装

1. Erlang安装

安装编译Erlang所使用的工具:

代码语言:javascript复制
yum -y install make gcc gcc-c   kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel

下载解压Erlang源码,erlang安装版本需要和rabbitmq对应,查看网址:https://www.rabbitmq.com/which-erlang.html#compatibility-matrix

代码语言:javascript复制
wget http://erlang.org/download/otp_src_23.2.tar.gz
tar -xvf otp_src_23.2.tar.gz

编译:

代码语言:javascript复制
cd otp_src_23.2
mkdir -p /usr/local/erlang
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

安装:

代码语言:javascript复制
make install

配置全局环境变量:

代码语言:javascript复制
vi /etc/profile
代码语言:javascript复制
export ERLANGROOT=/usr/local/erlang
export PATH=$PATH:$ERLANGROOT/bin
2. RabbitMQ安装

争对Erlang版本为23.2,我们安装RabbitMQ的3.8.35版本,其他版本可以从githbu上查看:https://github.com/rabbitmq/rabbitmq-server/tags

下载并解压:

代码语言:javascript复制
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.35/rabbitmq-server-generic-unix-3.8.35.tar.xz
mkdir /usr/local/rabbitmq
tar -xvf rabbitmq-server-generic-unix-3.8.35.tar.xz -C /usr/local/rabbitmq/

生效界面插件:

代码语言:javascript复制
cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin
./rabbitmq-plugins enable rabbitmq_management

启动RabbitMQ:

代码语言:javascript复制
./rabbitmq-server -detached

停止RabbitMQ:

代码语言:javascript复制
./rabbitmqctl stop_app

访问15672端口:

三、账号管理

管理界面可以在本地使用guest账号(密码与账号相同)登录,但外部访问是不允许的,需要为RabbitMQ添加账号

1. 创建账号
代码语言:javascript复制
./rabbitmqctl add_user aruba aruba
2. 授予角色
代码语言:javascript复制
./rabbitmqctl set_user_tags aruba administrator
3. 账号授权
代码语言:javascript复制
./rabbitmqctl set_permissions -p "/" aruba ".*" ".*" ".*"

"/"对应默认的虚拟主机

4. 登录

使用新的账号进行登录:

四、RabbitMQ架构

下面是RabbitMQ实现原理的细节部分

针对上面名词的解释:

名词

解释

Exchange 交换机

接收发布者消息,并将消息通过路由规则路由到相应队列

Binding key

消息队列和交换器之间的关联,通过路由键Routing-key绑定

Channel 信道

TCP里面的虚拟链接。发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。由于TCP连接数有上限,且必须通过三次握手,建立连接时性能低下,相比于每个连接都使用TCP,一条TCP连接可以容纳无限的信道

Virtual Host

虚拟主机。表示一批交换器,消息队列和相关对象。默认创建一个为"/"的虚拟主机

我们主要关注RabbitMQ如何发送消息和订阅消费消息,首先RabbitMQ服务中交换机和队列通过Routing-key进行关联,再由发布者与RabbitMQ服务通过Channel建立连接后,通过Routing-key发送消息到交换机,最后由交换机对消息进行路由匹配到相应队列后,由队列进行分发 而订阅消费消息,一个消息只能有一个消费者成功的消费,收到消息后需要发送ack,告诉RabbitMQ服务成功的消费了该消息,针对多个消费者订阅一个队列的情况,RabbitMQ默认使用轮询的方式发送给不同的消费者

五、RabbitMQ通讯方式

Rabbit提供的通讯方式,可以从官网查看:https://rabbitmq.com/getstarted.html 分别为:

通讯方式

描述

Hello World!

为了入门操作,使用默认交换机,一个队列被一个消费者订阅

Work queues

使用默认交换机,一个队列可以被多个消费者订阅

Publish/Subscribe

手动创建交换机(FANOUT),一个消息可以路由到多个队列中

Routing

手动创建交换机(DIRECT),按照routing-key进行路由匹配

Topics

手动创建交换机(TOPIC),routing-key支持通配符匹配

RPC

RPC方式

Publisher Confirms

保证消息可靠性

创建一个项目,导入依赖:

代码语言:javascript复制
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

连接工具类:

代码语言:javascript复制
public class RBConnectionUtil {
    public static final String RABBITMQ_HOST = "192.168.42.4";

    public static final int RABBITMQ_PORT = 5672;

    public static final String RABBITMQ_USERNAME = "guest";

    public static final String RABBITMQ_PASSWORD = "guest";
    // 虚拟主机
    public static final String RABBITMQ_VIRTUAL_HOST = "/";

    /**
     * 构建RabbitMQ的连接对象
     *
     * @return
     */
    public static Connection getConnection() throws Exception {
        //1. 创建Connection工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2. 设置RabbitMQ的连接信息
        factory.setHost(RABBITMQ_HOST);
        factory.setPort(RABBITMQ_PORT);
        factory.setUsername(RABBITMQ_USERNAME);
        factory.setPassword(RABBITMQ_PASSWORD);
        factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);

        //3. 返回连接对象
        Connection connection = factory.newConnection();
        return connection;
    }
    
}
1. Hello World!

Hello World!使用的是默认的交换机

生产者:

代码语言:javascript复制
public class Publisher {

    private static final String QUEUE_NAME = "hello";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 发送消息
        String message = "hello rabbit";
        //参数: 交换机(默认为"") routing-Key(默认交换机就是队列名) 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    }

}

运行后,管理界面可以看到队列中有了一条消息:

消费者:

代码语言:javascript复制
public class Consumer {

    private static final String QUEUE_NAME = "hello";

    @Test
    public void consumer() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:"   new String(body, "UTF-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, callback);

        System.in.read();
    }
    
}

运行结果:

2. Work queues

Work queues也是使用默认交换机,发送消息是一样的,只不过一个队列被两个消费者订阅

2.1 轮询消费

上面提到过,一个消息只能被一个消费者消费,RabbitMQ默认使用轮询方式分发给不同的消费者

生产者:

代码语言:javascript复制
public class Publisher {

    private static final String QUEUE_NAME = "work";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 发送消息
        for (int i = 0; i < 10; i  ) {
            //参数: 交换机(默认为"") routing-Key(默认交换机就是队列名) 消息其他参数  消息
            channel.basicPublish("", QUEUE_NAME, null, String.valueOf(i).getBytes());
        }
    }

}

消费者:

代码语言:javascript复制
public class Consumer {

    private static final String QUEUE_NAME = "work";

    @Test
    public void consumer1() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:"   new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(QUEUE_NAME, true, callback);

        System.in.read();
    }

    @Test
    public void consumer2() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:"   new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(QUEUE_NAME, true, callback);

        System.in.read();
    }

}

运行结果:

2.2 饿汉消费

针对上面的方式,每个消费者不管性能如何,都会按照轮询方式进行分发,如果消费者1消费一个消息需要200ms消费者2消费一个消息需要1000ms,消费完10个消息最少需要5s,那么如何解决这个问题?

手动ack 消息流控:自动ack当接收到消息时就会发送ack给RabbitMQ服务,我们需要在执行完逻辑处理后,手动执行ack,还需要设置消息流控(一次拿多少条消息),才能实现争抢消息

消费者:

代码语言:javascript复制
public class Consumer {

    private static final String QUEUE_NAME = "work";

    @Test
    public void consumer1() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.1 流控设置
        channel.basicQos(1);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者获取到消息:"   new String(body, "UTF-8"));
                //手动ack  参数:deliveryTag 是否批量操作
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(QUEUE_NAME, false, callback);

        System.in.read();
    }

    @Test
    public void consumer2() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.1 流控设置
        channel.basicQos(1);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者获取到消息:"   new String(body, "UTF-8"));
                //手动ack  参数:deliveryTag 是否批量操作
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(QUEUE_NAME, false, callback);

        System.in.read();
    }

}

运行结果:

3. Publish/Subscribe

通过Work queues我们知道了一条消息只能被一个消费者消费,而实际开发中,一条消息需要被多个消费者消费的情况很多 Publish/Subscribe就是为了解决这个问题而产生的,队列中的一条消息只能被一个消费者消费,而不同队列中可以存放相同的消息,Publish/Subscribe使得将一条消息路由到多个队列,进而被多个消费者订阅消费

Publish/Subscribe需要手动创建交换机(FANOUT),并手动绑定多个队列

生产者:

代码语言:javascript复制
public class Publisher {

    public static final String QUEUE_NAME1 = "p1";
    public static final String QUEUE_NAME2 = "p2";
    public static final String EXCHANGE_NAME = "ps";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        //4. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

        //5. 交换机与队列绑定 参数: 队列名 交换机名 Routing-Key(直接绑定为空即可)
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");

        //6. 发送消息
        //参数: 交换机 routing-Key(直接绑定不需要) 消息其他参数  消息
        channel.basicPublish(EXCHANGE_NAME, "", null, "publish/subscribe!".getBytes());
    }

}

消费者:

代码语言:javascript复制
public class Consumer {

    @Test
    public void consumer1() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_NAME1, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1获取到消息:"   new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_NAME1, true, callback);

        System.in.read();
    }

    @Test
    public void consumer2() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_NAME2, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2获取到消息:"   new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_NAME2, true, callback);

        System.in.read();
    }

}

运行结果:

4. Routing

Routing不仅需要手动创建交换机(DIRECT),还需要在绑定队列时指定routing-key,消息的发送是根据routing-key来路由到不同的队列

生产者:

代码语言:javascript复制
public class Publisher {

    public static final String QUEUE_NAME1 = "q1";
    public static final String QUEUE_NAME2 = "q2";
    public static final String EXCHANGE_NAME = "routing";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //4. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

        //5. 交换机与队列绑定 参数: 队列名 交换机名 Routing-Key
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "a");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "b");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "c");

        //6. 发送消息
        //参数: 交换机 routing-Key 消息其他参数  消息
        channel.basicPublish(EXCHANGE_NAME, "a", null, "hi a".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "b", null, "hi b".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "c", null, "hi c".getBytes());
    }

}

消费者和Publish/Subscribe的代码相同:

代码语言:javascript复制
public class Consumer {

    @Test
    public void consumer1() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_NAME1, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1获取到消息:"   new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_NAME1, true, callback);

        System.in.read();
    }

    @Test
    public void consumer2() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_NAME2, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2获取到消息:"   new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_NAME2, true, callback);

        System.in.read();
    }

}

运行结果:

5. Topic

Topic的路由规则可以通过通配符进行匹配

  • *:表示占位符
  • #:表示通配符

生产者:

代码语言:javascript复制
public class Publisher {

    public static final String QUEUE_NAME1 = "topic-q1";
    public static final String QUEUE_NAME2 = "topic-q2";
    public static final String EXCHANGE_NAME = "topic";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        //4. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

        //5. 交换机与队列绑定 参数: 队列名 交换机名 Routing-Key
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "*.a.*");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "*.b");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "c.#");

        //6. 发送消息
        //参数: 交换机 routing-Key 消息其他参数  消息
        channel.basicPublish(EXCHANGE_NAME, "1.a.3", null, "hi a".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "2.b", null, "hi b".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "c.1.2.3", null, "hi c".getBytes());
    }

}

消费者代码相同,运行结果:

6. RPC

RPC方式就是生产者发送消息时附带一些信息,消费者消费时,又通过另一个队列发送返回信息给生产者

生产者不仅仅要发送消息,还要订阅另一个队列:

代码语言:javascript复制
public class Publisher {

    public static final String QUEUE_PUB = "topic-publisher";
    public static final String QUEUE_CON = "topic-consumer";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_PUB, false, false, false, null);
        channel.queueDeclare(QUEUE_CON, false, false, false, null);

        //4. 监听消费者发送消息
        channel.basicConsume(QUEUE_CON, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者发送的消息:"   new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

        //5. 发送消息
        // 构建额外的信息
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .replyTo(QUEUE_CON)
                .correlationId(UUID.randomUUID().toString())
                .build();
        //参数: 交换机 routing-Key 消息其他参数  消息
        channel.basicPublish("", QUEUE_PUB, props, "hi consumer".getBytes());

        System.in.read();
    }

}

消费者除了订阅消息外,还需要做相应的返回消息处理:

代码语言:javascript复制
public class Consumer {

    @Test
    public void consumer() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_PUB, false, false, false, null);
        channel.queueDeclare(Publisher.QUEUE_CON, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:"   new String(body, "UTF-8"));

                //5. 开始返回消息
                String respQueueName = properties.getReplyTo();
                AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", respQueueName, props, "hi publisher".getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_PUB, false, callback);

        System.in.read();
    }

}

运行结果:

项目地址:

https://gitee.com/aruba/rabbit-mqstudy

0 人点赞