RabbitMQ实战代码

2019-10-21 17:23:06 浏览数 (1)

Maven依赖

RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。

maven工程的pom文件中添加依赖

代码语言:javascript复制
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

工具类

代码语言:javascript复制
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtils {
    public static Connection getConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("psvmc");
        factory.setPassword("psvmc");
        factory.setHost("mq.psvmc.cn");
        //建立到代理服务器到连接
        Connection conn = factory.newConnection();
        return conn;
    }
}

QUEUE

我这里暂且把当前的这种方式定义为队列模式

队列模式的特点

  • 先打开生产者发送消息消息不会丢失
  • 多个消费者不会收到同一个消息 由服务器去分配
  • 生产者把消息直接放在队列中 队列由生产者创建
  • 发布消息是交换机的名字填空字符串
  • RabbitMQ内置一个名称为空字符串的默认交换机,它根据Routing key将消息路由到与队列名与Routing key完全相等的队列中

消息生产者

代码语言:javascript复制
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class MyProducer {
    public final static String QUEUE_NAME = "rabbitMQ.work1";

    public static void main(String[] args) throws Exception {
        //创建一个新的连接
        Connection connection = ConnectionUtils.getConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //  声明一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        int message = 1;
        while (message < 1000) {
            //发送消息到队列中
            channel.basicPublish(
                    "",
                    QUEUE_NAME,
                    null,
                    (""   message).getBytes("UTF-8")
            );
            System.out.println("发送消息:"   message);
            Thread.sleep(2000);
            message  = 1;
        }

        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

消息消费者

代码语言:javascript复制
import com.rabbitmq.client.*;

import java.io.IOException;

public class MyCustomer {
    private final static String QUEUE_NAME = "rabbitMQ.work1";

    public static void main(String[] args) throws Exception {

        //创建一个新的连接
        Connection connection = ConnectionUtils.getConnection();
        //创建一个通道
        final Channel channel = connection.createChannel();
        //每次从队列获取的数量
        channel.basicQos(1);
        //声明要关注的队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        //告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接收消息:"   message);
                long deliveryTag = envelope.getDeliveryTag();
                //确认消息
                try {
                    Thread.sleep(3000);
                    channel.basicAck(deliveryTag, false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数 注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体

EXCHANGE

我这里暂且把当前的这种方式定义为路由模式

路由模式的特点

  • 先打开生产者发送消息,消息会丢失
  • 多个消费者会收到同一个消息 由服务器根据规则去分配
  • 需要队列和路由进行绑定
  • 队列可以多次和路由绑定 只要routingKey不同即可
  • 交换机类型:fanout(发布订阅模式),direct(精准匹配模式), topic(通配符模式), headers(头匹配模式)

fanout(发布订阅模式)

这种模式的特点

  • routingKey 为空字符串
  • 只要订阅后都能收到消息

消息生产者

代码语言:javascript复制
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class MyProducer {

    public final static String EXCHANGE_NAME = "myexchange";

    public static void main(String[] args) throws Exception {
        //创建一个新的连接
        Connection connection = ConnectionUtils.getConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //声明一个交换机 发布订阅模式
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        int message = 1;
        while (message < 1000) {
            //发送消息到队列中
            channel.basicPublish(
                    EXCHANGE_NAME,
                    "",
                    null,
                    (""   message).getBytes("UTF-8")
            );
            System.out.println("发送消息:"   message);
            Thread.sleep(2000);
            message  = 1;
        }

        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

消息消费者

代码语言:javascript复制
import com.rabbitmq.client.*;
import java.io.IOException;

public class MyCustomer {

    public final static String EXCHANGE_NAME = "myexchange";
    private final static String QUEUE_NAME = "rabbitMQ.queue1";

    public static void main(String[] args) throws Exception {
        //创建一个新的连接
        Connection connection = ConnectionUtils.getConnection();
        //创建一个通道
        final Channel channel = connection.createChannel();
        //每次从队列获取的数量
        channel.basicQos(1);
        //声明定义队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        //告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接收消息:"   message);
                long deliveryTag = envelope.getDeliveryTag();
                //确认消息
                try {
                    Thread.sleep(3000);
                    channel.basicAck(deliveryTag, false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

direct(精准匹配模式)

特点

  • 根据routingKey精准匹配消息

topic(通配符模式)

特点

  • 根据routingKey模糊匹配消息
  • routingKey为aa.bb形式
  • 可以用*#进行匹配 a.*可以匹配 a.a、a.b 不能匹配a.b.c a.#既可以匹配 a.a、a.b 也能匹配a.b.c

headers(头匹配模式)

  • x-match = all :表示所有的键值对都匹配才能接受到消息
  • x-match = any :表示只要有键值对匹配就能接受到消息

消费者

代码语言:javascript复制
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String,Object>();
//这里x-match有两种类型
//all:表示所有的键值对都匹配才能接受到消息
//any:表示只要有键值对匹配就能接受到消息
headers.put("x-match", "any");
headers.put("name", "jack");
headers.put("age" , 31);

//把队列绑定到路由上并指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);

生产者

代码语言:javascript复制
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);

//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String, Object>();
headers.put("name", "jack");
headers.put("age", 30);
Builder builder = new Builder();
builder.headers(headers);

channel.basicPublish(EXCHANGE_NAME, "", builder.build(), message.getBytes());

上面的例子中name的值都为jack 匹配上了一个 就能收到消息

Spring集成

这里的示例是用的QUEUE的方式

注意下面的这行配置

代码语言:javascript复制
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="" routing-key="myqueue"/>

一定要配置的是routing-key="myqueue" 不要配成queue="myqueue" 我就是在这里折腾了好久。

下面是具体的配置:

1) 添加依赖

代码语言:javascript复制
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.2</version>
  </dependency>

  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
  </dependency>

  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.7.6.RELEASE</version>
  </dependency>

</dependencies>

2) 在resources中添加文件rabbitmq.xml 内容如下

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

    <bean id="connectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="mq.psvmc.cn"/>
        <property name="username" value="psvmc"/>
        <property name="password" value="psvmc"/>
        <property name="port" value="5672"/>
        <property name="channelCacheSize" value="5"/>
    </bean>

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义queue -->
    <rabbit:queue name="myqueue" auto-declare="true" durable="false" auto-delete="false" exclusive="false"/>


    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="" routing-key="myqueue"/>

    <!-- 监听生产者发送的消息开始 -->
    <!-- 消息接收者 -->
    <bean id="messageReceiver" class="cn.psvmc.spring.MessageConsumer"></bean>

    <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    <!-- acknowledeg = "manual":意为表示该消费者的ack方式为手动 ;acknowledge="auto"表示自动-->
    <!-- prefetch=1设置预取消息数目为1 -->
    <rabbit:listener-container
            prefetch="1"
            connection-factory="connectionFactory"
            auto-declare="true"
            acknowledge="manual">
        <rabbit:listener queue-names="myqueue" ref="messageReceiver" method="onMessage"/>
    </rabbit:listener-container>
    <!-- 监听生产者发送的消息结束 -->
</beans>

3) 消息的生产者

代码语言:javascript复制
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MessageProducer {
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:rabbitmq.xml");
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        int message = 1;
        while (message < 100) {
            //发送消息到队列中
            template.convertAndSend(""   message);
            System.out.println("  发送消息:"   message);
            Thread.sleep(2000);
            message  = 1;
        }
        ((ClassPathXmlApplicationContext) context).destroy();
    }
}

4) 消息的消费者

代码语言:javascript复制
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

public class MessageConsumer implements ChannelAwareMessageListener {

    /**
     * 处理收到的rabbit消息的回调方法。
     *
     * @param message AMQP封装消息对象
     * @param channel 信道对象,可以进行确认回复
     * @throws Exception Any.
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("- 收到消息:"   new String(message.getBody(), "UTF-8"));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

0 人点赞