rabbitMq
中间件
RabbitMQ属于中间件的一种,其实很多东西都是中间件比如说mysql redis都是的 其实中间件是一种概念,只要是实现软件和软件之间沟通连接的软件都可以叫做中间件
为什么使用中间件
举个例子,其实当我们的项目需要的访问量过多的时候,必然要用到分布式微服的相关技术,也就是说将一个项目拆分成多个项目,这种就带来了业务链调用的问题,比如说不同的服务要相互的交互,但是你的业务足够的复杂之后呢,就会带来以下的问题
- 业务调用链过长,用户等待时间长
- 部分组件故障会瘫痪整个业务
- 业务高峰期没有缓存
这些都是同步调用时候会出现的问题,但是呢后面人们又想到了异步调用来解决这些相关的问题,也就是利用多线程相关的技术,但是多线程相关的技术也会带来问题,因为你的线程开的多了之后啊在高峰期你的线程池是不够用的并且内存也会出现爆满等等的问题,所以说靠项目本身来解决这个问题都是不优秀的。这样一来就诞生了消息中间件这种概念
分类
消息中间件 xxxMQ
负载均衡中间件 nginx lv5 keepalive cdn
缓存中间件 mencache redis
数据库中间件mycat shardingjdbc
消息中间件
什么是消息中间件
消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
消息中间件使用的大概念
消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件机制的系统中,不同的对象之间通过传递消息来激活对方的事件,完成相应的操作。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。消息中间件能在不同平台之间通信,它常被用来屏蔽掉各种平台及协议之间的特性,实现应用程序之间的协同,其优点在于能够在客户和服务器之间提供同步和异步的连接,并且在任何时刻都可以将消息进行传送或者存储转发,这也是它比远程过程调用更进一步的原因。简单的说就是接受数据、接受请求、存储数据、发送数据等功能的技术服务
消息中间件核心组成部分
- 消息的协议
- 消息的持久化策略
- 消息的分发策略
- 消息的高可用
- 消息的容错机制
四大消息中间件
Apache ACTIVEMQ
RabbitMQ
Apache RocketMQ
Apache kafka
简介RabbitMQ
底层用的就是erlang语言编写的目前最流行的中间消息键
优点:
- 高可靠性,支持发送确认,投递确认
- 高可用,支持镜像队列
- 支持插件
- 支持高并发
- 支持多种平台,多种客户端,文档齐全
- 社区活跃度高
- 与spring同宗,支持完善
因为要达到跨语言跨项目交互,肯定是需要协议的。那么我们rabbitMq也是有相应的协议的
我们一般了解到的是tcp/ip协议,还有udp协议但是tcp/ip协议并不能满足足够的需求,所以自己要构建一份协议也就是AMQP协议
三大规则
流量消峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正
常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限
制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分
散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体
验要好。
应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合
调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于
消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在
这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流
系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
异步处理
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可
以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api,
B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,
A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此
消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不
用做这些操作。A 服务还能及时的得到异步处理成功的消息。
协议拓展
什么是协议:
1.计算机底层操作系统与应用程序通讯是共同遵守的一组约定,只有遵循共同约定和规范,系统和底层操作系统之间才能相互的交流
2.主要负责数据的接收与传递
3.协议对数据格式和计算机之间交换数据都必须严格的遵守
协议三要素:
1.语法:语法示用户数据与控制信息的结构与格式,以及数据出现的顺序(http规定请求报文与响应报文的格式)
2.语义:语义是解释控制信息每个部分的意义。他规定了需要发出什么控制信息以及完成动作与做出什么样的响应(post、get等请求)
3.时序:时序是对时间发生顺序的详细说明(先请求再响应)
代码语言:javascript复制TCP/ip协议与UDP协议
TCP/IP协议是一个协议簇。里面包括很多协议的,UDP只是其中的一个, 之所以命名为TCP/IP协议,因为TCP、IP协议是两个很重要的协议,就用他两命名了。
TCP/IP协议集包括应用层,传输层,网络层,网络访问层。
其中应用层包括:
1、超文本传输协议(HTTP):万维网的基本协议;
2、文件传输(TFTP简单文件传输协议);
3、远程登录(Telnet),提供远程访问其它主机功能, 它允许用户登录internet主机,并在这台主机上执行命令;
4、网络管理(SNMP简单网络管理协议),该协议提供了监控网络设备的方法, 以及配置管理,统计信息收集,性能管理及安全管理等;
5、域名系统(DNS),该系统用于在internet中将域名及其公共广播的网络节点转换成IP地址。
其次网络层包括:
1、Internet协议(IP);
2、Internet控制信息协议(ICMP);
3、地址解析协议(ARP);
4、反向地址解析协议(RARP)。
最后说网络访问层:
网络访问层又称作主机到网络层(host-to-network),网络访问层的功能包括IP地址与物理地址硬件的映射, 以及将IP封装成帧.基于不同硬件类型的网络接口,网络访问层定义了和物理介质的连接. 当然我这里说得不够完善,TCP/IP协议本来就是一门学问,每一个分支都是一个很复杂的流程, 但我相信每位学习软件开发的同学都有必要去仔细了解一番。
下面着重讲解一下TCP协议和UDP协议的区别
TCP(Transmission Control Protocol,传输控制协议)是面向连接的协议,也就是说,在收发数据前,必须和对方建立可靠的连接。 一个TCP连接必须要经过三次“对话”才能建立起来,其中的过程非常复杂, 只简单的描述下这三次对话的简单过程:
1)主机A向主机B发出连接请求数据包:“我想给你发数据,可以吗?”,这是第一次对话;
2)主机B向主机A发送同意连接和要求同步 (同步就是两台主机一个在发送,一个在接收,协调工作)的数据包 :“可以,你什么时候发?”,这是第二次对话;
3)主机A再发出一个数据包确认主机B的要求同步:“我现在就发,你接着吧!”, 这是第三次对话。
三次“对话”的目的是使数据包的发送和接收同步, 经过三次“对话”之后,主机A才向主机B正式发送数据。
TCP三次握手过程
第一次握手:主机A通过向主机B 发送一个含有同步序列号的标志位的数据段给主机B,向主机B 请求建立连接,通过这个数据段, 主机A告诉主机B 两件事:我想要和你通信;你可以用哪个序列号作为起始数据段来回应我。
第二次握手:主机B 收到主机A的请求后,用一个带有确认应答(ACK)和同步序列号(SYN)标志位的数据段响应主机A,也告诉主机A两件事:我已经收到你的请求了,你可以传输数据了;你要用那个序列号作为起始数据段来回应我
第三次握手:主机A收到这个数据段后,再发送一个确认应答,确认已收到主机B 的数据段:"我已收到回复,我现在要开始传输实际数据了,这样3次握手就完成了,主机A和主机B 就可以传输数据了。
3次握手的特点
没有应用层的数据 ,SYN这个标志位只有在TCP建立连接时才会被置1 ,握手完成后SYN标志位被置0。
TCP建立连接要进行3次握手,而断开连接要进行4次
第一次: 当主机A完成数据传输后,将控制位FIN置1,提出停止TCP连接的请求 ;
第二次: 主机B收到FIN后对其作出响应,确认这一方向上的TCP连接将关闭,将ACK置1;
第三次: 由B 端再提出反方向的关闭请求,将FIN置1 ;
第四次: 主机A对主机B的请求进行确认,将ACK置1,双方向的关闭结束.。
由TCP的三次握手和四次断开可以看出,TCP使用面向连接的通信方式, 大大提高了数据通信的可靠性,使发送数据端和接收端在数据正式传输前就有了交互, 为数据正式传输打下了可靠的基础。
名词解释
1、ACK 是TCP报头的控制位之一,对数据进行确认。确认由目的端发出, 用它来告诉发送端这个序列号之前的数据段都收到了。 比如确认号为X,则表示前X-1个数据段都收到了,只有当ACK=1时,确认号才有效,当ACK=0时,确认号无效,这时会要求重传数据,保证数据的完整性。
2、SYN 同步序列号,TCP建立连接时将这个位置1。
3、FIN 发送端完成发送任务位,当TCP完成数据传输需要断开时,,提出断开连接的一方将这位置1。
TCP的包头结构:
源端口 16位;
目标端口 16位;
序列号 32位;
回应序号 32位;
TCP头长度 4位;
reserved 6位;
控制代码 6位;
窗口大小 16位;
偏移量 16位;
校验和 16位;
选项 32位(可选);
这样我们得出了TCP包头的最小长度,为20字节。
UDP(User Data Protocol,用户数据报协议)
1、UDP是一个非连接的协议,传输数据之前源端和终端不建立连接, 当它想传送时就简单地去抓取来自应用程序的数据,并尽可能快地把它扔到网络上。 在发送端,UDP传送数据的速度仅仅是受应用程序生成数据的速度、 计算机的能力和传输带宽的限制; 在接收端,UDP把每个消息段放在队列中,应用程序每次从队列中读一个消息段。
2、 由于传输数据不建立连接,因此也就不需要维护连接状态,包括收发状态等, 因此一台服务机可同时向多个客户机传输相同的消息。
3、UDP信息包的标题很短,只有8个字节,相对于TCP的20个字节信息包的额外开销很小。
4、吞吐量不受拥挤控制算法的调节,只受应用软件生成数据的速率、传输带宽、 源端和终端主机性能的限制。
5、UDP使用尽最大努力交付,即不保证可靠交付, 因此主机不需要维持复杂的链接状态表(这里面有许多参数)。
6、UDP是面向报文的。发送方的UDP对应用程序交下来的报文, 在添加首部后就向下交付给IP层。既不拆分,也不合并,而是保留这些报文的边界, 因此,应用程序需要选择合适的报文大小。
我们经常使用“ping”命令来测试两台主机之间TCP/IP通信是否正常, 其实“ping”命令的原理就是向对方主机发送UDP数据包,然后对方主机确认收到数据包, 如果数据包是否到达的消息及时反馈回来,那么网络就是通的。
ping命令是用来探测主机到主机之间是否可通信,如果不能ping到某台主机,表明不能和这台主机建立连接。ping命令是使用 IP 和网络控制信息协议 (ICMP),因而没有涉及到任何传输协议(UDP/TCP) 和应用程序。它发送icmp回送请求消息给目的主机。
ICMP协议规定:目的主机必须返回ICMP回送应答消息给源主机。如果源主机在一定时间内收到应答,则认为主机可达。
UDP的包头结构:
源端口 16位
目的端口 16位
长度 16位
校验和 16位
小结TCP与UDP的区别:
1、基于连接与无连接;
2、对系统资源的要求(TCP较多,UDP少);
3、UDP程序结构较简单;
4、流模式与数据报模式 ;
5、TCP保证数据正确性,UDP可能丢包;
6、TCP保证数据顺序,UDP不保证。
什么是长短连接
在HTTP/1.0中默认使用短连接。也就是说,客户端和服务器每进行一次HTTP操作,就建立一次连接,任务结束就中断连接。当客户端浏览器访问的某个HTML或其他类型的Web页中包含有其他的Web资源(如JavaScript文件、图像文件、CSS文件等),每遇到这样一个Web资源,浏览器就会重新建立一个HTTP会话。
而从HTTP/1.1起,默认使用长连接,用以保持连接特性。使用长连接的HTTP协议,会在响应头加入这行代码:
Connection:keep-alive
在使用长连接的情况下,当一个网页打开完成后,客户端和服务器之间用于传输HTTP数据的TCP连接不会关闭,客户端再次访问这个服务器时,会继续使用这一条已经建立的连接。Keep-Alive不会永久保持连接,它有一个保持时间,可以在不同的服务器软件(如Apache)中设定这个时间。实现长连接需要客户端和服务端都支持长连接。
HTTP协议的长连接和短连接,实质上是TCP协议的长连接和短连接。
为什么rm不用http呢
1.http比较复杂,性能损失
2.rm不会造成数据的丢失
AMQP协议
Advanced Message Queuing Protocol 高级消息队列协议,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制
消息分发策略
发布订阅 就是先订阅牧某个对象,对象一旦有新消息都会推送个订阅的人 100条消息每个人都会收到100条
轮询分发 就是假设你有100条消息 3个消费者 他会一个一个的发
公平分发 就是假设你有100条消息 3个消费者 谁接受的快谁收到的就相对的多点
重发 如果发给某个对象 对象死机了,就会重新发送给另外一个之前选好的对象
rabbitMq的安装(linux安装)
rabbitmq的安装时十分恶心的,因为他的几个安装包互相之间版本需要对应上,并且资源全是在外网上网速也是十分的慢的
所以安装要十分的费力,而在我们以后的docker学习之后就是一键式的安装了
- 在虚拟机home里建目录rabbitmq
- 将安装包移入
- rpm -ivh erlang-21.3-1.el7.x86_64.rpm
- yum install socat -y
- rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
如果你是遇到 useradd :/etc/passwd错误
贴如下代码
chattr -i /etc/passwd
chattr -i /etc/shadow
chattr -a /etc/passwd
chattr -a /etc/shadow
- chkconfig rabbitmq-server on //添加开机启动 RabbitMQ 服务
- /sbin/service rabbitmq-server start //启动服务
- /sbin/service rabbitmq-server status //查看服务状态,看到active(running则运行成功)
- //停止服务 这个不要运行 /sbin/service rabbitmq-server stop
- rabbitmq-plugins enable rabbitmq_management //开启 web 管理插件
防火墙的东西如果你是你就别动,你去开的端口5672、15672、25672就行了
- systemctl stop firewalld //暂时关闭防护墙
- systemctl status firewalld//查看防火墙状态
- systemctl disable firewalld //永久关闭防火墙
- 访问地址 http://xxxxxx:15672/
- rabbitmqctl add_user admin 123 //创建用户账号密码
- rabbitmqctl set_user_tags admin administrator 设置用户角色
- rabbitmqctl set_permissions -p "/" admin "." "." ".*"
- rabbitmqctl list_users //查看当前用户和角色
其它命令
关闭应用
rabbitmqctl stop_app
清除命令
rabbitmqctl reset
重启命令
rabbitmqctl start_app
了解RabbitMQ各类角色描述:
none
不能访问 management plugin
management
用户可以通过AMQP做的任何事外加: 列出自己可以通过AMQP登入的virtual hosts 查看自己的virtual hosts中的queues, exchanges 和 bindings 查看和关闭自己的channels 和 connections 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
policymaker
management可以做的任何事外加: 查看、创建和删除自己的virtual hosts所属的policies和parameters
monitoring
management可以做的任何事外加: 列出所有virtual hosts,包括他们不能登录的virtual hosts 查看其他用户的connections和channels 查看节点级别的数据如clustering和memory使用情况 查看真正的关于所有virtual hosts的全局的统计信息
administrator
policymaker和monitoring可以做的任何事外加: 创建和删除virtual hosts 查看、创建和删除users 查看创建和删除permissions 关闭其他用户的connections
rabbitmq工作件概念
队列
队列用于临时存储消息和转发消息。 队列类型有两大类,即时队列和延时队列。 即时队列:队列中的消息会被立即消费; 延时队列:队列中的消息会在指定的时间延时之后被消费
交换机
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。
交换机有四种类型:Direct, topic, Headers and Fanout。 Direct[精确匹配类型/直连]:Direct是RabbitMQ默认的交换机模式,先匹配, 再投送。即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.
Topic[模式匹配/主题]:按通配符匹配规则转发消息(最灵活),队列和交换机的绑定主要是依据一种模式(通配符 字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.
Headers[键值对匹配]:设置header attribute参数类型的交换机。 消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.这种一般是不用的
Fanout[转发消息最快/扇型]: 路由广播的形式,简单的将队列绑定到交换机上将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.
介绍界面使用
添加交换机
加三个,名字准备好
i的意思是RabbitMQ*内部使用,如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
切换到消息队列栏添加四个队列
绑定交换机和队列
点击Exchange的Name可进入到Exchange的详情页面,在里面将Exchange与Queue进行Binding
绑定结果
开始测试
测试zs.direct(点对点)点进交换器之后有一栏是Publish message,我们发送消息就是在这里进行发送的:
填写好之后,点击Publish message发送。
发送成功后切换到Queues查看是否收到消息,很显然 ls 收到了一条消息,点击 ls 进去查看
进来之后,展开Get message选项,点击GetMessage(s)就能得到消息了
下面是得到的之前发送过来的消息
测试zs.fanout(广播)
同上面测试一样,发送一条消息给 ls
但是查看消息队列时会发现,zs.fanout下的所有消息队列都接收到了
但是,在查看ls所收到的消息时,始终都是之前那一条:
由于消息获取来之后并没有给消息队列进行应答,将接收的消息删除。解决办法就是把Ack Mode改成第二个选项就可以了。
测试zs.topic(发布订阅)
消息队列中与test.news匹配上的有:ls.news、ww.news,所以它们是会多收到一条消息的
下面是收到的ls.news中收到的来自zs.topic的消息
###
RabbitMQ五种工作模式(代码版---一般我们也是用代码的)
简单队列 (不需要设置交换机默认Direct)
一个生产者一个消费者
在rabbit管理界面将admin栏将 can access virtual host 变成/ 一般默认就是/ 如果不是怎么变呢 先点用户名再点set permissions
创建maven项目
导入依赖
代码语言:javascript复制 <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
创建until包创建RabbitMqUtils工具类
代码语言:javascript复制package kj08.until;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
public static final String HOST="192.168.1.131";
public static final String USERNAME="admin";
public static final String PASSOWRD="123";
public static Channel getChannel() throws Exception {
ConnectionFactory factory = new ConnectionFactory();//获取连接工厂
factory.setHost(HOST);
factory.setUsername(USERNAME);
factory.setPassword(PASSOWRD);
Connection connection = factory.newConnection();//创建链接
Channel channel = connection.createChannel();//根据链接创建对象
return channel;
}
}
创建work包
创建消费者生产者类
代码语言:javascript复制package kj08.work;
import com.rabbitmq.client.Channel;
import kj08.until.RabbitMqUtils;
import java.util.Scanner;
public class Provider {
public static final String QUEUE="work";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
/**Queue.DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive , boolean autoDelete , Map arguments) throws IOException;
*queue:队列名
*durable:持久化,true就是队列持久化
*exclusive:排他性1.只对首次声明它的连接(Connection)可见;2.会在其连接断开的时候自动删除。
*autoDelete:自动删除 队列中的数据消费完成后是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除
*arguments:参数 后续有说法的,后面再说
*/
channel.queueDeclare(QUEUE,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
/**
* void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)throws IOException;
*exchange:交换器名称
*routingKey::路由键
*props:一般用 MessageProperties的静态常量做参数载体
*body:消息体
*/
channel.basicPublish("",QUEUE, null,message.getBytes());
System.out.println("已发送信息:" message);
}
}
}
代码语言:javascript复制package kj08.work;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import kj08.until.RabbitMqUtils;
import java.io.IOException;
public class Consumer {
public static final String QUEUE="work";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("接收到消息:" new String(delivery.getBody()));
}
};
CancelCallback cancelCallback=new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(s "取消回调");
}
};
/**
* String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
*方法描述:启动一个消费者,并返回服务端生成的消费者标识
*queue:队列名
*autoAck:是否自动应答
*deliverCallback:接受回调函数
*CancelCallback":取消回调函数
*return:服务端生成的消费者标识
*/
String s = channel.basicConsume(QUEUE, true, deliverCallback, cancelCallback);
}
}
运行
收到消息成功,但是这只是对一个信道的
我们接下来准备一个消费者对一个队列对两个信道也就是公共模式
work模式(不需要设置交换机默认Direct)
一个生产者对应着多个消费者,但是一条消息只有一个消费者能获得消息
在新建一个消费者
代码语言:javascript复制package kj08.work;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import kj08.until.RabbitMqUtils;
import java.io.IOException;
public class Consumer2 {
public static final String QUEUE="work";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("接收到消息:" new String(delivery.getBody()));
}
};
CancelCallback cancelCallback=new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(s "取消回调");
}
};
/**
* String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
*方法描述:启动一个消费者,并返回服务端生成的消费者标识
*queue:队列名
*autoAck:是否自动应答
*deliverCallback:接受回调函数
*CancelCallback":取消回调函数
*return:服务端生成的消费者标识
*/
String s = channel.basicConsume(QUEUE, true, deliverCallback, cancelCallback);
}
}
生产者再发消息,你会发现两个消费者轮流着接收到了消息
但是大家注意在我们的消费者这一端channel.basicConsume方法中有一个自动应答机制,我们给他设置了true,这个是我们需要注意到的,一般有自动的应答机制,消息发送后就自动应答,rm接受到自动的应答就会把这条消息删除了,但是有时候我们想,当我们的生产者发送了消息,但是我们的消费者拒绝了这个消息或者是报错了并没有很好的处理这个消息,需要重新发送给其他的消费者等等情况需要我们改变应答方式,所以我们一般用到的就是手动应达
官方的自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
如何使用手动应答
代码语言:javascript复制package kj08.work;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import kj08.until.RabbitMqUtils;
import java.io.IOException;
public class Consumer2 {
public static final String QUEUE="work";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("接收到消息:" new String(delivery.getBody()));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(delivery.getEnvelope().getDeliveryTag());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
};
CancelCallback cancelCallback=new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(s "取消回调");
}
};
/**
* String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
*方法描述:启动一个消费者,并返回服务端生成的消费者标识
*queue:队列名
*autoAck:是否自动应答
*deliverCallback:接受回调函数
*CancelCallback":取消回调函数
*return:服务端生成的消费者标识
*/
channel.basicConsume(QUEUE, false, deliverCallback, cancelCallback);
}
}
消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
小提示:注意当前的是一个消费者 一个交换机 一个队列 多个消费者channel
rm持久化问题
正如其他第三方所带来的的问题一样,rabbitmq也会有持久化的问题,比如说我发给rm一个消息,还没被消费呢,rm挂了,那么这个就是消息丢失了,就这一个消息丢失不丢失的问题需要我们好好的去研究,首先是我们要发布确认也就是说我们生产者发布给rm后rm要跟我们说他收到了,不然我就要重新发送。然后消息在rm内部必须的做持久化,分为两部分 一个是队列持久化一个是消息持久化,然后确认完整消费还要有我们的手动应答机制,而且我们还要考虑这个我们消费者处理完了然后网络断开了,会被进行重复消费的问题,这种问题也是我们要考虑的,其中就用到了我们幂等性相关的值是点,其实也就是在redis或者是mysql当中去给他这个唯一性的消息的状态做一个验证,也就是说消费过的就不在消费了,其实我们学习rm主要的就是学这个持久化和工作模式
队列持久化
队列持久化其实十分的简单,就是在声明队列的时候将第二个参数变成持久化也就是true;
代码语言:javascript复制 channel.queueDeclare(QUEUE,true,false,false,null);//durable
注意,队列的持久化并不是刻意随便的切换的,定义了就不能变了,如果之前定义了是未持久化的话,那么就要删了才能重新定义持久化了,当我们的队列变成了持久化在我们管理页面会出现features(特征) 下面有个蓝色的D
消息实现持久化
其实也是很简单,在发布消息的时候,在BasicProperties那一格参数赋值MessageProperties.PERSISTENT_TEXT_PLAIN
代码语言:javascript复制 channel.basicPublish("",QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
公平分发
相对于我们的轮询分发,公平分发其实就是能者多劳的感觉,怎么玩的呢其实也就是在消费这那边获取消息之前设置channel.basicQos(1);就完事了
代码语言:javascript复制 Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("接收到消息:" new String(delivery.getBody()));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(delivery.getEnvelope().getDeliveryTag());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
};
CancelCallback cancelCallback=new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(s "取消回调");
}
};
/**
* String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
*方法描述:启动一个消费者,并返回服务端生成的消费者标识
*queue:队列名
*autoAck:是否自动应答
*deliverCallback:接受回调函数
*CancelCallback":取消回调函数
*return:服务端生成的消费者标识
*/
channel.basicQos(1);//公平分发
channel.basicConsume(QUEUE, false, deliverCallback, cancelCallback);
channel.basicQos(int i);他这个方法里面的int其实是Prefetch count 这个东西是什么意思呢这个就是预取值的意思,也就是预取值为0就是轮询 为1就是公平,还有其他的数量怎么算呢,这样我们就要了解什么是预取值了
预取值
本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的
发布确认
单个发布确认
发一个确认一个 主要是channel.confirmSelect();开起确认 channel.waitForConfirms()获取确认结果
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
代码语言:javascript复制package kj08.work;
import com.rabbitmq.client.Channel;
import kj08.until.RabbitMqUtils;
import java.util.Scanner;
public class Provider {
public static final String QUEUE="work";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
/**Queue.DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive , boolean autoDelete , Map arguments) throws IOException;
*queue:队列名
*durable:持久化,true就是队列持久化
*exclusive:排他性1.只对首次声明它的连接(Connection)可见;2.会在其连接断开的时候自动删除。
*autoDelete:自动删除 队列中的数据消费完成后是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除
*arguments:参数 后续有说法的,后面再说
*/
channel.queueDeclare(QUEUE,false,false,false,null);
channel.confirmSelect();//开启发布确认
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
/**
* void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)throws IOException;
*exchange:交换器名称
*routingKey::路由键
*props:一般用 MessageProperties的静态常量做参数载体
*body:消息体
*/
channel.basicPublish("",QUEUE, null,message.getBytes());
boolean b = channel.waitForConfirms();
if(!b){
channel.basicPublish("",QUEUE, null,message.getBytes());
}else{
System.out.println("消息发送成功");
}
System.out.println("已发送信息:" message);
}
}
}
批量确认发布
批量发布,就是批量之后再调 channel.waitForConfirms()方法
代码语言:javascript复制public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i ) {
String message = i "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount ;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" MESSAGE_COUNT "个批量确认消息,耗时" (end - begin)
"ms");
} }
异步确认发布
代码复杂,性能优越
代码语言:javascript复制package kj08.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.ConfirmListener;
import kj08.until.RabbitMqUtils;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class Consumer3 {
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(new ConfirmListener() {
//消息失败处理
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//deliveryTag;唯一消息标签
//multiple:是否批量
System.err.println("-------no ack!-----------");
}
//消息成功处理
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i ) {
String message = "消息" i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" 1000 "个异步确认消息,耗时" (end - begin)
"ms");
} }
public static void main(String[] args) throws Exception {
publishMessageAsync();
}
}
单独发布消息:同步等待确认,简单,但吞吐量非常有限。
批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
发布订阅模式(需要设置交换机 交换机的类型是fanout)
一个消费者将消息发到交换机 交换机绑到多个队列,然后被监听该队列的消费者所接收并且消费
无名交换机
之前我们玩rm都没有设置到交换机,都是声明一个队列的,那就没有交换机了吗,其实不是的,我们再推送的时候
代码语言:javascript复制 channel.basicPublish("", queueName, null, message.getBytes());
第一个参数就是填的交换机名称,只不过我们填的是空值,他就是代表着默认或者无名交换机,
临时队列
我们在开发中有些情况要求我们创建一个随机名称的队列,然后断开消费者连接,队列都没有了。这种队列就是临时队列。
创建方式
代码语言:javascript复制String queueName = channel.queueDeclare().getQueue();
创建出来之后你在管理页面看到的那个页面名称就是很长的一段uuid的感觉的字符串 ,类型是经典的 特征是 自动死亡 排他 状态 running:运行中;idle:空闲。
绑定(bindings)
就是将交换机和队列绑定到一起的
fanout
也叫广播模式。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的
所有队列中
代码语言:javascript复制package kj08.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import kj08.until.RabbitMqUtils;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare("fanoutex","fanout");//声明交换机
String queue = channel.queueDeclare().getQueue();//声明临时队列
channel.queueBind(queue,"fanoutex","");//绑定
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume(queue, true, deliverCallback, consumerTag -> { });
}
}
代码语言:javascript复制package kj08.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import kj08.until.RabbitMqUtils;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare("fanoutex","fanout");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"fanoutex","");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume(queue, true, deliverCallback, consumerTag -> { });
}
}
代码语言:javascript复制package kj08.fanout;
import com.rabbitmq.client.Channel;
import kj08.until.RabbitMqUtils;
import java.util.Scanner;
public class provider {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish("fanoutex", "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" message);
}
}
}
路由模式(需要设置交换机 交换机的类型是Direct)
生产者将消息发到交换机,在绑定队列和交换机的时候有一个路由key,生产者发送的消息会指定路由key,那么消息只会发送到相应的key的队列,接着监听该队列的消费者消费信息人话让消费者有选择的接收消息
代码语言:javascript复制package kj08.direct;
import com.rabbitmq.client.Channel;
import kj08.until.RabbitMqUtils;
import java.util.Scanner;
public class provider {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish("directex", "d2", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" message);
}
}
}
代码语言:javascript复制package kj08.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import kj08.until.RabbitMqUtils;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare("directex", BuiltinExchangeType.DIRECT);//声明交换机
String queue = channel.queueDeclare().getQueue();//声明临时队列
channel.queueBind(queue,"directex","d1");//绑定
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume(queue, true, deliverCallback, consumerTag -> { });
}
}
代码语言:javascript复制package kj08.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import kj08.until.RabbitMqUtils;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare("directex", BuiltinExchangeType.DIRECT);//声明交换机
String queue = channel.queueDeclare().getQueue();//声明临时队列
channel.queueBind(queue,"directex","d2");//绑定
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume(queue, true, deliverCallback, consumerTag -> { });
}
}
主要就是路由key的不同
主题模式(需要设置交换机 交换机的类型是topic)
路由器不根据完整的key进行匹配,根据类似于通配符的方式进行匹配
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)以代替一个单词
#(井号)可以替代零个或多个单词
有点模糊匹配的感觉
代码语言:javascript复制public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/**
* Q1-->绑定的是
* 中间带 orange 带 3 个单词的字符串(*.orange.*)
* Q2-->绑定的是
* 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
* 第一个单词是 lazy 的多个单词(lazy.#)
*
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" message);
}
}
} }
代码语言:javascript复制public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明 Q1 队列与绑定关系
String queueName="Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收队列 :" queueName " 绑 定
键:" delivery.getEnvelope().getRoutingKey() ",消息:" message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
} }
代码语言:javascript复制public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明 Q2 队列与绑定关系
String queueName="Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收队列 :" queueName " 绑 定
键:" delivery.getEnvelope().getRoutingKey() ",消息:" message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
} }
死信队列
DLX,dead-letter-exchange
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的场景
注意两个交换机,两个队列了
消息 TTL 过期
代码语言:javascript复制package kj08.dl;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import kj08.until.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare("normalex", BuiltinExchangeType.DIRECT);//声明交换机
Map<String,Object> params= new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", "deadex");
// params.put("x-message-ttl", 1000);//过期时间 毫秒//一般在生产者设置
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "si");
channel.queueDeclare("normalq",false,false,false,params);
channel.queueBind("normalq","normalex","huo");//绑定
channel.exchangeDeclare("deadex",BuiltinExchangeType.DIRECT);
channel.queueDeclare("deadq",false,false,false,null);
channel.queueBind("deadq","deadex","si");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume("normalq", true, deliverCallback, consumerTag -> { });
}
}
代码语言:javascript复制package kj08.dl;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import kj08.until.RabbitMqUtils;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare("deadex", BuiltinExchangeType.DIRECT);
channel.queueDeclare( "deadq", false, false, false, null);
channel.queueBind( "deadq", "deadex", "si");
System.out.println("等待接收死信队列消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume("deadq", true, deliverCallback, consumerTag -> { });
}
}
代码语言:javascript复制package kj08.dl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import kj08.until.RabbitMqUtils;
import java.util.Scanner;
public class provider {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.expiration("10000").
build();//设置超时时间
for (int i = 0; i <10 ; i ) {
String message =String.valueOf(i);
channel.basicPublish("normalex", "huo",properties, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" message);
}
}
}
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
很简单 就是在消费者1中map添加一对
params.put("x-max-length",10);
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
代码语言:javascript复制DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if(message.equals("5")){
System.out.println("Consumer01 接收到消息" message "并拒绝签收该消息");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01 接收到消息" message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
});
延迟队列
概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
TimeToLive
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
TTL设置
消息设置
队列设置
两者区别
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。但是!有问题,用队列延时就会无限的加队列,用消息延时其实也保证不了按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。所以一般就用到插件rabbitmq_delayed_message_exchange 。
安装插件
进入我们方rm软件的文件夹
1.cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
2.cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
3.rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4.rabbitmqctl start_app
接下来我们学习用springboot操作这个延时队列
建spingboot项目
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>kj08</groupId>
<artifactId>rmdelay</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rmdelay</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
yml
代码语言:javascript复制spring:
rabbitmq:
host: 192.168.1.131
port: 5672
username: admin
password: 123
配置类
代码语言:javascript复制package kj08.rmdelay.conf;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
//自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange
delayedExchange) {
return
BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
controller
代码语言:javascript复制package kj08.rmdelay.Controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
public class RmController {
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new
Date(),delayTime, message);
}
}
监听
代码语言:javascript复制package kj08.rmdelay.consume;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class RmConsumer {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
}