RabbitMQ 快速入门实战

2020-07-20 10:07:27 浏览数 (1)

本文基于最新rabbitmq:3.8.5版本,实现了direct、fanout、topic等几种主要消息模式,并基于spring-amqp完整实现了常见消息案例,同时也通过插件方式,实现了延迟消息的处理,帮您快速入门Rabbit消息处理。

内容概括

  • rabbitmq相关环境及插件的安装
  • springboot应用中work、pubish/subscribe、routing、topics、rpc、publisher confirm等模式示例
  • 纯java应用中work、publisher confirm模式的示例
  • 延迟消息队列示例

基础环境搭建

本文基于docker来安装RabbitMQ,通过pull当前最新版本rabbitmq:3.8.5-management即可,之后通过如下的命令即可运行:

代码语言:txt复制
docker run -d --hostname rabbit-test --name rabbit-test -p 5672:5672 -p 15672:15672 rabbitmq:3.8.5-management

同时,如有需要,也可以通过-e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password来指定相关的默认用户名称和密码。

另外,因为后续会需要实现一个延迟消息的示例(例如常见的网购7天后自动确认收货等),需要用到rabbitmq-delayed-message-exchange插件,具体安装过程如下:

代码语言:txt复制
# 在官方页面下载插件后,拷贝到容器中
docker cp E:dev2trybackendshiboototherrabbitmqrabbitmq_delayed_message_exchange-3.8.0.ez rabbit-test:/plugins
# 进入容器类
docker exec -it rabbit-test /bin/sh
# enable插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 确认插件安装成功
rabbitmq-plugins list

SpringBoot-AMQP应用模式

work普通工作模式

该模式通过构建1个queue、2个receiver、1个sender来模拟:

  • WorkSender 是每秒钟发送消息到队列中。
  • WorkReceiver 接收并打印消息内容。
  • WorkMqConfig 配置相关队列、接受者和发送者。
  • 其他基于默认配置进行处理,默认的exchange是direct模式,两个receiver相互竞争模式获取消息。

配置类具体如下:

代码语言:txt复制
@Configuration
public class WorkMqConfig {

    @Bean
    public Queue workQueue() {
        return new Queue("rabbit-work");
    }

    @Bean
    public WorkReceiver WorkReceiver1() {
        return new WorkReceiver(1);
    }

    @Bean
    public WorkReceiver workReceiver2() {
        return new WorkReceiver(2);
    }

    @Bean
    public WorkSender sender() {
        return new WorkSender();
    }

}

消息发送类如下:

代码语言:txt复制
@Slf4j
public class WorkSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    AtomicInteger dots = new AtomicInteger(0);

    AtomicInteger msgCount = new AtomicInteger(0);

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        // 每秒发送消息
        StringBuilder msgBuilder = new StringBuilder("hi");
        if (dots.getAndIncrement() == 4) {
            // 达到4次则重置
            dots.set(0);
        }
        for (int i = 0; i < dots.get(); i  ) {
            msgBuilder.append(".");
        }
        msgBuilder.append(msgCount.incrementAndGet());
        rabbitTemplate.convertAndSend(queue.getName(), msgBuilder.toString());
        log.info("send msg:{}", msgBuilder);
    }
}

消息接受者(消费者)具体代码如下:

代码语言:txt复制
@RabbitListener(queues = "rabbit-work")
@Slf4j
public class WorkReceiver {
    private final int instance;

    public WorkReceiver(int i) {
        this.instance = i;
    }

    @RabbitHandler
    public void receive(String msg) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("worker[{}] received msg:{}", instance, msg);
        dealMsg(msg);
        watch.stop();
        log.info("worker[{}] done in {}s", instance, watch.getTotalTimeSeconds());
    }

    private void dealMsg(String msg) throws InterruptedException {
        for (char ch : msg.toCharArray()) {
            if (ch == '.') {
                // 消息中每个点休眠1秒
                Thread.sleep(1000);
            }
        }
    }
}

运行效果:(两个worker相关交替消费)

代码语言:txt复制
worker[1] received msg:hi.1
worker[2] received msg:hi..2
worker[1] received msg:hi...3
worker[2] received msg:hi....4

