RabbitMQ系列2 RabbitMQ安装与基础入门

2021-03-04 14:55:11 浏览数 (1)

RabbitMQ

简介:

AMQP(Adcanced Message Queuing Protocol)

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。 JMS是JavaEE规范中的一种,类比JDBC 很多消息中间件都实现了JMS规范,例如:ActiveMQ.RabbitMQ官方没有提供JMS实现包,但是开源社区有

AMQP 与 JMS 区别

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模式;而AMQP的消息模式更加丰富

RabbitMQ相关概念介绍

概述

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。 RabbitMQ官方地址:http://www.rabbitmq.com/ RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍); 官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

相关部件简介

Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。message到达broker的第一站,根据查询表中的routing key,分发到消息队列中去。

Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Connection

网络连接,比如一个TCP连接,连接Producer/Consumer与Broker。

Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

出于多租户和安全因素设计,虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

Broker

接受和分发消息的应用,表示消息队列服务器实体,RabbitMQ Server 就是Message Broker。

RabbitMQ安装(以下步骤基于centos7)

1. 安装依赖环境

在线安装依赖环境:

代码语言:javascript复制
shell
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c   kernel-devel m4 ncurses-devel tk tc xz

2. 安装Erlang

到官网下载以下文件

代码语言:javascript复制
erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm
安装命令
代码语言:javascript复制
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
出错情况

如果出现如下错误

说明gblic 版本太低。我们可以查看当前机器的gblic 版本

代码语言:javascript复制
strings /lib64/libc.so.6 | grep GLIBC

当前最高版本2.12,需要2.15.所以需要升级glibc

  • 使用yum更新安装依赖
代码语言:javascript复制
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y

-** 下载rpm包**

代码语言:javascript复制
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-utils-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-static-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/nscd-2.17-55.el6.x86_64.rpm &

-** 安装rpm包**

代码语言:javascript复制
sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
  • 装完毕后再查看glibc版本,发现glibc版本已经到2.17了
代码语言:javascript复制
strings /lib/libc.so.6 | grep GLIBC

3. 安装RabbitMQ

代码语言:javascript复制
# 安装
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
# 安装
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

4.linux相关命令介绍

启动
代码语言:javascript复制
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
配置文件介绍
代码语言:javascript复制
cd /usr/share/doc/rabbitmq-server-3.6.5/

cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

RabbitMQ管理界面安装

开启管理界面

代码语言:javascript复制
rabbitmq-plugins enable rabbitmq_management

修改默认配置信息

代码语言:javascript复制
vim /usr/lib/ra bbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

访问管控台

代码语言:javascript复制
http://IP地址:15672

管控台基本信息介绍

用户的身份级别

1、 超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、 监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、 策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、 普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、 其他

无法登陆管理控制台,通常就是普通的生产者和消费者

添加用户

Virtual Hosts配置

概念

像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。

创建Virtual Hosts

设置Virtual Hosts权限 **

RabbitMQ快速入门

基本步骤介绍

1.创建工程 2.添加对应的依赖 3.编写生产者发送消息 4.编写消费者接受消息

1.创建工程

2.添加对应的依赖

代码语言:javascript复制
<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>

3.编写生产者发送消息

代码语言:javascript复制
package com.pjh;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        /*创建连接工厂*/
        ConnectionFactory connectionFactory = new ConnectionFactory();
        /*设置RabbitMQ的主机地址,默认为localhost*/
        connectionFactory.setHost("121.196.111.120");
        /*连接端口*/
        connectionFactory.setPort(5672);
        /*虚拟主机名称,默认为/*/
        connectionFactory.setVirtualHost("/demo");
        /*连接用户名*/
        connectionFactory.setUsername("guest");
        /*连接密码*/
        connectionFactory.setPassword("guest");
        /*创建连接*/
        Connection connection = connectionFactory.newConnection();
        /*创建频道*/
        Channel channel = connection.createChannel();
        /*声明创建队列*/
        /*
        * 参数介绍
         * 参数1:队列名称,没有改队列就创建一个
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
        * */
        channel.queueDeclare("demo",true,false,false,null);
        /*要发送的消息*/
        String message="Hello Word!!!";
        /*
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish("","demo",null,message.getBytes());
        System.out.println("已发送如下消息:");
        System.out.println(message);
        /*关闭资源*/
        channel.close();
        connection.close();
    }
}

