MQ是Message Queue的缩写,也就是消息队列。
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
消息队列这种技术主要用在分布式设计当中,其实可以说是一种设计模式。是相对同步系统而言的。同步系统是什么呢?
同步,是当所有的操作都完毕,才会返回结果,比如用户支付,如果是同步的话就是当用户所发起的支付操作,然后只有等支付的这个业务成功然后才给用户返回结果说支付成功,但是分布式的异步就不一样了,
异步的话它并不是等一套操作全部完成才会返回结果。用户发起支付操作,那么这个操作会立马返回给用户您已经支付成功,然后后面具体的支付扣款以及转账数据等等的操作是自己独立完成的。用户发起支付后,我们只要把这个支付的消息告诉后面的操作,说用户支付了,后面的操作逻辑我就不管了,我现在立马给用户返回信息,让用户及时得到反馈。这就是异步。
而消息队列的出现就是为了实现异步的方式,作为交互的中间件。
分布式系统的两种通信方式,一种是直接远程调用,这是一种同步通信的方式,还有一种就是我们的异步通信。
在我们的分布式微服务中是使用的feign客户端的远程调用就是一种同步操作,我们完成整个逻辑的操作的过程中
大概就是这样的调用过程
尝试一下非常规混乱作图。
你像这样的处理是最好进行同步,不然怎么正确返回数据?这样的同步是后端的数据请求的逻辑必要的。
然后异步
在这里的体现的一个就是同步通信必须必须有一个等待的过程,异步则不需要等待,即使的返回消息。当然这里体现在消息队列的这个处理过程。
相比较而言于是可以比较一下异步和同步这个几个特点。
同步的执行效率会比较低,耗费时间,但有利于我们对流程进行控制,避免很多不可掌控的意外情况;
异步的执行效率高,节省时间,但是会占用更多的资源,也不利于我们对进程进行控制
MQ基于异步的方式实现一个解耦的特点,还有异步提速,削峰填谷。
削峰填谷怎么体现呢?你看同步通信。直接到这里处理,但是A系统不能一次处理这么多,于是啊!这里的业务就会出现一个峰值。那么呢!用户也得不到即使的响应。
但是如果我们用到MQ这个中间件,其实这里会起到一个缓冲作用。
但是你要考虑到MQ的优点必然也会引发一些缺点,就是它的稳定性必然会差,数据并不是很安全,不是很好维护,假如MQ挂掉了,岂不是就瘫痪了?还有异步的方式很可能导致处理消息的不同步的情况。这些我们都需要了解到。
当然啊,如果你的系统对数据的安全性要求不是给很高的话,还有就是允许消息的短暂延迟或者不一致用这个也是比较好的,比如海量的日志信息不重要的这些问题都不是很大。
这是目前的常见的一些MQ产品
我们学习RabbitMQ
RabbitMQ初步
官网
在他这个界面学习案例你可以学习一些案例。一共大概由七个模型。我们首先学习第一个。
点进去java看这种图,我们基于这个模型作一个java的练习。
还有一张非常到位的图。
了解几个概要
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中,用于 message 的分发依据
routingkey是代表的虚拟主机的意思,这里的匹配直接指定名字就行。如果你这里看不懂,可以先看下面的案例代码讲解。
远程服务器配置RabbitMQ
在这之前我们需要下载这个RabbirMQ,我还是在远程服务器上进行部署。
拉取镜像
代码语言:shell复制docker pull rabbitmq:3-management
配置运行
注意这里用户名和密码自己可以指定
-p 15672:15672 这是它的管理平台的界面
-p 5672:5672 消息通信端口
代码语言:shell复制docker run
-e RABBITMQ_DEFAULT_USER=jgdabc
-e RABBITMQ_DEFAULT_PASS=123456
--name mq
--hostname mq1
-p 15672:15672
-p 5672:5672
-d
rabbitmq:3-management
我最后要在我本机测试,所以我将这两端口全部开放
代码语言:shell复制 firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
另一个也要开放,这里不再写了,阿里云的话面板也开放一下端口。
这样就好了。
然后我们访问管理平台。
还是外网地址加上端口。
这里中间有一个登录,登录进来就是这个界面。可以看下。
这就配置好了
HelloWord 简单模式
这个时候你对比着这个图。
生产者消费者是什么呢?你可以在这里简单的理解为发送消息和接收消息的人。或者说是数据。生产者这里产生的数据要经过中间件的操作,然后你就可以对照上图了。
这里需要建立一个连接,连接这里有若干通道,或者说渠道也成,然后Broker,它是用来接收和分发消息的,可以看到里面有很多组件,有交换机,不过这个案例我们并没有使用到,然后不使用的话其实就是直接队列了,可以看到队列也可以有多个,然后呢!消费者也会需要建立连接,连接中也有通道,当然啊这个连接和通道和消费者是不一样的,但是去消息的话我们是需要来自同一队列,所以必然有一个指定。看图说话,就是这么简单。
当然了我们只是先做一个这样简单的模型案例,还是有许多处理逻辑的,我们还没做。
另外啊,下面我们写的代码在后面是不用再写的,因为后面有更简单的封装,之所以还要学习去写一次,因为是可以加深一个理解的。
建立一个空项目,空项目下,创建两个模块,一个作为生产者,一个作为消费者。如下。
这是两个maven模块,当然后面会有一个和springboot的整合,这里就不做了。
来看pom
代码语言:html复制<!-- rabbitmq的依赖 java客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
后面运行控制台出现爆红请引入这个依赖,不过这个并不影响数据传输。
代码语言:html复制 <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
<scope>compile</scope>
</dependency>
接下来就是主要的两个类
先看生产者,主要的代码中注释已经无比清楚了。
代码语言:java复制package com.jgdabc.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//完成发送消息
public class Producer_HelloWord {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
// 最简单的helloword这个案例不需要交换价
// 所以直接创建队列Queue
channel.queueDeclare();
// 发送消息
//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除
// 如果没有一个helloword的队列,则会创建该队列
channel.queueDeclare("hello_word",true,false,false,null);
// 6// 发送消息
// basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws
// 参数说明:1,exchange:交换机名称,简单模式下,交换机默认的,设置参数为空字符串
// 2:routingket:路由名称,要和路由到的队列名称一样,才可以匹配到
// 3:props :配置信息
// 4:body:发送消息数据
String body = "hello rabbicPublish";
channel.basicPublish("","hello_word",null,body.getBytes());
// 释放资源
channel.close();
connection.close();
}
}
提出来部分注释
1 创建连接工厂
2 设置连接参数
3创建连接connection
4 创建Channel
5:本案例不需要交换机,直接创建队列
6:消息发送
这里注意看代码注释就可以了。
再来看消费者,部分代码是一样的,所以再消费者代码基础上做了一些更改
代码语言:java复制package com.jgdabc.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2 设置连接参数
connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
// 3创建连接connection
Connection connection = connectionFactory.newConnection();
// 4 创建Channel
Channel channel = connection.createChannel();
// 最简单的helloword这个案例不需要交换价
// 所以直接创建队列Queue
channel.queueDeclare();
// 发送消息
//5创建队列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Stri
//参数说明: queue :队列名称
// durable : 是否持久化 :当mq 重启数据还在
// exclusive : 是否独占,只能有一个消费者监听这队列
// 当connection关闭时候,是否删除队列
// autoDelete:是否自动删除,当没有Consumer时候,是否自动删除
// 如果没有一个helloword的队列,则会创建该队列
channel.queueDeclare("hello_word",true,false,false,null);
// 接收消息
// basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
// 参数说明
// queue: 队列名称
// autoAck : 是否自动确认
// callback: 回调函数
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当收到消息后会自动执行该方法
// consumerTag:消息表示
// ebvelop:获取一些信息,交换机的信息,路由等等
// properties:配置信息
// body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("consumerTag:" consumerTag);
System.out.println("Exchange:" envelope.getExchange());
System.out.println("RoutingKey:" envelope.getRoutingKey());
System.out.println("properties:" properties);
System.out.println("body:" new String(body));
}
};
channel.basicConsume("hello_word",true,consumer);
//
}
}
注意这里为什么消费者不关闭连接,这里方便监听传输消息就没关闭。关闭了还监听个锤子。
生产者传输一下就自己关闭了。这就像支付,我可以传输过来支付的消息告诉后面的处理,有人消费了,然后后面的事情我就不管了,我也不等你。后面的听到消息就做出处理就好啦,然后客户支付那里我就即使响应您已经支付完成就好了。这就是异步通信。
在上面的注释中,你需要注释这一段。
代码语言:java复制 connectionFactory.setHost("123.56.4.32"); //服务器主机ip公网 默认localhost
connectionFactory.setPort(5672); //消息端口 默认5672
connectionFactory.setVirtualHost("/shabi");//虚拟机 默认/
connectionFactory.setUsername("jgdabc");//用户名 默认guest
connectionFactory.setPassword("123456");//密码 默认guest
5672是我传输消息用的端口,15672是前面的管理面板端口界面。虚拟机的话这里说一下。
你打开这个界面后,点击Admin,就是管理界面这里。默认的这里有一个超级管理员,就是你当初配置创建用户的时候创建的。
他这里每一个用户都有一个虚拟主机,理论是这样,默认的话是/
这里我创建又创建了一个用户和虚拟主机,并赋予权限。
然后点名字进入到这里
查看并创建虚拟主机的地方
另外在这里创建队列,因为我们程序中用到。
没传输钱这里是没什么通道的。
接下来我们运行传输,先打开消费者程序,在运行中,这里我们还没有输出什么。
然后运行生产者
打开消费者的控制台,这里打印了一些参数信息,然后body就是我们生产着那里发送的消息。
然后打开管理界面,观察一下信息
如果网络不稳定的话,可能会出现一些问题。测试的时候要想在管理面板更加直观观察到。你可以先运行生产者几次,我运行了四次
你点击name为hello_word的这个队列,看看里面存了什么
然后运行消费者,你看这四次就都接到了,红色的那个没啥影响。
这里消费者一接收,队列就空了。
很好理解