publish/subscribe 发布/订阅(广播)模式

该模式基于FanoutExchange来实现:

  • 构建一个FanoutExchange,实现对内容的路由
  • 建立两个AnonymousQueue(匿名自动删除)队列,分别绑定到上述exchange,实现对消息的订阅
  • 两个消费者从对应队列获取消息
  • 此模式与 work模式的区别是,此处两个消费者都收到了相同的消息。

相关具体配置如下:

代码语言:txt复制
@Configuration
public class PubSubConfig {

    private static final String EXCHANGE_NAME = "sample.fanout";

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    @Bean
    public Queue autoDeleteQueue1() {
        // 匿名自动删除队列
        return new AnonymousQueue();
    }

    @Bean
    public Queue autoDeleteQueue2() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(autoDeleteQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(autoDeleteQueue2()).to(fanoutExchange());
    }

    @Bean
    public PubSubSender sender() {
        return new PubSubSender();
    }

    @Bean
    public PubSubReceiver receiver() {
        return new PubSubReceiver();
    }

}

消息发送模式类似:

代码语言:txt复制
@Slf4j
public class PubSubSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private FanoutExchange fanoutExchange;

    AtomicInteger dots = new AtomicInteger();

    AtomicInteger msgCount = new AtomicInteger();

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder msgBuilder = new StringBuilder("hello");
        if (dots.getAndIncrement() == 4) {
            dots.set(0);
        }
        for (int i = 0; i < dots.get(); i  ) {
            msgBuilder.append(".");
        }
        msgBuilder.append(msgCount.incrementAndGet());
        // 把消息发送到 exchange,注意此处的写法与 work模式有差异,也即需要指定exchange的name
        rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", msgBuilder.toString());
        log.info("send msg:{}", msgBuilder);
    }
}

消息消费者从队列获取消息:

代码语言:txt复制
@Slf4j
public class PubSubReceiver {

    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receiver1(String msg) throws InterruptedException {
        receiveMsg(msg, 1);
    }

    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receiver2(String msg) throws InterruptedException {
        receiveMsg(msg, 2);
    }

    public void receiveMsg(String msg, int receiver) throws InterruptedException {
        log.info("receiver[{}] receiver msg:{}", receiver, msg);
        dealMsg(msg);
        log.info("receiver[{}] done", receiver);
    }

    private void dealMsg(String msg) throws InterruptedException {
        for (char ch : msg.toCharArray()) {
            if (ch == '.') {
                // 消息中每个点休眠1秒
                Thread.sleep(1000);
            }
        }
    }
}

运行效果:(两个receiver相关均获取到消息)

代码语言:txt复制
receiver[1] receiver msg:hello.1
receiver[2] receiver msg:hello.1
receiver[1] receiver msg:hello..2
receiver[2] receiver msg:hello..2

routing路由模式

路由模式则更灵活,可自动根据规则将消息投递到对应的队列中:

  • 路由模式的exchange与work一样,都是direct模式。
  • 通过在banding的时候,指定routingkey来实现消息路由,并且一个队列可绑定多个路由模式。
  • 注意此时的routingkey不支持模糊匹配

下面以日志处理示例内容:

  • 将error类消息路由到criticalQueue
  • 将info、warn类消息路由到normalQueue
代码语言:txt复制
@Configuration
public class MqRouteConfig {

    @Bean
    public DirectExchange logExchange() {
        return new DirectExchange("sample.direct.log");
    }

    @Bean
    public Queue criticalQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue normalQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding bindingError() {
        // 把错误日志绑定到criticalQueue
        return BindingBuilder.bind(criticalQueue()).to(logExchange()).with("error");
    }

    @Bean
    public Binding bindingInfo() {
        // 把info日志绑定到normalQueue
        return BindingBuilder.bind(normalQueue()).to(logExchange()).with("info");
    }

    @Bean
    public Binding bindingWarn() {
        // 把告警日志绑定到normalQueue
        return BindingBuilder.bind(normalQueue()).to(logExchange()).with("warn");
    }

    @Bean
    public LogSender logSender() {
        return new LogSender();
    }

    @Bean
    public LogReceiver logReceiver() {
        return new LogReceiver();
    }

}

