1.hello world模式,单生产者单消费者
2.Work queue 生产者传入多个消费者进行处理,每条消息只能被一个消费者拿到。
3.发布订阅模式
将消息以某种规则发给消费者。该模式下多了一个交换器,该交换器会把消息复制多个副本传入多个队列中,c1 c2会获的相同的信息.
4.Routing 路由器模式
相当于有选择的发布订阅模式,会根据消费者的要求将满足条件的消息发送给对应的消费者。但是该模式的匹配是精准匹配,不支持模糊匹配。
5.Topic 主题模式
与路由器模式相比,支持模糊匹配。
6.RPC 远程调用
由于第一种模式只是作为demo,第六种RPC模式一般不用,后续重点介绍其他四种模式。
工作队列模式
多个消息的情况下,工作队列会将消息发送给不同的消费者。并且可以根据自身处理消息的速度来控制接受消息的数量。
其适合在集群环境中做异步处理的认为,能最大地发挥每一台服务器的性能。
多消费者时默认采用轮询的方式为每个消费者分配任务,当各个服务器处理能力不均匀时,容易造成消息堆积,性能低下。
可以使用如下方式:
代码语言:javascript复制channel.basicQos(1);
只有该消费者处理完一个消息收到确认后再对其发送。处理完一个取一个。
例如实现12306短信通知服务,当购票成功后,需要给用户发送一条购票成功的短信。由于我们发现对于服务器而言发送短信属于支线任务,其的结果对主线影响不大,因此可以采用工作队列模式,执行主业务的服务器把发短信任务发送给MQ,其他服务器从MQ中取出任务发送短信。
实现代码如下:
代码语言:javascript复制public class OrderSystem {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
for(int i = 100; i < 200; i ){
SMS sms = new SMS("乘客" i, "15891001" i, "购票成功");
String smsJson = new Gson().toJson(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS,null, smsJson.getBytes());
}
System.out.println("发送成功");
channel.close();
connection.close();
}
}
/**
* 从消息队列中取出任务 发短信
*/
public class SMSSender {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 保证Qos
channel.basicQos(1);
channel.queueDeclare(RabbitConstant.QUEUE_SMS,false, false, false, null);
channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String sms = new String(body);
System.out.println("短信发送成功" sms);
// 消息的签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
发布/订阅模式
该模式下生产者不再与队列绑定,而是将数据发送至“交换机”中,交换机无差别的将所有消息送入与之绑定的队列,进而供消费者使用。因此各个消费者拿到的消息完全相同,交换机的类型为“fanout”。
该模式特别适合类似“天气预报”发布的场景。首先由气象局将天气预报送入交换机,然后交换机根据队列绑定情况将天气预报发送到“百度”,“新浪”等门户网站的队列中。
代码语言:javascript复制public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"",null,"天气测试".getBytes());
channel.close();
connection.close();
}
}
代码语言:javascript复制public class Baidu {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false, false, false, null);
// 将队列绑定到交换机上
// 队列名 交换机名 路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度收到信息" new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
Routing路由模式
发布订阅模式中交换机是无条件将消息发送到与其绑定的队列中。二路由模式中交换机根据Routing Key将消息筛选后发送给消费者队列。路由模式下交换机的模式为direct。
假设百度只关心北京的天气,新浪只关心西安的天气,代码如下:
代码语言:javascript复制public class WeatherBureau {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Map<String, String> map = new HashMap<>();
map.put("西安_20200927","晴");
map.put("西安_20200928","阴");
map.put("北京_20200927","小雨");
map.put("北京_20200928","大雨");
Channel channel = connection.createChannel();
for(Map.Entry<String, String> entry : map.entrySet()){
// 以entry的key作为路由key 起一个数据筛选的作用
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, entry.getKey(),
null, entry.toString().getBytes());
}
channel.close();
connection.close();
}
}
代码语言:javascript复制public class Baidu {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false, false, false, null);
// 将队列绑定到交换机上
// 队列名 交换机名 路由key 以 "北京_20200927"作为key进行筛选消息
channel.queueBind(RabbitConstant.QUEUE_BAIDU,
RabbitConstant.EXCHANGE_WEATHER_ROUTING, "北京_20200927");
channel.queueBind(RabbitConstant.QUEUE_BAIDU,
RabbitConstant.EXCHANGE_WEATHER_ROUTING, "北京_20200928");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度收到信息" new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
代码语言:javascript复制public class Sina {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA,false, false, false, null);
// 将队列绑定到交换机上
// 队列名 交换机名 路由key
channel.queueBind(RabbitConstant.QUEUE_SINA,
RabbitConstant.EXCHANGE_WEATHER_ROUTING, "西安_20200927");
channel.queueBind(RabbitConstant.QUEUE_SINA,
RabbitConstant.EXCHANGE_WEATHER_ROUTING, "西安_20200928");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA,false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Sina收到信息" new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
Topic主题模式
在路由模式的基础上提供了对路由key的模糊匹配功能。
规则如下:
代码语言:javascript复制 * 匹配单个关键字
# 匹配所有关键字
关键字书写用.隔开
主题模式下交换机的类型为topic。
对于如下场景,新浪只接受西安的天气,百度只接受9月27日的消息。
代码语言:javascript复制 public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Map<String, String> map = new HashMap<>();
map.put("中国.西安.20200927","晴");
map.put("中国.西安.20200928","阴");
map.put("中国.北京.20200927","小雨");
map.put("中国.北京.20200928","大雨");
Channel channel = connection.createChannel();
for(Map.Entry<String, String> entry : map.entrySet()){
// 以entry的key作为路由key 起一个数据筛选的作用
channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, entry.getKey(),
null, entry.toString().getBytes());
}
channel.close();
connection.close();
}
代码语言:javascript复制// sina只接受西安的天气
public class Sina {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SINA,false, false, false, null);
// 将队列绑定到交换机上
// 队列名 交换机名 路由key
channel.queueBind(RabbitConstant.QUEUE_SINA,
RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.西安.*");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_SINA,false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Sina收到信息" new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
代码语言:javascript复制// 假设百度只接受9月27号的数据
public class Baidu {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false, false, false, null);
// 将队列绑定到交换机上
// 队列名 交换机名 路由key
channel.queueBind(RabbitConstant.QUEUE_BAIDU,
RabbitConstant.EXCHANGE_WEATHER_TOPIC, "#.20200927");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("百度收到信息" new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}