Fanout Exchange
简介
- 不处理路由键, 只需要简单的将队列绑定到交换机上
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
- Fanout交换机转发消息是最快的
代码实现
消费者1
代码语言:javascript复制package com.dance.redis.mq.rabbit.fanout;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Receiver4FanoutExchange1 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; // 不设置路由键
RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_FANOUT);
RabbitMQHelper.queueDeclare(channel,queueName);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.err.println("--------------- consumer 1 --------------");
System.out.println("recvive message:" new String(body) ", RoutingKey: " envelope.getRoutingKey());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
//等待回调函数执行完毕之后,关闭资源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
消费者2
代码语言:javascript复制package com.dance.redis.mq.rabbit.fanout;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Receiver4FanoutExchange2 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
//4 声明
String exchangeName = "test_fanout_exchange";
String queueName = "test_fanout_queue";
String routingKey = ""; // 不设置路由键
RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_FANOUT);
RabbitMQHelper.queueDeclare(channel,queueName);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.err.println("--------------- consumer 2 --------------");
System.out.println("recvive message:" new String(body) ", RoutingKey: " envelope.getRoutingKey());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
//等待回调函数执行完毕之后,关闭资源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
生产者
代码语言:javascript复制package com.dance.redis.mq.rabbit.fanout;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4FanoutExchange {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String exchangeName = "test_fanout_exchange";
for(int i = 0; i < 10; i ) {
String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "" , null , msg.getBytes());
}
channel.close();
RabbitMQHelper.closeConnection();
}
}
测试
启动消费者1
启动消费者2
启动生产者
查看消费者1
查看消费者2
每人五条, 消费均摊