消息发送者代码如下:

代码语言:txt复制
@Slf4j
public class LogSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private DirectExchange logExchange;

    AtomicInteger index = new AtomicInteger(0);

    AtomicInteger msgCount = new AtomicInteger(0);

    private final String[] types = {"error", "warn", "info"};

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder msgBuilder = new StringBuilder("log of ");
        if (this.index.incrementAndGet() == types.length) {
            this.index.set(0);
        }
        String type = types[this.index.get()];
        msgBuilder.append(type).append(" ").append(this.msgCount.incrementAndGet());
        for (int i = 0; i < index.get(); i  ) {
            msgBuilder.append(".");
        }
        rabbitTemplate.convertAndSend(logExchange.getName(), type, msgBuilder.toString());
        log.info("send msg:{}", msgBuilder.toString());
    }

}

消息接受者功能如下:

  • 分别注册三个RabbitListener,其中criticalLogger对应criticalQueue,normalLogger、otherLogger对应normalQueue
  • criticalLogger消费error消息
  • normalLogger、otherLogger则类似于work模式,均从normalQueue竞争模式获取消息。
代码语言:txt复制
@Slf4j
public class LogReceiver {

    @RabbitListener(queues = "#{criticalQueue.name}")
    public void criticalLogger(String msg) throws InterruptedException {
        receive(msg, "critical");
    }

    @RabbitListener(queues = "#{normalQueue.name}")
    public void normalLogger(String msg) throws InterruptedException {
        receive(msg, "normal");
    }

    @RabbitListener(queues = "#{normalQueue.name}")
    public void otherLogger(String msg) throws InterruptedException {
        receive(msg, "other");
    }

    public void receive(String msg, String type) throws InterruptedException {
        log.info("logger[{}] get msg:{}", type, msg);
        this.dealMsg(msg);
        log.info("logger[{}] done.", type);
    }

    private void dealMsg(String msg) throws InterruptedException {
        for (char ch : msg.toCharArray()) {
            if (ch == '.') {
                // 消息中每个点休眠1秒
                Thread.sleep(1000);
            }
        }
    }
}

运行效果:(critical队列的消息都是error,normal和other则分别交替获取到warn和info消息)

代码语言:txt复制
logger[normal] get msg:log of warn 1.
logger[other] get msg:log of info 2..
logger[critical] get msg:log of error 3
logger[normal] get msg:log of warn 4.
logger[other] get msg:log of info 5..
logger[critical] get msg:log of error 6

topic主题匹配模式

topic模式是最灵活的模式:

  • 其exchange是三种类型中的最后一类:TopicExchange
  • 与direct和fanout的区别是,topic支持按*#来模糊匹配,其中*号代表一个单词,#代表0或多个单词(word)

下面以通常的短信和email通知示例:

  • *.reg路由消息代表用户注册,同时发送到emailQueue和smsQueue
  • #.password路由消息代表用户密码变动,仅发送到emailQueue
  • #.captcha路由消息代表验证码内容,仅发送到smsQueue

具体配置信息如下:

代码语言:txt复制
@Configuration
public class TopicMsgConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("sample.topic.user");
    }

    @Bean
    public TopicMsgReceiver receiver() {
        return new TopicMsgReceiver();
    }

    @Bean
    public Queue emailQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue smsQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding bindingEmailReg() {
        // 注册消息都发邮件
        return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("*.reg");
    }

    @Bean
    public Binding bindingEmailPassword() {
        // 密码信息都发邮件
        return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("#.password");
    }

    @Bean
    public Binding bindingSmsReg() {
        // 用户注册都发短信
        return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("*.reg");
    }

    @Bean
    public Binding bindingSmsCaptcha() {
        // 验证码均发短信
        return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("#.captcha");
    }

    @Bean
    public TopicMsgSender msgSender() {
        return new TopicMsgSender();
    }

}

消息发送示例如下:

