本文基于最新rabbitmq:3.8.5版本,实现了direct、fanout、topic等几种主要消息模式,并基于spring-amqp完整实现了常见消息案例,同时也通过插件方式,实现了延迟消息的处理,帮您快速入门Rabbit消息处理。
内容概括
- rabbitmq相关环境及插件的安装
- springboot应用中work、pubish/subscribe、routing、topics、rpc、publisher confirm等模式示例
- 纯java应用中work、publisher confirm模式的示例
- 延迟消息队列示例
基础环境搭建
本文基于docker来安装RabbitMQ,通过pull当前最新版本rabbitmq:3.8.5-management
即可,之后通过如下的命令即可运行:
docker run -d --hostname rabbit-test --name rabbit-test -p 5672:5672 -p 15672:15672 rabbitmq:3.8.5-management
同时,如有需要,也可以通过-e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password
来指定相关的默认用户名称和密码。
另外,因为后续会需要实现一个延迟消息的示例(例如常见的网购7天后自动确认收货等),需要用到rabbitmq-delayed-message-exchange
插件,具体安装过程如下:
# 在官方页面下载插件后,拷贝到容器中
docker cp E:dev2trybackendshiboototherrabbitmqrabbitmq_delayed_message_exchange-3.8.0.ez rabbit-test:/plugins
# 进入容器类
docker exec -it rabbit-test /bin/sh
# enable插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 确认插件安装成功
rabbitmq-plugins list
SpringBoot-AMQP应用模式
work普通工作模式
该模式通过构建1个queue、2个receiver、1个sender来模拟:
- WorkSender 是每秒钟发送消息到队列中。
- WorkReceiver 接收并打印消息内容。
- WorkMqConfig 配置相关队列、接受者和发送者。
- 其他基于默认配置进行处理,默认的exchange是direct模式,两个receiver相互竞争模式获取消息。
配置类具体如下:
代码语言:txt复制@Configuration
public class WorkMqConfig {
@Bean
public Queue workQueue() {
return new Queue("rabbit-work");
}
@Bean
public WorkReceiver WorkReceiver1() {
return new WorkReceiver(1);
}
@Bean
public WorkReceiver workReceiver2() {
return new WorkReceiver(2);
}
@Bean
public WorkSender sender() {
return new WorkSender();
}
}
消息发送类如下:
代码语言:txt复制@Slf4j
public class WorkSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
AtomicInteger dots = new AtomicInteger(0);
AtomicInteger msgCount = new AtomicInteger(0);
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
// 每秒发送消息
StringBuilder msgBuilder = new StringBuilder("hi");
if (dots.getAndIncrement() == 4) {
// 达到4次则重置
dots.set(0);
}
for (int i = 0; i < dots.get(); i ) {
msgBuilder.append(".");
}
msgBuilder.append(msgCount.incrementAndGet());
rabbitTemplate.convertAndSend(queue.getName(), msgBuilder.toString());
log.info("send msg:{}", msgBuilder);
}
}
消息接受者(消费者)具体代码如下:
代码语言:txt复制@RabbitListener(queues = "rabbit-work")
@Slf4j
public class WorkReceiver {
private final int instance;
public WorkReceiver(int i) {
this.instance = i;
}
@RabbitHandler
public void receive(String msg) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
log.info("worker[{}] received msg:{}", instance, msg);
dealMsg(msg);
watch.stop();
log.info("worker[{}] done in {}s", instance, watch.getTotalTimeSeconds());
}
private void dealMsg(String msg) throws InterruptedException {
for (char ch : msg.toCharArray()) {
if (ch == '.') {
// 消息中每个点休眠1秒
Thread.sleep(1000);
}
}
}
}
运行效果:(两个worker相关交替消费)
代码语言:txt复制worker[1] received msg:hi.1
worker[2] received msg:hi..2
worker[1] received msg:hi...3
worker[2] received msg:hi....4
publish/subscribe 发布/订阅(广播)模式
该模式基于FanoutExchange来实现:
- 构建一个FanoutExchange,实现对内容的路由
- 建立两个AnonymousQueue(匿名自动删除)队列,分别绑定到上述exchange,实现对消息的订阅
- 两个消费者从对应队列获取消息
- 此模式与 work模式的区别是,此处两个消费者都收到了相同的消息。
相关具体配置如下:
代码语言:txt复制@Configuration
public class PubSubConfig {
private static final String EXCHANGE_NAME = "sample.fanout";
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
@Bean
public Queue autoDeleteQueue1() {
// 匿名自动删除队列
return new AnonymousQueue();
}
@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(autoDeleteQueue1()).to(fanoutExchange());
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(autoDeleteQueue2()).to(fanoutExchange());
}
@Bean
public PubSubSender sender() {
return new PubSubSender();
}
@Bean
public PubSubReceiver receiver() {
return new PubSubReceiver();
}
}
消息发送模式类似:
代码语言:txt复制@Slf4j
public class PubSubSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private FanoutExchange fanoutExchange;
AtomicInteger dots = new AtomicInteger();
AtomicInteger msgCount = new AtomicInteger();
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder msgBuilder = new StringBuilder("hello");
if (dots.getAndIncrement() == 4) {
dots.set(0);
}
for (int i = 0; i < dots.get(); i ) {
msgBuilder.append(".");
}
msgBuilder.append(msgCount.incrementAndGet());
// 把消息发送到 exchange,注意此处的写法与 work模式有差异,也即需要指定exchange的name
rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", msgBuilder.toString());
log.info("send msg:{}", msgBuilder);
}
}
消息消费者从队列获取消息:
代码语言:txt复制@Slf4j
public class PubSubReceiver {
@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receiver1(String msg) throws InterruptedException {
receiveMsg(msg, 1);
}
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receiver2(String msg) throws InterruptedException {
receiveMsg(msg, 2);
}
public void receiveMsg(String msg, int receiver) throws InterruptedException {
log.info("receiver[{}] receiver msg:{}", receiver, msg);
dealMsg(msg);
log.info("receiver[{}] done", receiver);
}
private void dealMsg(String msg) throws InterruptedException {
for (char ch : msg.toCharArray()) {
if (ch == '.') {
// 消息中每个点休眠1秒
Thread.sleep(1000);
}
}
}
}
运行效果:(两个receiver相关均获取到消息)
代码语言:txt复制receiver[1] receiver msg:hello.1
receiver[2] receiver msg:hello.1
receiver[1] receiver msg:hello..2
receiver[2] receiver msg:hello..2
routing路由模式
路由模式则更灵活,可自动根据规则将消息投递到对应的队列中:
- 路由模式的exchange与work一样,都是direct模式。
- 通过在banding的时候,指定routingkey来实现消息路由,并且一个队列可绑定多个路由模式。
- 注意此时的routingkey不支持模糊匹配
下面以日志处理示例内容:
- 将error类消息路由到criticalQueue
- 将info、warn类消息路由到normalQueue
@Configuration
public class MqRouteConfig {
@Bean
public DirectExchange logExchange() {
return new DirectExchange("sample.direct.log");
}
@Bean
public Queue criticalQueue() {
return new AnonymousQueue();
}
@Bean
public Queue normalQueue() {
return new AnonymousQueue();
}
@Bean
public Binding bindingError() {
// 把错误日志绑定到criticalQueue
return BindingBuilder.bind(criticalQueue()).to(logExchange()).with("error");
}
@Bean
public Binding bindingInfo() {
// 把info日志绑定到normalQueue
return BindingBuilder.bind(normalQueue()).to(logExchange()).with("info");
}
@Bean
public Binding bindingWarn() {
// 把告警日志绑定到normalQueue
return BindingBuilder.bind(normalQueue()).to(logExchange()).with("warn");
}
@Bean
public LogSender logSender() {
return new LogSender();
}
@Bean
public LogReceiver logReceiver() {
return new LogReceiver();
}
}
消息发送者代码如下:
代码语言:txt复制@Slf4j
public class LogSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectExchange logExchange;
AtomicInteger index = new AtomicInteger(0);
AtomicInteger msgCount = new AtomicInteger(0);
private final String[] types = {"error", "warn", "info"};
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder msgBuilder = new StringBuilder("log of ");
if (this.index.incrementAndGet() == types.length) {
this.index.set(0);
}
String type = types[this.index.get()];
msgBuilder.append(type).append(" ").append(this.msgCount.incrementAndGet());
for (int i = 0; i < index.get(); i ) {
msgBuilder.append(".");
}
rabbitTemplate.convertAndSend(logExchange.getName(), type, msgBuilder.toString());
log.info("send msg:{}", msgBuilder.toString());
}
}
消息接受者功能如下:
- 分别注册三个RabbitListener,其中criticalLogger对应criticalQueue,normalLogger、otherLogger对应normalQueue
- criticalLogger消费error消息
- normalLogger、otherLogger则类似于work模式,均从normalQueue竞争模式获取消息。
@Slf4j
public class LogReceiver {
@RabbitListener(queues = "#{criticalQueue.name}")
public void criticalLogger(String msg) throws InterruptedException {
receive(msg, "critical");
}
@RabbitListener(queues = "#{normalQueue.name}")
public void normalLogger(String msg) throws InterruptedException {
receive(msg, "normal");
}
@RabbitListener(queues = "#{normalQueue.name}")
public void otherLogger(String msg) throws InterruptedException {
receive(msg, "other");
}
public void receive(String msg, String type) throws InterruptedException {
log.info("logger[{}] get msg:{}", type, msg);
this.dealMsg(msg);
log.info("logger[{}] done.", type);
}
private void dealMsg(String msg) throws InterruptedException {
for (char ch : msg.toCharArray()) {
if (ch == '.') {
// 消息中每个点休眠1秒
Thread.sleep(1000);
}
}
}
}
运行效果:(critical队列的消息都是error,normal和other则分别交替获取到warn和info消息)
代码语言:txt复制logger[normal] get msg:log of warn 1.
logger[other] get msg:log of info 2..
logger[critical] get msg:log of error 3
logger[normal] get msg:log of warn 4.
logger[other] get msg:log of info 5..
logger[critical] get msg:log of error 6
topic主题匹配模式
topic模式是最灵活的模式:
- 其exchange是三种类型中的最后一类:TopicExchange
- 与direct和fanout的区别是,topic支持按
*
或#
来模糊匹配,其中*号代表一个单词,#代表0或多个单词(word)
下面以通常的短信和email通知示例:
*.reg
路由消息代表用户注册,同时发送到emailQueue和smsQueue#.password
路由消息代表用户密码变动,仅发送到emailQueue#.captcha
路由消息代表验证码内容,仅发送到smsQueue
具体配置信息如下:
代码语言:txt复制@Configuration
public class TopicMsgConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("sample.topic.user");
}
@Bean
public TopicMsgReceiver receiver() {
return new TopicMsgReceiver();
}
@Bean
public Queue emailQueue() {
return new AnonymousQueue();
}
@Bean
public Queue smsQueue() {
return new AnonymousQueue();
}
@Bean
public Binding bindingEmailReg() {
// 注册消息都发邮件
return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("*.reg");
}
@Bean
public Binding bindingEmailPassword() {
// 密码信息都发邮件
return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("#.password");
}
@Bean
public Binding bindingSmsReg() {
// 用户注册都发短信
return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("*.reg");
}
@Bean
public Binding bindingSmsCaptcha() {
// 验证码均发短信
return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("#.captcha");
}
@Bean
public TopicMsgSender msgSender() {
return new TopicMsgSender();
}
}
消息发送示例如下:
代码语言:txt复制@Slf4j
public class TopicMsgSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TopicExchange topicExchange;
AtomicInteger index = new AtomicInteger();
AtomicInteger msgCount = new AtomicInteger();
private final String[] msgKeys = {"user.reg", "user.update.password", "user.reg.captcha"};
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void sendMsg() {
StringBuilder msgBuilder = new StringBuilder("用户信息变更 ");
if (this.index.incrementAndGet() == msgKeys.length) {
this.index.set(0);
}
String key = msgKeys[this.index.get()];
msgBuilder.append(key).append(msgCount.incrementAndGet());
rabbitTemplate.convertAndSend(topicExchange.getName(), key, msgBuilder.toString());
log.info("发出消息:{}", msgBuilder);
}
}
消息接收代码如下:
- 注册两个RabbitListener,一个对接emailQueue,另一个对接smsQueue
@Slf4j
public class TopicMsgReceiver {
@RabbitListener(queues = "#{emailQueue.name}")
public void emailReceiver(String msg) throws InterruptedException {
receive(msg, "email");
}
@RabbitListener(queues = "#{smsQueue.name}")
public void SmsReceiver(String msg) throws InterruptedException {
receive(msg, "sms");
}
public void receive(String msg, String inst) throws InterruptedException {
log.info("instance[{}] received msg:{}", inst, msg);
dealMsg(msg);
log.info("instance[{}] done", inst);
}
private void dealMsg(String msg) throws InterruptedException {
for (char ch : msg.toCharArray()) {
if (ch == '.') {
// 消息中每个点休眠1秒
Thread.sleep(1000);
}
}
}
}
运行效果:
- email和sms两个实例都获取到了user.reg消息
- email单独获取到了user.update.password消息
- sms单独获取到了user.reg.captcha消息
instance[email] received msg:用户信息变更 user.update.password1
instance[email] received msg:用户信息变更 user.reg3
instance[email] received msg:用户信息变更 user.update.password4
instance[email] received msg:用户信息变更 user.reg6
instance[sms] received msg:用户信息变更 user.reg.captcha2
instance[sms] received msg:用户信息变更 user.reg3
instance[sms] received msg:用户信息变更 user.reg.captcha5
instance[sms] received msg:用户信息变更 user.reg6
rpc远程过程调用模式
rpc涉及到客户端和服务端交互:
- exchange依然是基于DirectExchange模式
- 此处的RPC模式与routing模式的主要区别是,RPC模式下,发送消息是基于convertSendAndReceive方法,而其他模式一般是基于convertAndSend方法
- 另外,除了常规的基于RabbitTemplate来实现同步模式外,也可以通过AsyncRabbitTemplate来实现异步RPC,也即不需要等待上一条消息的返回,通过回调来接收消息响应信息。
配置类具体代码如下:
代码语言:txt复制@Configuration
public class RpcMsgConfig {
@Bean
public DirectExchange exchange() {
return new DirectExchange("sample.rpc");
}
@Bean
public RpcMsgClient client() {
return new RpcMsgClient();
}
@Bean
public Queue queue() {
return new Queue("sample.rpc.requests");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("rpc");
}
@Bean
public RpcMsgServer server() {
return new RpcMsgServer();
}
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
return new AsyncRabbitTemplate(rabbitTemplate);
}
}
服务端实现一个fibnacci数列求和,输入是n,返回的是求和结果
代码语言:txt复制@Slf4j
public class RpcMsgServer {
@RabbitListener(queues = "sample.rpc.requests")
public int fibnacci(int n) {
log.info("server received fib of :{}", n);
int result = fib(n);
log.info("server returned result:{}", result);
return result;
}
public int fib(int n) {
return n == 0 ? 0 : (n == 1 ? 1 : (fib(n - 1) fib(n - 2)));
}
}
客户端同步模式代码如下:
代码语言:txt复制@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void sendMsg() {
// 此处为默认同步队列方式发送消息,也即上一消息未收到回复之前,不会发送下一条消息。
// 默认的 超时时间是5秒,可通过setReplyTimeout来修改
rabbitTemplate.setReplyTimeout(6000L);
int fib = start;
log.info("sync client send requesting fib({})", fib);
Integer response = (Integer) rabbitTemplate.convertSendAndReceive(exchange.getName(), "rpc",
fib);
// 超时之后会得到null的返回值
log.info("sync client got fib({}) response:{}", fib, response);
start ;
}
同步模式运行效果:
- 在获取到上一个结果之后,才会发出小一条消息
- 在计算到46时,客户端就因为超时而无法获取到结果,并且抛出了相关异常
sync client send requesting fib(45)
sync client got fib(45) response:1134903170
sync client send requesting fib(46)
sync client got fib(46) response:null
sync client send requesting fib(47)
sync client got fib(47) response:null
客户端异常回调代码如下:
代码语言:txt复制@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void asyncSendMsg() {
int fib = start;
log.info("async client send fib({})", fib);
// 异步发送请求
AsyncRabbitTemplate.RabbitConverterFuture<Integer> future = asyncRabbitTemplate.
convertSendAndReceive(exchange.getName(), "rpc", fib);
// 增加回调
future.addCallback(new ListenableFutureCallback<Integer>() {
@Override
public void onFailure(Throwable throwable) {
log.warn("async client failed", throwable);
}
@Override
public void onSuccess(Integer integer) {
log.info("async client got fib({}) reponse:{}", fib, integer);
}
});
start ;
}
异常模式运行效果如下:
- 消息的发送没有被阻塞
- 异步获取到了相关返回结果
async client send fib(44)
async client got fib(43) reponse:433494437
async client send fib(45)
async client got fib(43) reponse:433494437
async client send fib(46)
async client send fib(47)
async client send fib(48)
async client got fib(45) reponse:1134903170
async client send fib(49)
async client send fib(50)
publisher confirm发布者确认模式
官方示例中并没有给出publisher confirm的实现模式,以下示例供参考:
- 通过在application配置文件中,设定
publisher-confirm-type
和publisher-returns
来设定发布者的确认回调和 返回回调模式。 - 通常是,如果消息正常发布到了exchange则算是自动确认(ack),如果因为routingkey错误等导致无法被正常路由,如果publisher-returns没有设置为true,则一般会被自动删除,否则会触发return回调。
- 可通过
listener.simple.acknowledge-mode
来设置消费者的确认模式,默认是自动,可设置为manual来手工确认。【注意如果设置为manual,如果客户端因为异常等原因没有触发basicAck或basicNack等操作,该消息在消息队列中处于Ready状态,但对于消息发送方来说,依然是属于已确认状态,因为消息发送方的确认是指消息被成功投递到exchange broker】 - 可通过
listener.simple.prefetch
来进行消费端限流,尤其是在消费端涉及到数据库操作等情况下。
示例application.yml配置如下:
代码语言:txt复制spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: simple
publisher-returns: true
listener:
simple:
# 手工确认,通常不需要
acknowledge-mode: manual
# 限流
prefetch: 30
配置类代码:
代码语言:txt复制@Configuration
public class MsgConfig {
@Bean
public TopicExchange exchange() {
return new TopicExchange("sample.confirm.exchange");
}
@Bean
public Queue queue() {
return new Queue("sample.confirm.queue");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("#.confirm");
}
@Bean
public Snowflake snowflake() {
return IdUtil.createSnowflake(1, 1);
}
}
消息发送者代码如下:
代码语言:txt复制@Component
@Slf4j
public class MsgSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TopicExchange exchange;
@Autowired
private Snowflake snowflake;
private static AtomicInteger msgCount = new AtomicInteger();
private static String[] keys = {"test.confirm", "good.confirm", "error.conf"};
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void sendMsg() {
rabbitTemplate.setConfirmCallback((
(correlationData, ack, cause) -> {
// 监听 broker的应答
log.info("sender confirm callback:{},{},{}", correlationData, ack, cause);
if (!ack) {
log.warn("sender 消息未确认");
}
}));
// 设置强制标识,必须设置了setReturnCallback,true是指broker不自动删除不可达消息,并通过ReturnCallback回调
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((
(message, replyCode, replyText, exchange, routingKey) -> {
// 监听 不可路由的消息
log.info("sender return callback:{},{},{},{},{}", message, replyCode, replyText,
exchange, routingKey);
}));
CorrelationData data = new CorrelationData(snowflake.nextIdStr());
// 有意根据数组发送无法路由消息
String key = keys[msgCount.get() % 3];
DemoUser user = DemoUser.builder()
.userId(msgCount.incrementAndGet())
.userName("用户" msgCount.get())
.build();
String msg = JSON.toJSONString(user);
MessageProperties messageProperties = new MessageProperties();
// 设置内容为json模式
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置3秒过期,过期的被自动删除
messageProperties.setExpiration("3000");
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend(exchange.getName(), key, message, data);
}
}
消息消费者代码如下:
- 注意如果basicNack中指定requeue为true,该消息会再次回到消息队列头部,从而很容易造成消息消费的死循环。
@Component
@Slf4j
public class MsgConsumer {
@RabbitListener(queues = "#{queue.name}")
@RabbitHandler
public void dealMsg(Message message, Channel channel) throws IOException, InterruptedException {
String msg = new String(message.getBody());
log.info("consumer got msg:{}", msg);
DemoUser user = JSON.parseObject(msg, DemoUser.class);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (user.getUserId() % 10 == 0) {
// 有意造成失败,也会返回失败确认,也即 ack,nack,reject都是确认
// requeue 如果设置为true,很可能会造成消息消费的死循环
channel.basicNack(deliveryTag, true, false);
log.info("consumer msg nack:{},{}", deliveryTag, user);
} else {
channel.basicAck(deliveryTag, true);
}
// 有意减慢处理速度
Thread.sleep(2000);
}
}
运行效果:
- sender每个成功投递的消息,都收到了confirm callback
- sender路由错误的消息,都收到了return callback
sender confirm callback:CorrelationData [id=1284487848413761536],true,null
sender confirm callback:CorrelationData [id=1284487852880695296],true,null
sender return callback:(Body:'{"userId":3,"userName":"用户3"}' MessageProperties [headers={spring_returned_message_correlation=1284487857083387904}, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, expiration=3000, priority=0, deliveryTag=0]),312,NO_ROUTE,sample.confirm.exchange,error.conf
consumer got msg:{"userId":1,"userName":"用户1"}
consumer got msg:{"userId":2,"userName":"用户2"}
consumer got msg:{"userId":4,"userName":"用户4"}
consumer got msg:{"userId":5,"userName":"用户5"}
consumer got msg:{"userId":7,"userName":"用户7"}
consumer got msg:{"userId":8,"userName":"用户8"}
consumer got msg:{"userId":10,"userName":"用户10"}
consumer msg nack:7,DemoUser(userId=10, userName=用户10)
Java应用模式
简单work模式
springaqmp自动实现了相关依赖配置,纯java应用则需要自己来进行调用:
- 通过 ConnectionFactory 来配置连接信息,创建连接connection,并通过connection来创建channel
- 通过channel.queueDeclare来声明队列信息
- 通过channel.basicPublish等方法来发送消息
消息发送端代码示例:
代码语言:txt复制@Slf4j
public class PureSender {
private final static String QUEUE_NAME = "sample.java";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello,你好";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
log.info("client send:{}", msg);
}
}
}
消息消费端示例:
- 在消费端,注意不要用try语法来自动关闭连接,否则就只能运行一次。
@Slf4j
public class PureReceiver {
private final static String QUEUE_NAME = "sample.java";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
log.info("server waiting for msg");
channel.basicConsume(QUEUE_NAME, true, ((s, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
log.info("server received msg:{}", msg);
}), s -> {
});
}
}
运行效果:
- 主要要先运行PureReceiver,等待消息发来,然后再运行PureSender,向队列发送消息。
- 客户端成功发出消息,服务端也成功接收到了消息
client send:hello,你好
server waiting for msg
server received msg:hello,你好
publisher confirm发布者确认
rabbitmq官网提供了java版本的模式实现:
- 调用channel的confirmSelect,将模式设定为开启发布者确认。
- pubMsgIndividually是最简单模式,直接发送消息后通过waitForConfirmsOrDie来等待确认,也是性能最差的一种方式
- pubMsgInBatch是批量模式,逐条发送消息后,通过计数器来进行批次确认等待确认
- handleMsgAsync是异步模式,也是性能最高的一种模式,通过channel的addConfirmListener来进行确认结果的监听
消息发送方:
1、创建连接:
代码语言:txt复制static Connection createConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
return factory.newConnection();
}
2、逐条发送模式:
代码语言:txt复制static void pubMsgIndividually() throws IOException, TimeoutException, InterruptedException {
// 单独发送消息模式
try (Connection connection = createConnection(); Channel channel = connection.createChannel();) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.confirmSelect();
long start = System.nanoTime();
for (int i = 0; i < MSG_COUNT; i ) {
String msg = String.valueOf(i);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 同步等待消息确认结果,5秒后超时
channel.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
// 打印发出的消息数目,以及用时
log.info("sended {} msgs individually in {} ms", MSG_COUNT,
Duration.ofNanos(end - start).toMillis());
}
}
3、批量模式:
代码语言:txt复制static void pubMsgInBatch() throws IOException, TimeoutException, InterruptedException {
// 批量发送消息
try (Connection connection = createConnection(); Channel channel = connection.createChannel();) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.confirmSelect();
int batchSize = 100;
int outstandingMsgCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MSG_COUNT; i ) {
String msg = String.valueOf(i);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
outstandingMsgCount ;
if (outstandingMsgCount == batchSize) {
// 按100条的批次等待消息确认
channel.waitForConfirmsOrDie(5_000);
outstandingMsgCount = 0;
}
}
if (outstandingMsgCount > 0) {
channel.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
log.info("sended {} msgs in bath in {} ms", MSG_COUNT,
Duration.ofNanos(end - start).toMillis());
}
}
4、异步模式:
- 构建ConcurrentSkipListMap来存储待处理消息
- 通过ackCallback和nackCallback,监听消息确认情况,确认的消息将被清除
static void handleMsgAsync() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection(); Channel channel = connection.createChannel();) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.confirmSelect();
// ConcurrentSkipListMap是线程安全的高并发有序哈希map
// key为消息序号,string为消息内容
// 用该map来记录待处理的消息
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
// 消息被确认的回调
if (multiple) {
// 批量确认,小于等于deliveryTag的都会被确认
// headMap返回的是小于等于给定值的map子集
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
deliveryTag, true);
// 清空所有确认的消息
confirmed.clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (
(deliveryTag, multiple) -> {
// 消息丢失的回调
String msg = outstandingConfirms.get(deliveryTag);
log.error("msg with body {} nack-ed,deliveryTag:{},multiple:{}", msg,
deliveryTag, multiple);
// 处理丢失的消息
cleanOutstandingConfirms.handle(deliveryTag, multiple);
}));
long start = System.nanoTime();
for (int i = 0; i < MSG_COUNT; i ) {
String msg = String.valueOf(i);
outstandingConfirms.put(channel.getNextPublishSeqNo(), msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
throw new IllegalStateException("60秒内无法确认所有消息");
}
long end = System.nanoTime();
log.info("sended {} msgs in handleMsgAsync in {} ms", MSG_COUNT,
Duration.ofNanos(end - start).toMillis());
}
}
消息接收端代码如下:
代码语言:txt复制@Slf4j
public class MsgReceiver {
private final static String QUEUE_NAME = "sample.pubConf";
static AtomicInteger msgCount = new AtomicInteger(0);
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = PublisherConfirms.createConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
log.info("server waiting for msg");
channel.basicConsume(QUEUE_NAME, true, (
(c, delivery) -> {
String msg = new String(delivery.getBody(), "UTF-8");
// 对接收到的消息进行累加
if (msgCount.incrementAndGet() % 20000 == 0) {
// 每2万条消息打印一次
log.info("server got {} messages", msgCount);
log.info("current msg:{}", msg);
}
}), s -> {
log.warn("consumer canceled:{}", s);
});
}
}
运行效果:
- 客户端分三批次发出了2万条消息,第一次用时14秒左右,第二次用时1.6秒,第三次用时1.1秒
- 服务端总共收到了6万条消息
sended 20000 msgs individually in 14609 ms
sended 20000 msgs in bath in 1655 ms
sended 20000 msgs in handleMsgAsync in 1176 ms
server got 20000 messages
current msg:19999
server got 40000 messages
current msg:19999
server got 60000 messages
current msg:19999
延迟消息模式
开始延迟消息模式之前,需要先安装rabbitmq_delayed_message_exchange插件,然后:
- 通过
exchange.setDelayed(true);
来将broker设置为延迟模式 - 发送消息时,通过
properties.setDelay(3_000);
来设定每条消息的延迟时间,单位毫秒
消息配置类:
代码语言:txt复制@Configuration
public class DelayMQConfig {
@Bean
public TopicExchange exchange() {
TopicExchange exchange = new TopicExchange("sample.delay");
exchange.setDelayed(true);
return exchange;
}
@Bean
public Queue queue() {
return new Queue("queue.delay");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("#.delay");
}
}
消息发送类:
代码语言:txt复制@Component
@Slf4j
public class MsgSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TopicExchange exchange;
private static AtomicInteger msgCount = new AtomicInteger();
@Scheduled(fixedDelay = 2000, initialDelay = 500)
public void sendMsg() {
// 循环发送3条消息
String msg = String.format("你好,msg%s", msgCount.incrementAndGet());
rabbitTemplate.convertAndSend(exchange.getName(), "msg.delay", msg, (message -> {
MessageProperties properties = message.getMessageProperties();
// 设置延迟3秒
properties.setDelay(3_000);
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}));
log.info("send msg:{}", msg);
}
}
消息消费类:
代码语言:txt复制@Component
@Slf4j
public class MsgConsumer {
@RabbitListener(queues = "#{queue.name}")
@RabbitHandler
public void dealMsg(Message message) {
log.info("consumer got msg:{}", new String(message.getBody()));
}
}
运行效果:
- 在消息发出3秒钟以后,消费方才接收到消息
2020-07-18 22:37:44.851: send msg:你好,msg1
2020-07-18 22:37:46.851: send msg:你好,msg2
2020-07-18 22:37:47.869: consumer got msg:你好,msg1
2020-07-18 22:37:48.854: send msg:你好,msg3
2020-07-18 22:37:49.856: consumer got msg:你好,msg2
2020-07-18 22:37:50.855: send msg:你好,msg4
2020-07-18 22:37:51.861: consumer got msg:你好,msg3
2020-07-18 22:37:52.856: send msg:你好,msg5
源码信息
本案例源码地址:https://gitee.com/coolpine/backends/tree/master/hiboot/src/main/java/pers/techlmm/rabbit
更多参考资料:
- springamqp官方资料 https://spring.io/projects/spring-amqp
- rabbitmq官方资料 https://www.rabbitmq.com/getstarted.html
- rabbitmq社区插件 https://www.rabbitmq.com/community-plugins.html
- 延迟消息插件详情 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0rabbitmq
- 延迟消息github https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
- springamqp api文档 https://docs.spring.io/spring-amqp/api/