Maven依赖
RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。
maven工程的pom文件中添加依赖
代码语言:javascript复制<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
工具类
代码语言:javascript复制import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtils {
public static Connection getConnection() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("psvmc");
factory.setPassword("psvmc");
factory.setHost("mq.psvmc.cn");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
return conn;
}
}
QUEUE
我这里暂且把当前的这种方式定义为队列模式
队列模式的特点
- 先打开生产者发送消息消息不会丢失
- 多个消费者不会收到同一个消息 由服务器去分配
- 生产者把消息直接放在队列中 队列由生产者创建
- 发布消息是交换机的名字填空字符串
- RabbitMQ内置一个名称为空字符串的默认交换机,它根据Routing key将消息路由到与队列名与Routing key完全相等的队列中
消息生产者
代码语言:javascript复制import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class MyProducer {
public final static String QUEUE_NAME = "rabbitMQ.work1";
public static void main(String[] args) throws Exception {
//创建一个新的连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int message = 1;
while (message < 1000) {
//发送消息到队列中
channel.basicPublish(
"",
QUEUE_NAME,
null,
("" message).getBytes("UTF-8")
);
System.out.println("发送消息:" message);
Thread.sleep(2000);
message = 1;
}
//关闭通道和连接
channel.close();
connection.close();
}
}
消息消费者
代码语言:javascript复制import com.rabbitmq.client.*;
import java.io.IOException;
public class MyCustomer {
private final static String QUEUE_NAME = "rabbitMQ.work1";
public static void main(String[] args) throws Exception {
//创建一个新的连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
final Channel channel = connection.createChannel();
//每次从队列获取的数量
channel.basicQos(1);
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
//告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收消息:" message);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
try {
Thread.sleep(3000);
channel.basicAck(deliveryTag, false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数 注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
EXCHANGE
我这里暂且把当前的这种方式定义为路由模式
路由模式的特点
- 先打开生产者发送消息,消息会丢失
- 多个消费者会收到同一个消息 由服务器根据规则去分配
- 需要队列和路由进行绑定
- 队列可以多次和路由绑定 只要routingKey不同即可
- 交换机类型:fanout(发布订阅模式),direct(精准匹配模式), topic(通配符模式), headers(头匹配模式)
fanout(发布订阅模式)
这种模式的特点
- routingKey 为空字符串
- 只要订阅后都能收到消息
消息生产者
代码语言:javascript复制import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class MyProducer {
public final static String EXCHANGE_NAME = "myexchange";
public static void main(String[] args) throws Exception {
//创建一个新的连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明一个交换机 发布订阅模式
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
int message = 1;
while (message < 1000) {
//发送消息到队列中
channel.basicPublish(
EXCHANGE_NAME,
"",
null,
("" message).getBytes("UTF-8")
);
System.out.println("发送消息:" message);
Thread.sleep(2000);
message = 1;
}
//关闭通道和连接
channel.close();
connection.close();
}
}
消息消费者
代码语言:javascript复制import com.rabbitmq.client.*;
import java.io.IOException;
public class MyCustomer {
public final static String EXCHANGE_NAME = "myexchange";
private final static String QUEUE_NAME = "rabbitMQ.queue1";
public static void main(String[] args) throws Exception {
//创建一个新的连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
final Channel channel = connection.createChannel();
//每次从队列获取的数量
channel.basicQos(1);
//声明定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
//告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收消息:" message);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
try {
Thread.sleep(3000);
channel.basicAck(deliveryTag, false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
direct(精准匹配模式)
特点
- 根据routingKey精准匹配消息
topic(通配符模式)
特点
- 根据routingKey模糊匹配消息
- routingKey为aa.bb形式
- 可以用
*
和#
进行匹配 a.*可以匹配 a.a、a.b 不能匹配a.b.c a.#既可以匹配 a.a、a.b 也能匹配a.b.c
headers(头匹配模式)
- x-match = all :表示所有的键值对都匹配才能接受到消息
- x-match = any :表示只要有键值对匹配就能接受到消息
消费者
代码语言:javascript复制//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String,Object>();
//这里x-match有两种类型
//all:表示所有的键值对都匹配才能接受到消息
//any:表示只要有键值对匹配就能接受到消息
headers.put("x-match", "any");
headers.put("name", "jack");
headers.put("age" , 31);
//把队列绑定到路由上并指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);
生产者
代码语言:javascript复制//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);
//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String, Object>();
headers.put("name", "jack");
headers.put("age", 30);
Builder builder = new Builder();
builder.headers(headers);
channel.basicPublish(EXCHANGE_NAME, "", builder.build(), message.getBytes());
上面的例子中name
的值都为jack
匹配上了一个 就能收到消息
Spring集成
这里的示例是用的QUEUE的方式
注意下面的这行配置
代码语言:javascript复制<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="" routing-key="myqueue"/>
一定要配置的是routing-key="myqueue"
不要配成queue="myqueue"
我就是在这里折腾了好久。
下面是具体的配置:
1) 添加依赖
代码语言:javascript复制<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
</dependencies>
2) 在resources
中添加文件rabbitmq.xml
内容如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="mq.psvmc.cn"/>
<property name="username" value="psvmc"/>
<property name="password" value="psvmc"/>
<property name="port" value="5672"/>
<property name="channelCacheSize" value="5"/>
</bean>
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义queue -->
<rabbit:queue name="myqueue" auto-declare="true" durable="false" auto-delete="false" exclusive="false"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="" routing-key="myqueue"/>
<!-- 监听生产者发送的消息开始 -->
<!-- 消息接收者 -->
<bean id="messageReceiver" class="cn.psvmc.spring.MessageConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<!-- acknowledeg = "manual":意为表示该消费者的ack方式为手动 ;acknowledge="auto"表示自动-->
<!-- prefetch=1设置预取消息数目为1 -->
<rabbit:listener-container
prefetch="1"
connection-factory="connectionFactory"
auto-declare="true"
acknowledge="manual">
<rabbit:listener queue-names="myqueue" ref="messageReceiver" method="onMessage"/>
</rabbit:listener-container>
<!-- 监听生产者发送的消息结束 -->
</beans>
3) 消息的生产者
代码语言:javascript复制import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MessageProducer {
public static void main(String[] args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:rabbitmq.xml");
RabbitTemplate template = context.getBean(RabbitTemplate.class);
int message = 1;
while (message < 100) {
//发送消息到队列中
template.convertAndSend("" message);
System.out.println(" 发送消息:" message);
Thread.sleep(2000);
message = 1;
}
((ClassPathXmlApplicationContext) context).destroy();
}
}
4) 消息的消费者
代码语言:javascript复制import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
public class MessageConsumer implements ChannelAwareMessageListener {
/**
* 处理收到的rabbit消息的回调方法。
*
* @param message AMQP封装消息对象
* @param channel 信道对象,可以进行确认回复
* @throws Exception Any.
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("- 收到消息:" new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}