代码语言:txt复制
@Slf4j
public class TopicMsgSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange topicExchange;

    AtomicInteger index = new AtomicInteger();

    AtomicInteger msgCount = new AtomicInteger();

    private final String[] msgKeys = {"user.reg", "user.update.password", "user.reg.captcha"};

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void sendMsg() {
        StringBuilder msgBuilder = new StringBuilder("用户信息变更 ");
        if (this.index.incrementAndGet() == msgKeys.length) {
            this.index.set(0);
        }
        String key = msgKeys[this.index.get()];
        msgBuilder.append(key).append(msgCount.incrementAndGet());
        rabbitTemplate.convertAndSend(topicExchange.getName(), key, msgBuilder.toString());
        log.info("发出消息:{}", msgBuilder);
    }
}

消息接收代码如下:

  • 注册两个RabbitListener,一个对接emailQueue,另一个对接smsQueue
代码语言:txt复制
@Slf4j
public class TopicMsgReceiver {

    @RabbitListener(queues = "#{emailQueue.name}")
    public void emailReceiver(String msg) throws InterruptedException {
        receive(msg, "email");
    }

    @RabbitListener(queues = "#{smsQueue.name}")
    public void SmsReceiver(String msg) throws InterruptedException {
        receive(msg, "sms");
    }

    public void receive(String msg, String inst) throws InterruptedException {
        log.info("instance[{}] received msg:{}", inst, msg);
        dealMsg(msg);
        log.info("instance[{}] done", inst);
    }

    private void dealMsg(String msg) throws InterruptedException {
        for (char ch : msg.toCharArray()) {
            if (ch == '.') {
                // 消息中每个点休眠1秒
                Thread.sleep(1000);
            }
        }
    }
}

运行效果:

  • email和sms两个实例都获取到了user.reg消息
  • email单独获取到了user.update.password消息
  • sms单独获取到了user.reg.captcha消息
代码语言:txt复制
instance[email] received msg:用户信息变更 user.update.password1
instance[email] received msg:用户信息变更 user.reg3
instance[email] received msg:用户信息变更 user.update.password4
instance[email] received msg:用户信息变更 user.reg6

instance[sms] received msg:用户信息变更 user.reg.captcha2
instance[sms] received msg:用户信息变更 user.reg3
instance[sms] received msg:用户信息变更 user.reg.captcha5
instance[sms] received msg:用户信息变更 user.reg6

rpc远程过程调用模式

rpc涉及到客户端和服务端交互:

  • exchange依然是基于DirectExchange模式
  • 此处的RPC模式与routing模式的主要区别是,RPC模式下,发送消息是基于convertSendAndReceive方法,而其他模式一般是基于convertAndSend方法
  • 另外,除了常规的基于RabbitTemplate来实现同步模式外,也可以通过AsyncRabbitTemplate来实现异步RPC,也即不需要等待上一条消息的返回,通过回调来接收消息响应信息。

配置类具体代码如下:

代码语言:txt复制
@Configuration
public class RpcMsgConfig {

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("sample.rpc");
    }

    @Bean
    public RpcMsgClient client() {
        return new RpcMsgClient();
    }

    @Bean
    public Queue queue() {
        return new Queue("sample.rpc.requests");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("rpc");
    }

    @Bean
    public RpcMsgServer server() {
        return new RpcMsgServer();
    }

    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
        return new AsyncRabbitTemplate(rabbitTemplate);
    }
}

服务端实现一个fibnacci数列求和,输入是n,返回的是求和结果

代码语言:txt复制
@Slf4j
public class RpcMsgServer {

    @RabbitListener(queues = "sample.rpc.requests")
    public int fibnacci(int n) {
        log.info("server received fib of :{}", n);
        int result = fib(n);
        log.info("server returned result:{}", result);
        return result;
    }

    public int fib(int n) {
        return n == 0 ? 0 : (n == 1 ? 1 : (fib(n - 1)   fib(n - 2)));
    }
}

客户端同步模式代码如下:

代码语言:txt复制
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void sendMsg() {
    // 此处为默认同步队列方式发送消息,也即上一消息未收到回复之前,不会发送下一条消息。
    // 默认的 超时时间是5秒,可通过setReplyTimeout来修改
    rabbitTemplate.setReplyTimeout(6000L);

    int fib = start;
    log.info("sync client send requesting fib({})", fib);
    Integer response = (Integer) rabbitTemplate.convertSendAndReceive(exchange.getName(), "rpc",
            fib);
    // 超时之后会得到null的返回值
    log.info("sync client got fib({}) response:{}", fib, response);
    start  ;
}

