RabbitMQ六种工作模式

2020-09-30 10:20:42 浏览数 (1)

1.hello world模式,单生产者单消费者

2.Work queue 生产者传入多个消费者进行处理,每条消息只能被一个消费者拿到。

3.发布订阅模式

将消息以某种规则发给消费者。该模式下多了一个交换器,该交换器会把消息复制多个副本传入多个队列中,c1 c2会获的相同的信息.

4.Routing 路由器模式

相当于有选择的发布订阅模式,会根据消费者的要求将满足条件的消息发送给对应的消费者。但是该模式的匹配是精准匹配,不支持模糊匹配。

5.Topic 主题模式

与路由器模式相比,支持模糊匹配。

6.RPC 远程调用

由于第一种模式只是作为demo,第六种RPC模式一般不用,后续重点介绍其他四种模式。

工作队列模式

多个消息的情况下,工作队列会将消息发送给不同的消费者。并且可以根据自身处理消息的速度来控制接受消息的数量。

其适合在集群环境中做异步处理的认为,能最大地发挥每一台服务器的性能。

多消费者时默认采用轮询的方式为每个消费者分配任务,当各个服务器处理能力不均匀时,容易造成消息堆积,性能低下。

可以使用如下方式:

代码语言:javascript复制
channel.basicQos(1);

只有该消费者处理完一个消息收到确认后再对其发送。处理完一个取一个。

例如实现12306短信通知服务,当购票成功后,需要给用户发送一条购票成功的短信。由于我们发现对于服务器而言发送短信属于支线任务,其的结果对主线影响不大,因此可以采用工作队列模式,执行主业务的服务器把发短信任务发送给MQ,其他服务器从MQ中取出任务发送短信。

实现代码如下:

代码语言:javascript复制
public class OrderSystem {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        for(int i = 100; i < 200; i  ){
            SMS sms = new SMS("乘客"   i, "15891001"   i, "购票成功");
            String smsJson = new Gson().toJson(sms);
            channel.basicPublish("", RabbitConstant.QUEUE_SMS,null, smsJson.getBytes());
        }
        System.out.println("发送成功");
        channel.close();
        connection.close();
    }
}
/**
 * 从消息队列中取出任务 发短信
 */
public class SMSSender {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // 保证Qos
        channel.basicQos(1);
        channel.queueDeclare(RabbitConstant.QUEUE_SMS,false, false, false, null);
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String sms = new String(body);
                System.out.println("短信发送成功"   sms);
                // 消息的签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
发布/订阅模式

该模式下生产者不再与队列绑定,而是将数据发送至“交换机”中,交换机无差别的将所有消息送入与之绑定的队列,进而供消费者使用。因此各个消费者拿到的消息完全相同,交换机的类型为“fanout”。

该模式特别适合类似“天气预报”发布的场景。首先由气象局将天气预报送入交换机,然后交换机根据队列绑定情况将天气预报发送到“百度”,“新浪”等门户网站的队列中。

代码语言:javascript复制
public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"",null,"天气测试".getBytes());
        channel.close();
        connection.close();
    }
}
代码语言:javascript复制
public class Baidu {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到信息"   new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Routing路由模式

发布订阅模式中交换机是无条件将消息发送到与其绑定的队列中。二路由模式中交换机根据Routing Key将消息筛选后发送给消费者队列。路由模式下交换机的模式为direct。

假设百度只关心北京的天气,新浪只关心西安的天气,代码如下:

代码语言:javascript复制
public class WeatherBureau {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Map<String, String> map = new HashMap<>();
        map.put("西安_20200927","晴");
        map.put("西安_20200928","阴");
        map.put("北京_20200927","小雨");
        map.put("北京_20200928","大雨");
        Channel channel = connection.createChannel();
        for(Map.Entry<String, String> entry : map.entrySet()){
            // 以entry的key作为路由key 起一个数据筛选的作用
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, entry.getKey(),
                    null, entry.toString().getBytes());
        }
        channel.close();
        connection.close();
    }
}
代码语言:javascript复制
public class Baidu {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key 以 "北京_20200927"作为key进行筛选消息
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,
                RabbitConstant.EXCHANGE_WEATHER_ROUTING, "北京_20200927");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,
                RabbitConstant.EXCHANGE_WEATHER_ROUTING, "北京_20200928");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到信息"   new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
代码语言:javascript复制
public class Sina {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key
        channel.queueBind(RabbitConstant.QUEUE_SINA,
                RabbitConstant.EXCHANGE_WEATHER_ROUTING, "西安_20200927");
        channel.queueBind(RabbitConstant.QUEUE_SINA,
                RabbitConstant.EXCHANGE_WEATHER_ROUTING, "西安_20200928");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Sina收到信息"   new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
Topic主题模式

在路由模式的基础上提供了对路由key的模糊匹配功能。

规则如下:

代码语言:javascript复制
 *  匹配单个关键字
 #  匹配所有关键字
 关键字书写用.隔开

主题模式下交换机的类型为topic。

对于如下场景,新浪只接受西安的天气,百度只接受9月27日的消息。

代码语言:javascript复制
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getConnection();
        Map<String, String> map = new HashMap<>();
        map.put("中国.西安.20200927","晴");
        map.put("中国.西安.20200928","阴");
        map.put("中国.北京.20200927","小雨");
        map.put("中国.北京.20200928","大雨");
        Channel channel = connection.createChannel();
        for(Map.Entry<String, String> entry : map.entrySet()){
            // 以entry的key作为路由key 起一个数据筛选的作用
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, entry.getKey(),
                    null, entry.toString().getBytes());
        }
        channel.close();
        connection.close();
    }
代码语言:javascript复制
// sina只接受西安的天气
public class Sina {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key
        channel.queueBind(RabbitConstant.QUEUE_SINA,
                RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.西安.*");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Sina收到信息"   new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
代码语言:javascript复制
// 假设百度只接受9月27号的数据
public class Baidu {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false, false, false, null);
        // 将队列绑定到交换机上
        // 队列名 交换机名 路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU,
                RabbitConstant.EXCHANGE_WEATHER_TOPIC, "#.20200927");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到信息"   new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
rpc

0 人点赞