RabbitMQ原生Java使用

2023-08-10 11:34:59 浏览数 (1)

前言:

看此文章前,请先观看:https://cloud.tencent.com/developer/article/2311139

介绍

RabbitMQ和WebSocket技术相似,不过目前WebSocket似乎只用于即时通讯功能上,而RabbitMQ不仅可以用于即时通讯,还有更高级的玩法

本文章只讲解,使用几种交换机来发送信息

RabbitMQ分为发送消息端和接受消息端

整个工作流程:

发送端:创建连接-创建通道-声明队列-发送消息

接受端:创建连接-创建通道-声明队列-监听队列-接收消息-ack回复

交换模式

RabbitMQ共有6个交换机模式,其实有7种,有一种就是不用交换机,系统会默认使用DIRECT交换机模式,本章不讲解5和6模式,常用模式有Topics

1、Work queues

2、Publish/Subscribe

3、Routing

4、Topics(常用)

5、Header

6、RPC

准备工作

依赖

代码语言:javascript复制
<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.3</version>
</dependency>

工具类-MyConnection用于连接到你机器上的RabbitMQ

代码语言:javascript复制
package com.zb.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MyConnection {

    public Connection getConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory.newConnection();
    }
}

一.玩法使用

启动顺序都是,先启监听端(接受端),后发送端

1.默认模式(DIRECT)-单个信息

说明:默认模式仅仅只是发送单个消息

实际业务:

排队样子像前台用户一个个发信息

发送端:

代码语言:javascript复制
package com.zb.hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;


public class Producer {
    public static void main(String[] args) throws Exception {
        System.out.println("开始发送数据...");
        //创建连接
        MyConnection myConnection = new MyConnection();
        //获取连接
        Connection connection = myConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明队列
        //参1:起个名字
        //参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
        //参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
        //参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
        //参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
        channel.queueDeclare("hello_queue", true, false, false, null);

       //设置要发送的数据
        String msg = "这是一个简单模式,没有写交换机代码,使用系统默认交换机";



        //启动交换机,发送数据 (简单模式使用默认交换机,所以参1无需填写)
        //参1:交换机名字
        //参2:携带那个绑定好的
        //参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
        //参4:发送什么数据,必须byte数组类型
        channel.basicPublish("", "hello_queue", null, msg.getBytes());
        System.out.println("发送完毕");




    }
}

接受端:

代码语言:javascript复制
package com.zb.hello;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;
import java.io.IOException;

public class Consumer {
    public static void main(String[] args) throws Exception {
        System.out.println("开启监听...");
        //创建连接
        MyConnection myConnection = new MyConnection();
        //获取连接
        Connection connection = myConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();


        //开始监听方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body, "UTF-8");
                System.out.println(str);
            }
        };

        //设置监听数据
        //参1:要监听那个队列的名字
        //参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
        //参3:使用那个监听方法
        channel.basicConsume("hello_queue",true,defaultConsumer);



    }
}

2.轮询模式(Work queues)- 多个消息(DIRECT)

说明:当一次发送10个消息时,有2个接受端,系统会自动使用轮询模式

轮询:你1-我2-你3-我4-你5-我6…

实际业务:

将一个消息发给前台用户,轮询的模式发送

发送端:

代码语言:javascript复制
package com.zb.woker;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;

import java.util.Date;

public class Producer {
    public static void main(String[] args) throws Exception {
        System.out.println("开始发送数据...");

        //创建连接,获取连接,创建通道
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();


        //声明队列
        //参1:起个名字
        //参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
        //参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
        //参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
        //参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
        channel.queueDeclare("worker_queue", true, false, false, null);


        //循环发送10个数据
        for (int i = 0; i < 10; i  ) {
            String msg = "hello第:"   i;
            //发送
            channel.basicPublish("", "worker_queue", null, msg.getBytes());
        }

        System.out.println("发送完毕");

    }
}

1号接受端:

代码语言:javascript复制
package com.zb.woker;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) throws Exception {
        System.out.println("启动监听1");

        //创建连接,获取连接,创建通道
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();

        //开始监听方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body, "UTF-8");
                System.out.println(str);
            }
        };

        //设置监听数据
        //参1:要监听那个队列的名字
        //参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
        //参3:使用那个监听方法
        channel.basicConsume("worker_queue",true,defaultConsumer);



    }
}

2号接受端:

代码和1号接受端一致

3.固定模式(Routing)-多个消息(DIRECT)

说明:发送多个消息,但指定只给小明和小美发

固定:小明和小美都可以接受到信息,其它人无法接受到