同步模式运行效果:

  • 在获取到上一个结果之后,才会发出小一条消息
  • 在计算到46时,客户端就因为超时而无法获取到结果,并且抛出了相关异常
代码语言:txt复制
sync client send requesting fib(45)
sync client got fib(45) response:1134903170
sync client send requesting fib(46)
sync client got fib(46) response:null
sync client send requesting fib(47)
sync client got fib(47) response:null

客户端异常回调代码如下:

代码语言:txt复制
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void asyncSendMsg() {
    int fib = start;
    log.info("async client send fib({})", fib);
    // 异步发送请求
    AsyncRabbitTemplate.RabbitConverterFuture<Integer> future = asyncRabbitTemplate.
            convertSendAndReceive(exchange.getName(), "rpc", fib);
    // 增加回调
    future.addCallback(new ListenableFutureCallback<Integer>() {
        @Override
        public void onFailure(Throwable throwable) {
            log.warn("async client failed", throwable);
        }

        @Override
        public void onSuccess(Integer integer) {
            log.info("async client got fib({}) reponse:{}", fib, integer);
        }
    });
    start  ;
}

异常模式运行效果如下:

  • 消息的发送没有被阻塞
  • 异步获取到了相关返回结果
代码语言:txt复制
async client send fib(44)
async client got fib(43) reponse:433494437
async client send fib(45)
async client got fib(43) reponse:433494437
async client send fib(46)
async client send fib(47)
async client send fib(48)
async client got fib(45) reponse:1134903170
async client send fib(49)
async client send fib(50)

publisher confirm发布者确认模式

官方示例中并没有给出publisher confirm的实现模式,以下示例供参考:

  • 通过在application配置文件中,设定publisher-confirm-typepublisher-returns来设定发布者的确认回调和 返回回调模式。
  • 通常是,如果消息正常发布到了exchange则算是自动确认(ack),如果因为routingkey错误等导致无法被正常路由,如果publisher-returns没有设置为true,则一般会被自动删除,否则会触发return回调。
  • 可通过listener.simple.acknowledge-mode来设置消费者的确认模式,默认是自动,可设置为manual来手工确认。【注意如果设置为manual,如果客户端因为异常等原因没有触发basicAck或basicNack等操作,该消息在消息队列中处于Ready状态,但对于消息发送方来说,依然是属于已确认状态,因为消息发送方的确认是指消息被成功投递到exchange broker】
  • 可通过listener.simple.prefetch来进行消费端限流,尤其是在消费端涉及到数据库操作等情况下。

示例application.yml配置如下:

代码语言:txt复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: simple
    publisher-returns: true
    listener:
      simple:
        #        手工确认,通常不需要
        acknowledge-mode: manual
        #        限流
        prefetch: 30

配置类代码:

代码语言:txt复制
@Configuration
public class MsgConfig {

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("sample.confirm.exchange");
    }

    @Bean
    public Queue queue() {
        return new Queue("sample.confirm.queue");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("#.confirm");
    }

    @Bean
    public Snowflake snowflake() {
        return IdUtil.createSnowflake(1, 1);
    }
}

消息发送者代码如下:

代码语言:txt复制
@Component
@Slf4j
public class MsgSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange exchange;

    @Autowired
    private Snowflake snowflake;

    private static AtomicInteger msgCount = new AtomicInteger();

    private static String[] keys = {"test.confirm", "good.confirm", "error.conf"};

    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void sendMsg() {
        rabbitTemplate.setConfirmCallback((
                (correlationData, ack, cause) -> {
                    // 监听 broker的应答
                    log.info("sender confirm callback:{},{},{}", correlationData, ack, cause);
                    if (!ack) {
                        log.warn("sender 消息未确认");
                    }
                }));

        // 设置强制标识,必须设置了setReturnCallback,true是指broker不自动删除不可达消息,并通过ReturnCallback回调
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((
                (message, replyCode, replyText, exchange, routingKey) -> {
                    // 监听 不可路由的消息
                    log.info("sender return callback:{},{},{},{},{}", message, replyCode, replyText,
                            exchange, routingKey);
                }));

        CorrelationData data = new CorrelationData(snowflake.nextIdStr());
        // 有意根据数组发送无法路由消息
        String key = keys[msgCount.get() % 3];
        DemoUser user = DemoUser.builder()
                .userId(msgCount.incrementAndGet())
                .userName("用户"   msgCount.get())
                .build();
        String msg = JSON.toJSONString(user);
        MessageProperties messageProperties = new MessageProperties();
        // 设置内容为json模式
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        // 设置3秒过期,过期的被自动删除
        messageProperties.setExpiration("3000");
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(exchange.getName(), key, message, data);
    }
}

