前言:
看此文章前,请先观看:https://cloud.tencent.com/developer/article/2311139
介绍
RabbitMQ和WebSocket技术相似,不过目前WebSocket似乎只用于即时通讯功能上,而RabbitMQ不仅可以用于即时通讯,还有更高级的玩法
本文章只讲解,使用几种交换机来发送信息
RabbitMQ分为发送消息端和接受消息端
整个工作流程:
发送端:创建连接-创建通道-声明队列-发送消息
接受端:创建连接-创建通道-声明队列-监听队列-接收消息-ack回复
交换模式
RabbitMQ共有6个交换机模式,其实有7种,有一种就是不用交换机,系统会默认使用DIRECT交换机模式,本章不讲解5和6模式,常用模式有Topics
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics(常用)
5、Header
6、RPC
准备工作
依赖
代码语言:javascript复制<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
工具类-MyConnection用于连接到你机器上的RabbitMQ
代码语言:javascript复制package com.zb.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MyConnection {
public Connection getConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory.newConnection();
}
}
一.玩法使用
启动顺序都是,先启监听端(接受端),后发送端
1.默认模式(DIRECT)-单个信息
说明:默认模式仅仅只是发送单个消息
实际业务:
排队样子像前台用户一个个发信息
发送端:
代码语言:javascript复制package com.zb.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;
public class Producer {
public static void main(String[] args) throws Exception {
System.out.println("开始发送数据...");
//创建连接
MyConnection myConnection = new MyConnection();
//获取连接
Connection connection = myConnection.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
//参1:起个名字
//参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
//参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
//参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
//参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
channel.queueDeclare("hello_queue", true, false, false, null);
//设置要发送的数据
String msg = "这是一个简单模式,没有写交换机代码,使用系统默认交换机";
//启动交换机,发送数据 (简单模式使用默认交换机,所以参1无需填写)
//参1:交换机名字
//参2:携带那个绑定好的
//参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
//参4:发送什么数据,必须byte数组类型
channel.basicPublish("", "hello_queue", null, msg.getBytes());
System.out.println("发送完毕");
}
}
接受端:
代码语言:javascript复制package com.zb.hello;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
System.out.println("开启监听...");
//创建连接
MyConnection myConnection = new MyConnection();
//获取连接
Connection connection = myConnection.getConnection();
//创建通道
Channel channel = connection.createChannel();
//开始监听方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//设置监听数据
//参1:要监听那个队列的名字
//参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
//参3:使用那个监听方法
channel.basicConsume("hello_queue",true,defaultConsumer);
}
}
2.轮询模式(Work queues)- 多个消息(DIRECT)
说明:当一次发送10个消息时,有2个接受端,系统会自动使用轮询模式
轮询:你1-我2-你3-我4-你5-我6…
实际业务:
将一个消息发给前台用户,轮询的模式发送
发送端:
代码语言:javascript复制package com.zb.woker;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;
import java.util.Date;
public class Producer {
public static void main(String[] args) throws Exception {
System.out.println("开始发送数据...");
//创建连接,获取连接,创建通道
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
//声明队列
//参1:起个名字
//参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
//参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
//参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
//参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
channel.queueDeclare("worker_queue", true, false, false, null);
//循环发送10个数据
for (int i = 0; i < 10; i ) {
String msg = "hello第:" i;
//发送
channel.basicPublish("", "worker_queue", null, msg.getBytes());
}
System.out.println("发送完毕");
}
}
1号接受端:
代码语言:javascript复制package com.zb.woker;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
System.out.println("启动监听1");
//创建连接,获取连接,创建通道
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
//开始监听方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//设置监听数据
//参1:要监听那个队列的名字
//参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
//参3:使用那个监听方法
channel.basicConsume("worker_queue",true,defaultConsumer);
}
}
2号接受端:
代码和1号接受端一致
3.固定模式(Routing)-多个消息(DIRECT)
说明:发送多个消息,但指定只给小明和小美发
固定:小明和小美都可以接受到信息,其它人无法接受到
实际业务:
选择性只给VIP大客户用户发信息
发送端:
代码语言:javascript复制package com.zb.routings;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;
public class Producer {
public static void main(String[] args) throws Exception {
System.out.println("开始发送数据...");
//创建连接,获取连接,创建通道
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
//指定交换机,这里使用:模糊名字发送交换机(TOPIC)
// 参1:给交换机起的名字
// 参2:指定交换机模式
channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
//声明队列
//参1:起个名字
//参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
//参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
//参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
//参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
channel.queueDeclare("direct_email", true, false, false, null);
channel.queueDeclare("direct_sms", true, false, false, null);
//设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
channel.queueBind("direct_email", "routing_exchange", "email_routing");
channel.queueBind("direct_sms", "routing_exchange", "sms_routing");
//设置要发送的数据
String strEmail = "123@qq.com";
String strPhone = "13111111111";
//启动交换机,发送数据
//参1:交换机名字
//参2:携带那个绑定好的
//参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
//参4:发送什么数据,必须byte数组类型
channel.basicPublish("routing_exchange", "email_routing", null, strEmail.getBytes());
channel.basicPublish("routing_exchange", "sms_routing", null, strPhone.getBytes());
}
}
1号接受端
代码语言:javascript复制package com.zb.routings;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
public class EmailConsumer {
public static void main(String[] args) throws Exception {
System.out.println("启动监听...");
//创建连接,获取连接,创建通道
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
//开始监听方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//设置监听数据
//参1:要监听那个队列的名字
//参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
//参3:使用那个监听方法
channel.basicConsume("direct_email",true,defaultConsumer);
}
}
2号接受端:
只需要改动1号接受端的最后一行
改为:
代码语言:javascript复制channel.basicConsume("direct_sms",true,defaultConsumer);
4.订阅模式/广播模式(Publish/Subscribe)-多个消息(FANOUT)
说明:发送多个消息,类似短视频里面,你关注它,它发视频,你是它的粉丝,它会主动通知你我发新作品了
实际业务:
平台邀请用户玩新东西,用户同意了,就接受平台发的信息
FANOUT交换机:
优点:它转发消息最快
缺点:丢失消息,因为是广播的,一旦错过,那么就永久丢失了,就想看新闻联播一样,错过了7点,那么就只能看重播了,但是不会重播
勇敢一点,珍惜眼前人
发送端:
代码语言:javascript复制package com.zb.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;
public class Producer {
public static void main(String[] args) throws Exception {
//Publish/Subscribe工作模式,
//发布订阅模式,又叫广播模式
//类似短视频里面,你关注它,它发视频,你是它的粉丝,你可以看见
//使用FANOUT交换机
//优点:它转发消息最快
//缺点:丢失消息,因为是广播的,一旦错过,那么就丢失了
//就想看新闻联播一样,错过了7点,那么就只能看重播了,但是不会重播
System.out.println("开始发送数据...");
//创建连接
MyConnection myConnection = new MyConnection();
//获取连接
Connection connection = myConnection.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机,这里使用:FANOUT交换机模式
// 参1:给交换机起的名字
// 参2:指定交换机模式
channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
//声明队列
//参1:起个名字
//参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
//参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
//参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
//参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
//------------------------------------------------------------------------------------
//这里就比如:一个人把视频发到了快手和抖音两个平台
channel.queueDeclare("fanout_kuaishou", true, false, false, null);
channel.queueDeclare("fanout_douyin", true, false, false, null);
//设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
//---------------------------------------------------
//因为FANOUT交换机,是发布订阅/广播模式,所以参3无需起名字
channel.queueBind("fanout_kuaishou", "fanout_exchange", "");
channel.queueBind("fanout_douyin", "fanout_exchange", "");
//设置要发送的数据
String msg = "xxx发了一个新作品,快来看看吧";
//启动交换机,发送数据
//参1:交换机名字
//参2:携带那个绑定好的
//参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
//参4:发送什么数据,必须byte数组类型
//------------------------------------------------------------------------------------------
//因为FANOUT交换机,是发布订阅/广播模式,参2无需携带
channel.basicPublish("fanout_exchange", "", null, msg.getBytes());
System.out.println("发送成功");
}
}
1号接受端:
代码语言:javascript复制package com.zb.fanout;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
public class KuaiShouConsumer {
public static void main(String[] args) throws Exception {
System.out.println("开启监听...");
//创建连接
MyConnection myConnection = new MyConnection();
//获取连接
Connection connection = myConnection.getConnection();
//创建通道
Channel channel = connection.createChannel();
//开始监听方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//设置监听数据
//参1:要监听那个队列的名字
//参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
//参3:使用那个监听方法
channel.basicConsume("fanout_kuaishou",true,defaultConsumer);
}
}
2号接受端:
只需要改动1号接受端的最后一行
改为:
代码语言:javascript复制channel.basicConsume("fanout_douyin",true,defaultConsumer);
5.按名字模式(Topics)-多个消息(TOPIC)
说明:和其它不同,它是发送端使用默认交换机,接受端使用TOPIC交换机
按名字:接受端设置自已的名字,发送端可以选择模糊名字包含xx发送,或者全匹配发送
实际业务:
根据靓号ID用户发消息。比如用户ID包含666的精品ID,去给这类用户发消息
发送端:
代码语言:javascript复制package com.zb.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
System.out.println("开始发送数据...");
//创建连接,获取连接,创建通道
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
//开启confirm的启动监听
channel.confirmSelect();
//启动交换机,发送数据
//参1:交换机名字
//参2:携带那个绑定好的
//参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
//参4:发送什么数据,必须byte数组类型
//-----------------------------------------------------------------------------------------
//TOPIC交换机和其它的有所不同,此处发送端使用了默认交换机,所以不用指定任何交换机
//公共意思是:看参2:向名字已info开头,后面包含 sms/email 的接受端发送信息
String commonMSG = "公共数据";
channel.basicPublish("topic_exchange", "info.sms.email", null, commonMSG.getBytes());
System.out.println("发送成功");
//私有意思是:看参2:向名字已info开头,后面包含 email 的接受端发送信息
String emailmsg = "邮箱私有数据";
channel.basicPublish("topic_exchange", "info.email", null, emailmsg.getBytes());
System.out.println("发送成功");
//私有意思是:看参2:向名字已info开头,后面包含 sms 的接受端发送信息
String smsmsg = "短信私有数据";
channel.basicPublish("topic_exchange", "info.sms", null, smsmsg.getBytes());
System.out.println("发送成功");
}
}
1号接受端:
代码语言:javascript复制package com.zb.topic;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
public class EmailConsumer {
public static void main(String[] args) throws Exception {
System.out.println("启动监听...");
//创建连接,获取连接,创建通道
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
//指定交换机,这里使用:模糊名字发送交换机(TOPIC)
// 参1:给交换机起的名字
// 参2:指定交换机模式
//-----------------------------------------------------------
//TOPIC交换机和其它的有所不同,发送端使用了默认交换机,此处的接受端使用TOPIC交换机
channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
//声明队列
//参1:起个名字
//参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
//参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
//参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
//参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
channel.queueDeclare("email_topic", true, false, false, null);
//设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
//-----------------------------------------------------------
//TOPIC交换机和其它的有所不同,发送端使用了默认交换机,此处的接受端使用TOPIC交换机
channel.queueBind("email_topic", "topic_exchange", "info.#.email.#");
//开始监听方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//设置监听数据
//参1:要监听那个队列的名字
//参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
//参3:使用那个监听方法
channel.basicConsume("email_topic", true, defaultConsumer);
}
}
2号接受端:
改动1号接受端的-----第31行的第1个参数和37行的第1个参数和第3个参数
二.高级玩法
限流:一大巴数据过来了,可以一次处理x个(x个x个处理)
回应:消息发到RabbitMQ了,如果它接受到,就回应我
接受回应:接受端接受到了,回应我
1.限流(TOPIC)
说明:代码里面设置了,限流2个一次,意思是2个2个处理
发送端:
代码语言:javascript复制package com.zb.qos;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
System.out.println("开始发送数据...");
//创建连接,获取连接,创建通道
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
//for循环发送多个数据
for (int i = 0; i < 10; i ) {
//设置要发送的数据
String commonMSG = "限流数据" i;
//启动交换机,发送数据
//参1:交换机名字
//参2:携带那个绑定好的
//参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
//参4:发送什么数据,必须byte数组类型
channel.basicPublish("qos_exchange", "qos_routing", null, commonMSG.getBytes());
}
}
}
接受端:
代码语言:javascript复制package com.zb.qos;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
public class EmailConsumer {
public static void main(String[] args) throws Exception {
System.out.println("启动监听...");
//创建连接,获取连接,创建通道
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
final Channel channel = connection.createChannel();
//指定交换机,这里使用:模糊名字发送交换机(TOPIC)
// 参1:给交换机起的名字
// 参2:指定交换机模式
channel.exchangeDeclare("qos_exchange", BuiltinExchangeType.TOPIC);
//声明队列
//参1:起个名字
//参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
//参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
//参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
//参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
channel.queueDeclare("qos_queue", true, false, false, null);
//设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
channel.queueBind("qos_queue", "qos_exchange", "qos_routing");
//限流,一次处理2个(2个2个处理)
channel.basicQos(2);
//开始监听方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
if (str.equals("限流数据5")) {
channel.basicNack(envelope.getDeliveryTag(), false, false);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//设置监听数据
//参1:要监听那个队列的名字
//参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
//参3:使用那个监听方法
channel.basicConsume("qos_routing", false, defaultConsumer);
}
}
2.回应(DIRECT)
说明:RabbitMQ接收到回应和没接收到回应,有一些参数可以写一些回滚数据,或者重新发送数据的方法,或者写进日志,方便运维人员查看
发送端:
代码语言:javascript复制package com.zb.confirm;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;
import java.io.IOException;
public class Producer {
//该方法使用简单模式的默认交换机(DIRECT)
public static void main(String[] args) throws Exception {
System.out.println("开始发送数据...");
//创建连接
MyConnection myConnection = new MyConnection();
//获取连接
Connection connection = myConnection.getConnection();
//创建通道
Channel channel = connection.createChannel();
//开启confirm的启动监听
channel.confirmSelect();
//指定交换机,这里使用默认交换机(DIRECT)
// 参1:给交换机起的名字
// 参2:指定交换机模式
channel.exchangeDeclare("confirm_exchange", BuiltinExchangeType.DIRECT);
//声明队列
//参1:起个名字
//参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
//参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
//参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
//参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
channel.queueDeclare("confirm_queue", true, false, false, null);
//设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
channel.queueBind("confirm_queue", "confirm_exchange", "confirm_routing");
//设置要发送的数据
String strEmail = "confirm测试数据";
//启动交换机,发送数据
//参1:交换机名字
//参2:携带那个绑定好的
//参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
//参4:发送什么数据,必须byte数组类型
channel.basicPublish("confirm_exchange", "confirm_routing", null, strEmail.getBytes());
System.out.println("发送成功");
//confirm的回应和没有回应信息区,不多解释
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long l, boolean b) throws IOException {
System.out.println("mq回应了信息");
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println("mq没有回应信息");
System.out.println("此处可能网络波动,导致没有成功,可以写一些回滚数据,或者重新发送数据的方法");
}
});
}
}
3.接受回应(DIRECT)
说明:接受端接受到了,回应我,一些参数可以写进日志,方便运维人员查看
发送端:
代码语言:javascript复制package com.zb.returns;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
System.out.println("开始发送数据...");
//创建连接,获取连接,创建通道
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
//开启confirm的启动监听
channel.confirmSelect();
//指定交换机,这里使用默认交换机(DIRECT)
// 参1:给交换机起的名字
// 参2:指定交换机模式
channel.exchangeDeclare("return_exchange", BuiltinExchangeType.DIRECT);
//声明队列
//参1:起个名字
//参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
//参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
//参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
//参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
channel.queueDeclare("return_queue", true, false, false, null);
//设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
channel.queueBind("return_queue", "return_exchange", "error_routing");
//设置要发送的数据
String strEmail = "return测试数据";
//启动交换机,发送数据
//参1:交换机名字
//参2:携带那个绑定好的
//参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
//参4:发送什么数据,必须byte数组类型
channel.basicPublish("return_exchange", "error_routing", true, null, strEmail.getBytes());
System.out.println("发送成功");
//接受发送完毕后的系统参数
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.out.println("形参里面有发送后的状态码,信息什么的");
System.out.println("此处可以根据形参,写一些日志,方便运维人员处理问题");
}
});
}
}
三.附加玩法
TTL:设置指定过期时间(10000=10秒),到达时间自动清除
DLX:队列上的消息(过期)变成死信后,能够发送到另外一个交换机(DLX),然后被路由到一个队列上
TTL
发送端:
22行设置过期清除时间
代码语言:javascript复制package com.zb.ttl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
String commonMSG = "时间数据";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().
builder().
deliveryMode(2).
contentEncoding("UTF-8").
expiration("10000").
build();
channel.basicPublish("ttl_exchange", "info.ttl", basicProperties, commonMSG.getBytes());
}
}
接受端:
代码语言:javascript复制package com.zb.ttl;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
public class EmailConsumer {
public static void main(String[] args) throws Exception {
System.out.println("启动监听");
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("ttl_exchange", BuiltinExchangeType.TOPIC);
channel.queueDeclare("ttl_topic", true, false, false, null);
channel.queueBind("ttl_topic", "ttl_exchange", "info.ttl");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//监听数据
channel.basicConsume("ttl_topic", true, defaultConsumer);
}
}
DLX
发送端:
19行设置过期转发时间
成为死信一般有以下几种情况:
消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
消息的TTL-存活时间已经过期
队列长度限制被超越(队列满)
代码语言:javascript复制package com.zb.dlx;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;
public class Producer {
public static void main(String[] args) throws Exception {
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
String commonMSG = "死信数据";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().
builder().
deliveryMode(2).
contentEncoding("UTF-8").
expiration("10000").
build();
channel.basicPublish("one_exchange", "one_routing", basicProperties, commonMSG.getBytes());
}
}
1号接受端:
代码语言:javascript复制package com.zb.dlx;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class OneConsumer {
public static void main(String[] args) throws Exception {
System.out.println("启动监听1");
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
Map<String, Object> param = new HashMap<String, Object>();
param.put("x-dead-letter-exchange", "two_exchange");
channel.exchangeDeclare("one_exchange", BuiltinExchangeType.TOPIC);
channel.queueDeclare("one_queue", true, false, false, param);
channel.queueBind("one_queue", "one_exchange", "one_routing");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//监听数据
channel.basicConsume("one_queue", true, defaultConsumer);
}
}
2号接受端
代码语言:javascript复制package com.zb.dlx;
import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class TwoConsumer {
public static void main(String[] args) throws Exception {
System.out.println("启动监听2");
MyConnection myConnection = new MyConnection();
Connection connection = myConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("two_exchange", BuiltinExchangeType.TOPIC);
channel.queueDeclare("two_queue", true, false, false, null);
channel.queueBind("two_queue", "two_exchange", "#");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//监听数据
channel.basicConsume("two_queue", true, defaultConsumer);
}
}