实际业务:

选择性只给VIP大客户用户发信息

发送端:

代码语言:javascript复制
package com.zb.routings;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;

public class Producer {
    public static void main(String[] args) throws Exception {
        System.out.println("开始发送数据...");

        //创建连接,获取连接,创建通道
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();

        //指定交换机,这里使用:模糊名字发送交换机(TOPIC)
        // 参1:给交换机起的名字
        // 参2:指定交换机模式
        channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);


        //声明队列
        //参1:起个名字
        //参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
        //参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
        //参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
        //参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
        channel.queueDeclare("direct_email", true, false, false, null);
        channel.queueDeclare("direct_sms", true, false, false, null);


        //设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
        channel.queueBind("direct_email", "routing_exchange", "email_routing");
        channel.queueBind("direct_sms", "routing_exchange", "sms_routing");

        //设置要发送的数据
        String strEmail = "123@qq.com";
        String strPhone = "13111111111";


        //启动交换机,发送数据
        //参1:交换机名字
        //参2:携带那个绑定好的
        //参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
        //参4:发送什么数据,必须byte数组类型
        channel.basicPublish("routing_exchange", "email_routing", null, strEmail.getBytes());
        channel.basicPublish("routing_exchange", "sms_routing", null, strPhone.getBytes());


    }
}

1号接受端

代码语言:javascript复制
package com.zb.routings;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;

import java.io.IOException;

public class EmailConsumer {
    public static void main(String[] args) throws Exception {
        System.out.println("启动监听...");

        //创建连接,获取连接,创建通道
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();

        //开始监听方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body, "UTF-8");
                System.out.println(str);
            }
        };

        //设置监听数据
        //参1:要监听那个队列的名字
        //参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
        //参3:使用那个监听方法
        channel.basicConsume("direct_email",true,defaultConsumer);



    }
}

2号接受端:

只需要改动1号接受端的最后一行

改为:

代码语言:javascript复制
channel.basicConsume("direct_sms",true,defaultConsumer);

4.订阅模式/广播模式(Publish/Subscribe)-多个消息(FANOUT)

说明:发送多个消息,类似短视频里面,你关注它,它发视频,你是它的粉丝,它会主动通知你我发新作品了

实际业务:

平台邀请用户玩新东西,用户同意了,就接受平台发的信息

FANOUT交换机:

优点:它转发消息最快

缺点:丢失消息,因为是广播的,一旦错过,那么就永久丢失了,就想看新闻联播一样,错过了7点,那么就只能看重播了,但是不会重播

勇敢一点,珍惜眼前人

发送端:

代码语言:javascript复制
package com.zb.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;

public class Producer {
    public static void main(String[] args) throws Exception {
        //Publish/Subscribe工作模式,
        //发布订阅模式,又叫广播模式
        //类似短视频里面,你关注它,它发视频,你是它的粉丝,你可以看见

        //使用FANOUT交换机
        //优点:它转发消息最快
        //缺点:丢失消息,因为是广播的,一旦错过,那么就丢失了
        //就想看新闻联播一样,错过了7点,那么就只能看重播了,但是不会重播

        System.out.println("开始发送数据...");
        //创建连接
        MyConnection myConnection = new MyConnection();
        //获取连接
        Connection connection = myConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明交换机,这里使用:FANOUT交换机模式
        // 参1:给交换机起的名字
        // 参2:指定交换机模式
        channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);



        //声明队列
        //参1:起个名字
        //参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
        //参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
        //参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
        //参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
        //------------------------------------------------------------------------------------
        //这里就比如:一个人把视频发到了快手和抖音两个平台
        channel.queueDeclare("fanout_kuaishou", true, false, false, null);
        channel.queueDeclare("fanout_douyin", true, false, false, null);



        //设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
        //---------------------------------------------------
        //因为FANOUT交换机,是发布订阅/广播模式,所以参3无需起名字
        channel.queueBind("fanout_kuaishou", "fanout_exchange", "");
        channel.queueBind("fanout_douyin", "fanout_exchange", "");

        //设置要发送的数据
        String msg = "xxx发了一个新作品,快来看看吧";


        //启动交换机,发送数据
        //参1:交换机名字
        //参2:携带那个绑定好的
        //参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
        //参4:发送什么数据,必须byte数组类型
        //------------------------------------------------------------------------------------------
        //因为FANOUT交换机,是发布订阅/广播模式,参2无需携带
        channel.basicPublish("fanout_exchange", "", null, msg.getBytes());
        System.out.println("发送成功");

    }
}

1号接受端:

代码语言:javascript复制
package com.zb.fanout;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;

import java.io.IOException;

public class KuaiShouConsumer {
    public static void main(String[] args) throws Exception {
        System.out.println("开启监听...");
        //创建连接
        MyConnection myConnection = new MyConnection();
        //获取连接
        Connection connection = myConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //开始监听方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body, "UTF-8");
                System.out.println(str);
            }
        };

        //设置监听数据
        //参1:要监听那个队列的名字
        //参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
        //参3:使用那个监听方法
        channel.basicConsume("fanout_kuaishou",true,defaultConsumer);



    }
}

2号接受端:

只需要改动1号接受端的最后一行

改为:

代码语言:javascript复制
channel.basicConsume("fanout_douyin",true,defaultConsumer);

5.按名字模式(Topics)-多个消息(TOPIC)

说明:和其它不同,它是发送端使用默认交换机,接受端使用TOPIC交换机

按名字:接受端设置自已的名字,发送端可以选择模糊名字包含xx发送,或者全匹配发送

实际业务:

根据靓号ID用户发消息。比如用户ID包含666的精品ID,去给这类用户发消息

发送端:

代码语言:javascript复制
package com.zb.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;

import java.io.IOException;

public class Producer {
    public static void main(String[] args) throws Exception {
        System.out.println("开始发送数据...");
        //创建连接,获取连接,创建通道
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();

        //开启confirm的启动监听
        channel.confirmSelect();


        //启动交换机,发送数据
        //参1:交换机名字
        //参2:携带那个绑定好的
        //参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
        //参4:发送什么数据,必须byte数组类型
        //-----------------------------------------------------------------------------------------
        //TOPIC交换机和其它的有所不同,此处发送端使用了默认交换机,所以不用指定任何交换机
        //公共意思是:看参2:向名字已info开头,后面包含 sms/email 的接受端发送信息
        String commonMSG = "公共数据";
        channel.basicPublish("topic_exchange", "info.sms.email", null, commonMSG.getBytes());
        System.out.println("发送成功");

        //私有意思是:看参2:向名字已info开头,后面包含 email 的接受端发送信息
        String emailmsg = "邮箱私有数据";
        channel.basicPublish("topic_exchange", "info.email", null, emailmsg.getBytes());
        System.out.println("发送成功");

        //私有意思是:看参2:向名字已info开头,后面包含 sms 的接受端发送信息
        String smsmsg = "短信私有数据";
        channel.basicPublish("topic_exchange", "info.sms", null, smsmsg.getBytes());
        System.out.println("发送成功");




    }
}

1号接受端:

代码语言:javascript复制
package com.zb.topic;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;

import java.io.IOException;

public class EmailConsumer {
    public static void main(String[] args) throws Exception {
        System.out.println("启动监听...");

        //创建连接,获取连接,创建通道
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();

        //指定交换机,这里使用:模糊名字发送交换机(TOPIC)
        // 参1:给交换机起的名字
        // 参2:指定交换机模式
        //-----------------------------------------------------------
        //TOPIC交换机和其它的有所不同,发送端使用了默认交换机,此处的接受端使用TOPIC交换机
        channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);


        //声明队列
        //参1:起个名字
        //参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
        //参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
        //参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
        //参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
        channel.queueDeclare("email_topic", true, false, false, null);


        //设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
        //-----------------------------------------------------------
        //TOPIC交换机和其它的有所不同,发送端使用了默认交换机,此处的接受端使用TOPIC交换机
        channel.queueBind("email_topic", "topic_exchange", "info.#.email.#");

        //开始监听方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body, "UTF-8");
                System.out.println(str);
            }
        };

        //设置监听数据
        //参1:要监听那个队列的名字
        //参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
        //参3:使用那个监听方法
        channel.basicConsume("email_topic", true, defaultConsumer);


    }
}

2号接受端:

改动1号接受端的-----第31行的第1个参数和37行的第1个参数和第3个参数


二.高级玩法

限流:一大巴数据过来了,可以一次处理x个(x个x个处理)

回应:消息发到RabbitMQ了,如果它接受到,就回应我

接受回应:接受端接受到了,回应我

1.限流(TOPIC)

说明:代码里面设置了,限流2个一次,意思是2个2个处理

发送端:

代码语言:javascript复制
package com.zb.qos;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;

import java.io.IOException;