消息消费者代码如下:

  • 注意如果basicNack中指定requeue为true,该消息会再次回到消息队列头部,从而很容易造成消息消费的死循环。
代码语言:txt复制
@Component
@Slf4j
public class MsgConsumer {

    @RabbitListener(queues = "#{queue.name}")
    @RabbitHandler
    public void dealMsg(Message message, Channel channel) throws IOException, InterruptedException {
        String msg = new String(message.getBody());
        log.info("consumer got msg:{}", msg);
        DemoUser user = JSON.parseObject(msg, DemoUser.class);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        if (user.getUserId() % 10 == 0) {
            // 有意造成失败,也会返回失败确认,也即 ack,nack,reject都是确认
            // requeue 如果设置为true,很可能会造成消息消费的死循环
            channel.basicNack(deliveryTag, true, false);
            log.info("consumer msg nack:{},{}", deliveryTag, user);
        } else {
            channel.basicAck(deliveryTag, true);
        }
        // 有意减慢处理速度
        Thread.sleep(2000);
    }
}

运行效果:

  • sender每个成功投递的消息,都收到了confirm callback
  • sender路由错误的消息,都收到了return callback
代码语言:txt复制
sender confirm callback:CorrelationData [id=1284487848413761536],true,null
sender confirm callback:CorrelationData [id=1284487852880695296],true,null
sender return callback:(Body:'{"userId":3,"userName":"用户3"}' MessageProperties [headers={spring_returned_message_correlation=1284487857083387904}, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, expiration=3000, priority=0, deliveryTag=0]),312,NO_ROUTE,sample.confirm.exchange,error.conf

consumer got msg:{"userId":1,"userName":"用户1"}
consumer got msg:{"userId":2,"userName":"用户2"}
consumer got msg:{"userId":4,"userName":"用户4"}
consumer got msg:{"userId":5,"userName":"用户5"}
consumer got msg:{"userId":7,"userName":"用户7"}
consumer got msg:{"userId":8,"userName":"用户8"}
consumer got msg:{"userId":10,"userName":"用户10"}
consumer msg nack:7,DemoUser(userId=10, userName=用户10)

Java应用模式

简单work模式

springaqmp自动实现了相关依赖配置,纯java应用则需要自己来进行调用:

  • 通过 ConnectionFactory 来配置连接信息,创建连接connection,并通过connection来创建channel
  • 通过channel.queueDeclare来声明队列信息
  • 通过channel.basicPublish等方法来发送消息

消息发送端代码示例:

代码语言:txt复制
@Slf4j
public class PureSender {
    private final static String QUEUE_NAME = "sample.java";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "hello,你好";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            log.info("client send:{}", msg);
        }
    }
}

消息消费端示例:

  • 在消费端,注意不要用try语法来自动关闭连接,否则就只能运行一次。
代码语言:txt复制
@Slf4j
public class PureReceiver {
    private final static String QUEUE_NAME = "sample.java";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        log.info("server waiting for msg");
        channel.basicConsume(QUEUE_NAME, true, ((s, delivery) -> {
            String msg = new String(delivery.getBody(), "UTF-8");
            log.info("server received msg:{}", msg);
        }), s -> {
        });
    }
}

运行效果:

  • 主要要先运行PureReceiver,等待消息发来,然后再运行PureSender,向队列发送消息。
  • 客户端成功发出消息,服务端也成功接收到了消息
