rabbitmq主题订阅

2020-09-15 10:11:40 浏览数 (1)

一、topic 模式交换机

上一篇文章讲述了关于直接连接交换机根据key找到对应队列的方式,实现特殊消息特殊队列消费的目的,但是事实上,生产环境下,对于消息的复杂性远不是这样就能够解决的!比如:你要监控有个用户的操作行为,用户的操作行为太多了 增删改查,如果一个一个的写难免会有遗漏,这个时候,我们可以用通配符 user.* 轻松解决!这就是mq的主题模式

这里的交换机类型为 topic 模式的,他更像direct模式,只不过direct是单个匹配,而topic是通配符匹配

  • *:代表一个字符
  • #:代表多个字符

他的用法极其类似于direct 模式,我们不多说了,直接看代码

二、主要代码

消息生产者:消息生产者,在发送消息的时候需要指定消息类型

代码语言:javascript复制
String msg = "醉卧沙场君莫笑";
//关注第二个参数
channel.basicPublish(EXCHANGE_NAME,"huangfu.del",null,msg.getBytes());

消息消费者消息消费者,在绑定交换机的时候需要指定通配符

代码语言:javascript复制
//绑定交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"huangfu.#");

三、详细代码

消息生产者

代码语言:javascript复制
package com.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发布订阅模式
 * 主题模式
 * @author huangfu
 */
public class TopicsSend {
    private static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = MqConnection.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        String msg = "醉卧沙场君莫笑";
        channel.basicPublish(EXCHANGE_NAME,"huangfu.del",null,msg.getBytes());
        System.out.println("send:" msg);
        channel.close();
        connection.close();
    }
}

消费者1

代码语言:javascript复制
package com.topics;

import com.rabbitmq.client.*;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 */
public class TopicsRecv {
    private static String QUEUE_NAME = "topics";
    private static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnection.getConnection();
        final Channel channel = connection.createChannel();

        //声明对垒
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"huangfu.add");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

消费者2

代码语言:javascript复制
package com.topics;

import com.rabbitmq.client.*;
import com.util.MqConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 */
public class TopicsRecv2 {
    private static String QUEUE_NAME = "topics2";
    private static String EXCHANGE_NAME = "topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MqConnection.getConnection();
        final Channel channel = connection.createChannel();

        //声明对垒
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"huangfu.#");

        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

0 人点赞