RabbitMQ 系列(2) —— 用 java 连接 RabbitMQ

2020-12-16 15:55:53 浏览数 (1)

RabbitMQ 的相关概念

RabbitMQ 作为一个消息中间件,整体上采用了生产者与消费者模型,主要负责接收,存储和转发消息。

生产者和消费者

RabbitMQ 从宏观上可以视为

其中:

  • Producer: 生产者,负责创建消息,并将消息发布到 RabbitMQ 中
  • Broker: 消息中间件服务节点
  • Consumer: 消费者负责订阅队列 并从队列上接收消息。

其详细的工作流程可视为:

RabbitMQ 的架构模型

RabbitMQ 的整体架构可以入下图所示

队列

队列是 Rabbit MQ 的内部对象,用于存储消息。多个消费者可以订阅同一队列

交换器

交换器主要负责将生成者消息投递到队列中。

在 RabbitMQ 中,要想使用 交换器将消息头送到正确的队列上,就需要使用 BindingKey 和 RoutingKey。 BindingKey 就是 交换器和队列之间的固定通路,而 RoutingKey 就是消息选择那些通路进行投送的规则。

交换器的类型

  • fanout: 将消息发送到所有与该交换器绑定的队列上
  • deirect: 指定某一条BindingKey,将消息直接发送到队列上
  • topic: 根据自设定的路由规则将消息投送到队列中
  • headers: 不依赖路由键投递消息而是根据消息的内容进行消息投送。

使用 java 连接 RabbitMQ 的简答案例

前期准备

默认情况下 Rabbit MQ 默认的用户名和密码为 “guest”,但是该账户只能通过本地访问,因此需要创建 一个远程访问的用户,并设置权限

代码语言:javascript复制
# 为 RabbitMQ 创建一个新的用户
# 用户名为 root 密码为 root123
rabbitmqctl add_user root root123
# 为 root 用户设置所有权限
rabbitmqctl set_permission -p/ root ".*" ".*" ".*"
# 设置 root 用户为管理员
rabbitmqctl set_user_tags root administrator

生产者与消费的Demo

Step1: 通过 maven 引入相关包

首先需要引入 rabbitmq-client 和 rabbitmq 客户端所依赖的 slf4j 包

代码语言:javascript复制
<dependencies>
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.26</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.26</version>
        <scope>test</scope>
    </dependency>
</dependencies>

生产者相关代码

代码语言:javascript复制
public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routingkey_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.0.0"; // 服务器所在id即可
    private static final int PORT = 5672;
    public static void main(String[] args) throws IOException, TimeoutException,InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("root");
        factory.setPassword("root123");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 创建一个持久化,非排他的、非自动删除的队列
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        System.out.println(channel.isOpen());
        // 将交换器与队列通过路由键绑定
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        channel.close();
        connection.close();
    }
}

Step3: 消费者相关代码

代码语言:javascript复制
public class RabbitConsumer {
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.0.0";
    private static final int PORT = 5672;
    public static void main(String[] args) throws IOException, TimeoutException,InterruptedException {
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS,PORT)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("root123");
        Connection connection = factory.newConnection(addresses);
        final Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv: message: "   new String(body));
                try{
                    TimeUnit.SECONDS.sleep(1);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

0 人点赞