代码语言:txt复制
client send:hello,你好

server waiting for msg
server received msg:hello,你好

publisher confirm发布者确认

rabbitmq官网提供了java版本的模式实现:

  • 调用channel的confirmSelect,将模式设定为开启发布者确认。
  • pubMsgIndividually是最简单模式,直接发送消息后通过waitForConfirmsOrDie来等待确认,也是性能最差的一种方式
  • pubMsgInBatch是批量模式,逐条发送消息后,通过计数器来进行批次确认等待确认
  • handleMsgAsync是异步模式,也是性能最高的一种模式,通过channel的addConfirmListener来进行确认结果的监听

消息发送方:

1、创建连接:

代码语言:txt复制
static Connection createConnection() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("guest");
    factory.setPassword("guest");
    return factory.newConnection();
}

2、逐条发送模式:

代码语言:txt复制
static void pubMsgIndividually() throws IOException, TimeoutException, InterruptedException {
    // 单独发送消息模式
    try (Connection connection = createConnection(); Channel channel = connection.createChannel();) {
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.confirmSelect();

        long start = System.nanoTime();
        for (int i = 0; i < MSG_COUNT; i  ) {
            String msg = String.valueOf(i);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // 同步等待消息确认结果,5秒后超时
            channel.waitForConfirmsOrDie(5_000);
        }
        long end = System.nanoTime();
        // 打印发出的消息数目,以及用时

        log.info("sended {} msgs individually in {} ms", MSG_COUNT,
                Duration.ofNanos(end - start).toMillis());
    }
}

3、批量模式:

代码语言:txt复制
static void pubMsgInBatch() throws IOException, TimeoutException, InterruptedException {
    // 批量发送消息
    try (Connection connection = createConnection(); Channel channel = connection.createChannel();) {
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.confirmSelect();

        int batchSize = 100;
        int outstandingMsgCount = 0;

        long start = System.nanoTime();
        for (int i = 0; i < MSG_COUNT; i  ) {
            String msg = String.valueOf(i);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            outstandingMsgCount  ;

            if (outstandingMsgCount == batchSize) {
                // 按100条的批次等待消息确认
                channel.waitForConfirmsOrDie(5_000);
                outstandingMsgCount = 0;
            }
        }

        if (outstandingMsgCount > 0) {
            channel.waitForConfirmsOrDie(5_000);
        }

        long end = System.nanoTime();
        log.info("sended {} msgs in bath in {} ms", MSG_COUNT,
                Duration.ofNanos(end - start).toMillis());
    }
}

4、异步模式:

  • 构建ConcurrentSkipListMap来存储待处理消息
  • 通过ackCallback和nackCallback,监听消息确认情况,确认的消息将被清除
代码语言:txt复制
static void handleMsgAsync() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = createConnection(); Channel channel = connection.createChannel();) {
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.confirmSelect();

        // ConcurrentSkipListMap是线程安全的高并发有序哈希map
        // key为消息序号,string为消息内容
        // 用该map来记录待处理的消息
        ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
            // 消息被确认的回调
            if (multiple) {
                // 批量确认,小于等于deliveryTag的都会被确认
                // headMap返回的是小于等于给定值的map子集
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
                        deliveryTag, true);
                // 清空所有确认的消息
                confirmed.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }
        };

        channel.addConfirmListener(cleanOutstandingConfirms, (
                (deliveryTag, multiple) -> {
                    // 消息丢失的回调
                    String msg = outstandingConfirms.get(deliveryTag);
                    log.error("msg with body {} nack-ed,deliveryTag:{},multiple:{}", msg,
                            deliveryTag, multiple);
                    // 处理丢失的消息
                    cleanOutstandingConfirms.handle(deliveryTag, multiple);
                }));

        long start = System.nanoTime();
        for (int i = 0; i < MSG_COUNT; i  ) {
            String msg = String.valueOf(i);
            outstandingConfirms.put(channel.getNextPublishSeqNo(), msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
            throw new IllegalStateException("60秒内无法确认所有消息");
        }

        long end = System.nanoTime();
        log.info("sended {} msgs in handleMsgAsync in {} ms", MSG_COUNT,
                Duration.ofNanos(end - start).toMillis());
    }
}