public class Producer {
    public static void main(String[] args) throws Exception {
        System.out.println("开始发送数据...");
        //创建连接,获取连接,创建通道
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();

        //for循环发送多个数据
        for (int i = 0; i < 10; i  ) {

            //设置要发送的数据
            String commonMSG = "限流数据"   i;

            //启动交换机,发送数据
            //参1:交换机名字
            //参2:携带那个绑定好的
            //参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
            //参4:发送什么数据,必须byte数组类型
            channel.basicPublish("qos_exchange", "qos_routing", null, commonMSG.getBytes());
        }


    }
}

接受端:

代码语言:javascript复制
package com.zb.qos;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;

import java.io.IOException;

public class EmailConsumer {
    public static void main(String[] args) throws Exception {
        System.out.println("启动监听...");

        //创建连接,获取连接,创建通道
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        final Channel channel = connection.createChannel();

        //指定交换机,这里使用:模糊名字发送交换机(TOPIC)
        // 参1:给交换机起的名字
        // 参2:指定交换机模式
        channel.exchangeDeclare("qos_exchange", BuiltinExchangeType.TOPIC);

        //声明队列
        //参1:起个名字
        //参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
        //参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
        //参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
        //参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
        channel.queueDeclare("qos_queue", true, false, false, null);


        //设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
        channel.queueBind("qos_queue", "qos_exchange", "qos_routing");

        //限流,一次处理2个(2个2个处理)
        channel.basicQos(2);

        //开始监听方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body, "UTF-8");
                System.out.println(str);
                if (str.equals("限流数据5")) {
                    channel.basicNack(envelope.getDeliveryTag(), false, false);
                } else {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        //设置监听数据
        //参1:要监听那个队列的名字
        //参2:true=接收到后通知服务器接受到了,false=接收到消息不通知服务器 (该功能一般用于开启confirm的功能上)
        //参3:使用那个监听方法
        channel.basicConsume("qos_routing", false, defaultConsumer);


    }
}

2.回应(DIRECT)

说明:RabbitMQ接收到回应和没接收到回应,有一些参数可以写一些回滚数据,或者重新发送数据的方法,或者写进日志,方便运维人员查看

发送端:

代码语言:javascript复制
package com.zb.confirm;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;

import java.io.IOException;

public class Producer {
    //该方法使用简单模式的默认交换机(DIRECT)
    public static void main(String[] args) throws Exception {

        System.out.println("开始发送数据...");
        //创建连接
        MyConnection myConnection = new MyConnection();
        //获取连接
        Connection connection = myConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //开启confirm的启动监听
        channel.confirmSelect();

        //指定交换机,这里使用默认交换机(DIRECT)
        // 参1:给交换机起的名字
        // 参2:指定交换机模式
        channel.exchangeDeclare("confirm_exchange", BuiltinExchangeType.DIRECT);


        //声明队列
        //参1:起个名字
        //参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
        //参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
        //参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
        //参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
        channel.queueDeclare("confirm_queue", true, false, false, null);

        //设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
        channel.queueBind("confirm_queue", "confirm_exchange", "confirm_routing");

        //设置要发送的数据
        String strEmail = "confirm测试数据";

        //启动交换机,发送数据
        //参1:交换机名字
        //参2:携带那个绑定好的
        //参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
        //参4:发送什么数据,必须byte数组类型
        channel.basicPublish("confirm_exchange", "confirm_routing", null, strEmail.getBytes());
        System.out.println("发送成功");


        //confirm的回应和没有回应信息区,不多解释
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long l, boolean b) throws IOException {
                System.out.println("mq回应了信息");
            }

            public void handleNack(long l, boolean b) throws IOException {
                System.out.println("mq没有回应信息");
                System.out.println("此处可能网络波动,导致没有成功,可以写一些回滚数据,或者重新发送数据的方法");
            }
        });


    }
}

3.接受回应(DIRECT)

说明:接受端接受到了,回应我,一些参数可以写进日志,方便运维人员查看

发送端:

代码语言:javascript复制
package com.zb.returns;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;

import java.io.IOException;