4.编写消费者接受消息

代码语言:javascript复制
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        /*获取连接*/
        Connection connection = getConnection();
        /*创建频道*/
        Channel channel = connection.createChannel();
        /*
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
        * */
        channel.queueDeclare("demo",true, false, false, null);
        /*创建消费者,并设置消息处理*/
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:"   envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:"   envelope.getExchange());
                //消息id
                System.out.println("消息id为:"   envelope.getDeliveryTag());
                //收到的消息
                System.out.println("接收到的消息为:"   new String(body, "utf-8"));

            }
        };
        //监听消息
        /**
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        String demo = channel.basicConsume("demo", true, defaultConsumer);
        System.out.println(demo);

        //不关闭资源,应该一直监听消息
        channel.close();
        connection.close();

    }
    public static Connection getConnection() throws IOException, TimeoutException {
        /*创建连接工厂*/
        ConnectionFactory connectionFactory = new ConnectionFactory();
        /*设置主机地址,默认为本机地址*/
        connectionFactory.setHost("121.196.111.120");
        /*设置连接端口号*/
        connectionFactory.setPort(5672);
        /*设置虚拟主机名称*/
        connectionFactory.setVirtualHost("/demo");
        /*设置连接用户名*/
        connectionFactory.setUsername("guest");
        /*设置连接密码*/
        connectionFactory.setPassword("guest");
        /*创建连接*/
        return connectionFactory.newConnection();
    }
}

相关方法参数介绍

channel.queueDeclare

代码语言:javascript复制
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)

1. queue:

队列的名称 ; ** 2. durable:

是否持久化 ;

  • durable = false时,队列非持久化。因为队列是存放在内存中的,所以当RabbitMQ重启或者服务器重启时该队列就会丢失 ;
  • durable = true时,队列持久化。当RabbitMQ重启后队列不会丢失。RabbitMQ退出时它会将队列信息保存到 Erlang自带的Mnesia数据库 中,当RabbitMQ重启之后会读取该数据库 ;

** 3. exclusive:

是否排外的 ;

  • exclusive = true则设置队列为排他的。如果一个队列被声明为排他队列,该队列 仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 ;
  • exclusive = false则设置队列为非排他的,此时不同连接(Connection)的管道Channel可以使用该队列 ;

注意2点: **

  • 排他队列是 基于连接(Connection) 可见的,同个连接(Connection)的不同管道 (Channel) 是可以同时访问同一连接创建的排他队列 。其他连接是访问不了的 ,强制访问将报错:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello-testExclusice' in vhost '/'.;以下声明是没问题的:
代码语言:javascript复制
Channel channel = connection.createChannel();
    Channel channel2 = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, true, false, null);
    channel2.queueDeclare(QUEUE_NAME, false, true, false, null);
	=》如果是不同的 connection 创建的 channel 和 channel2,那么以上的
	=》channel2.queueDeclare()是会报错的!!!!!!
  • "首次" 是指如果某个连接(Connection)已经声明了排他队列,其他连接是不允许建立同名的排他队列的。这个与普通队列不同:即使该队列是持久化的(durable = true),一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。

** ** 4. autoDelete:

是否自动删除 ;如果autoDelete = true,当所有消费者都与这个队列断开连接时,这个队列会自动删除。注意: 不是说该队列没有消费者连接时该队列就会自动删除,因为当生产者声明了该队列且没有消费者连接消费时,该队列是不会自动删除的。 ** ** 5. arguments:

设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。

0 人点赞