消息接收端代码如下:

代码语言:txt复制
@Slf4j
public class MsgReceiver {

    private final static String QUEUE_NAME = "sample.pubConf";

    static AtomicInteger msgCount = new AtomicInteger(0);

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = PublisherConfirms.createConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        log.info("server waiting for msg");
        channel.basicConsume(QUEUE_NAME, true, (
                (c, delivery) -> {
                    String msg = new String(delivery.getBody(), "UTF-8");
                    // 对接收到的消息进行累加
                    if (msgCount.incrementAndGet() % 20000 == 0) {
                        // 每2万条消息打印一次
                        log.info("server got {} messages", msgCount);
                        log.info("current msg:{}", msg);
                    }
                }), s -> {
            log.warn("consumer canceled:{}", s);
        });
    }
}

运行效果:

  • 客户端分三批次发出了2万条消息,第一次用时14秒左右,第二次用时1.6秒,第三次用时1.1秒
  • 服务端总共收到了6万条消息
代码语言:txt复制
 sended 20000 msgs individually in 14609 ms
 sended 20000 msgs in bath in 1655 ms
 sended 20000 msgs in handleMsgAsync in 1176 ms
 
 server got 20000 messages
 current msg:19999
 server got 40000 messages
 current msg:19999
 server got 60000 messages
 current msg:19999

延迟消息模式

开始延迟消息模式之前,需要先安装rabbitmq_delayed_message_exchange插件,然后:

  • 通过exchange.setDelayed(true);来将broker设置为延迟模式
  • 发送消息时,通过properties.setDelay(3_000);来设定每条消息的延迟时间,单位毫秒

消息配置类:

代码语言:txt复制
@Configuration
public class DelayMQConfig {
    @Bean
    public TopicExchange exchange() {
        TopicExchange exchange = new TopicExchange("sample.delay");
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue queue() {
        return new Queue("queue.delay");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("#.delay");
    }
}

消息发送类:

代码语言:txt复制
@Component
@Slf4j
public class MsgSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange exchange;

    private static AtomicInteger msgCount = new AtomicInteger();

    @Scheduled(fixedDelay = 2000, initialDelay = 500)
    public void sendMsg() {
        // 循环发送3条消息
        String msg = String.format("你好,msg%s", msgCount.incrementAndGet());
        rabbitTemplate.convertAndSend(exchange.getName(), "msg.delay", msg, (message -> {
            MessageProperties properties = message.getMessageProperties();
            // 设置延迟3秒
            properties.setDelay(3_000);
            properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }));
        log.info("send msg:{}", msg);
    }
}

消息消费类:

代码语言:txt复制
@Component
@Slf4j
public class MsgConsumer {

    @RabbitListener(queues = "#{queue.name}")
    @RabbitHandler
    public void dealMsg(Message message) {
        log.info("consumer got msg:{}", new String(message.getBody()));
    }
}

运行效果:

  • 在消息发出3秒钟以后,消费方才接收到消息
代码语言:txt复制
2020-07-18 22:37:44.851: send msg:你好,msg1
2020-07-18 22:37:46.851: send msg:你好,msg2
2020-07-18 22:37:47.869: consumer got msg:你好,msg1
2020-07-18 22:37:48.854: send msg:你好,msg3
2020-07-18 22:37:49.856: consumer got msg:你好,msg2
2020-07-18 22:37:50.855: send msg:你好,msg4
2020-07-18 22:37:51.861: consumer got msg:你好,msg3
2020-07-18 22:37:52.856: send msg:你好,msg5

源码信息

本案例源码地址:https://gitee.com/coolpine/backends/tree/master/hiboot/src/main/java/pers/techlmm/rabbit

更多参考资料:

  • springamqp官方资料 https://spring.io/projects/spring-amqp
  • rabbitmq官方资料 https://www.rabbitmq.com/getstarted.html
  • rabbitmq社区插件 https://www.rabbitmq.com/community-plugins.html
  • 延迟消息插件详情 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0rabbitmq
  • 延迟消息github https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
  • springamqp api文档 https://docs.spring.io/spring-amqp/api/

0 人点赞