public class Producer {
    public static void main(String[] args) throws Exception {
        System.out.println("开始发送数据...");
        //创建连接,获取连接,创建通道
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();

        //开启confirm的启动监听
        channel.confirmSelect();


        //指定交换机,这里使用默认交换机(DIRECT)
        // 参1:给交换机起的名字
        // 参2:指定交换机模式
        channel.exchangeDeclare("return_exchange", BuiltinExchangeType.DIRECT);


        //声明队列
        //参1:起个名字
        //参2:是否持久化(true=不会丢失,数据放Erlang自带的Mnesia数据库,false=数据放内存,重启RabbitMQ数据丢置)正常业务设置:true
        //参3:是否排外的(true=仅第一个链接的通道使用,断开候会删除,false=所有通道都可以用)正常业务设置:false
        //参4:是否自动删除(true=当接受与队列断开后,会自动删除,false=不删除)正常业务设置:false
        //参5:设置消息的参数(x-rnessage-ttl等,基本用不上)正常业务设置:null
        channel.queueDeclare("return_queue", true, false, false, null);

        //设置绑定队列到交换机上(参1:队列名字。参2:交换机名。参3:起个名字)
        channel.queueBind("return_queue", "return_exchange", "error_routing");

        //设置要发送的数据
        String strEmail = "return测试数据";

        //启动交换机,发送数据
        //参1:交换机名字
        //参2:携带那个绑定好的
        //参3:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化,正常业务设置null
        //参4:发送什么数据,必须byte数组类型
        channel.basicPublish("return_exchange", "error_routing", true, null, strEmail.getBytes());
        System.out.println("发送成功");

        //接受发送完毕后的系统参数
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                System.out.println("形参里面有发送后的状态码,信息什么的");
                System.out.println("此处可以根据形参,写一些日志,方便运维人员处理问题");
            }
        });

    }
}

三.附加玩法

TTL:设置指定过期时间(10000=10秒),到达时间自动清除

DLX:队列上的消息(过期)变成死信后,能够发送到另外一个交换机(DLX),然后被路由到一个队列上

TTL

发送端:

22行设置过期清除时间

代码语言:javascript复制
package com.zb.ttl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;

import java.io.IOException;

public class Producer {
    public static void main(String[] args) throws Exception {
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.confirmSelect();
        String commonMSG = "时间数据";
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().
                builder().
                deliveryMode(2).
                contentEncoding("UTF-8").
                expiration("10000").
                build();
        channel.basicPublish("ttl_exchange", "info.ttl", basicProperties, commonMSG.getBytes());


    }
}

接受端:

代码语言:javascript复制
package com.zb.ttl;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;

import java.io.IOException;

public class EmailConsumer {
    public static void main(String[] args) throws Exception {
        System.out.println("启动监听");
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("ttl_exchange", BuiltinExchangeType.TOPIC);
        channel.queueDeclare("ttl_topic", true, false, false, null);
        channel.queueBind("ttl_topic", "ttl_exchange", "info.ttl");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body, "UTF-8");
                System.out.println(str);
            }
        };

        //监听数据
        channel.basicConsume("ttl_topic", true, defaultConsumer);


    }
}

DLX

发送端:

19行设置过期转发时间

成为死信一般有以下几种情况:

消息被拒绝(basic.reject or basic.nack)且带requeue=false参数

消息的TTL-存活时间已经过期

队列长度限制被超越(队列满)

代码语言:javascript复制
package com.zb.dlx;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zb.util.MyConnection;

public class Producer {
    public static void main(String[] args) throws Exception {
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.confirmSelect();
        String commonMSG = "死信数据";
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().
                builder().
                deliveryMode(2).
                contentEncoding("UTF-8").
                expiration("10000").
                build();
        channel.basicPublish("one_exchange", "one_routing", basicProperties, commonMSG.getBytes());


    }
}

1号接受端:

代码语言:javascript复制
package com.zb.dlx;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class OneConsumer {
    public static void main(String[] args) throws Exception {
        System.out.println("启动监听1");
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();

        Map<String, Object> param = new HashMap<String, Object>();
        param.put("x-dead-letter-exchange", "two_exchange");

        channel.exchangeDeclare("one_exchange", BuiltinExchangeType.TOPIC);
        channel.queueDeclare("one_queue", true, false, false, param);

        channel.queueBind("one_queue", "one_exchange", "one_routing");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body, "UTF-8");
                System.out.println(str);
            }
        };
        //监听数据
        channel.basicConsume("one_queue", true, defaultConsumer);


    }
}

2号接受端

代码语言:javascript复制
package com.zb.dlx;

import com.rabbitmq.client.*;
import com.zb.util.MyConnection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class TwoConsumer {
    public static void main(String[] args) throws Exception {
        System.out.println("启动监听2");
        MyConnection myConnection = new MyConnection();
        Connection connection = myConnection.getConnection();
        Channel channel = connection.createChannel();


        channel.exchangeDeclare("two_exchange", BuiltinExchangeType.TOPIC);
        channel.queueDeclare("two_queue", true, false, false, null);

        channel.queueBind("two_queue", "two_exchange", "#");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body, "UTF-8");
                System.out.println(str);
            }
        };
        //监听数据
        channel.basicConsume("two_queue", true, defaultConsumer);


    }
}

0 人点赞