消息中间件
RabbitMq ActiveMQ RocketMQ kafka
安装RabbitMQ
(33条消息) RabbitMQ安装教程(超详细)_凡尘-追梦者的博客-CSDN博客
为什么要使用消息队列
为什么使用消息队列? (gitee.io)
解耦 异步 削峰
缺点
系统可用性降低 系统复杂度提高 一致性问题
RabbitMQ demo
1、rabbitmq创建vhost
在 RabbitMQ 中,可以通过管理界面或者命令行方式创建 vhost(虚拟主机)。下面分别介绍两种方式:
通过 RabbitMQ 管理界面创建 vhost:
- 打开浏览器,访问 RabbitMQ 管理界面(默认地址:http://localhost:15672/)。
- 使用管理员账号登录(默认用户名:guest,密码:guest)。
- 在左侧导航栏中选择 “Admin”(管理员菜单)。
- 在 “Add a new virtual host”(添加新的虚拟主机)部分,输入要创建的 vhost 名称,然后点击 “Add virtual host” 按钮即可。
通过命令行方式创建 vhost:
使用 RabbitMQ 的 rabbitmqctl
命令可以进行管理操作。在命令行中运行以下命令来创建 vhost:
bashCopy code# 以管理员权限登录 RabbitMQ
sudo rabbitmqctl add_vhost <vhost_name>
注意将 <vhost_name>
替换为你要创建的 vhost 名称。
无论哪种方式创建 vhost,都可以在 RabbitMQ 中为不同应用程序或服务创建隔离的消息队列环境。确保为每个应用程序分配不同的 vhost,以便更好地管理和维护消息队列。
2、生产者和消费者demo
在参数设置的时候虚拟机是不需要”/“
ProducerHelloworld.java
代码语言:javascript复制public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建一个连接工厂
ConnectionFactory factory=new ConnectionFactory();
// 2、设置参数
factory.setHost("101.200.40.242");//主机ip
factory.setPort(5672);//端口号
factory.setVirtualHost("lyz");//设置虚拟机(可以理解为分组)
factory.setUsername("root");
factory.setPassword("root");
//3、根据配置信息 创建连接
Connection connection = factory.newConnection();
//4、创建channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数:
* 1. queue:队列名称
* 2. durable:是否持久化,true 当mq重启之后,还在
* 3. exclusive:
* * 是否独占。只能有一个消费者监听这队列
* * 当Connection关闭时,是否删除队列
* *
* 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("queueHelloworld",true,false,false,null);
// 6、发送消息
String body="hello 07201111";
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
channel.basicPublish("","queueHelloworld",null,body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
ConsumerHelloworld.java
代码语言:javascript复制public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setHost("101.200.40.242");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
factory.setVirtualHost("lyz");
// 创建连接 Connection
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare("queueHelloworld",true,false,false,null);
//接受消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("consumerTag = " consumerTag);
System.out.println("envelope.getExchange() = " envelope.getExchange());
System.out.println("envelope.getRoutingKey() = " envelope.getRoutingKey());
System.out.println("properties = " properties);
System.out.println("new String(body) = " new String(body));
}
};
//参数3 使用回调对象的对调函数 展示消息
channel.basicConsume("queueHelloworld",true,consumer);
}
Springboot整合RabbitMQ
简单测试
代码语言:javascript复制配置类
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME="boot_topic_exchange";
public static final String QUEUE_NAME="boot_queue";
@Bean
public Queue queue(){
return new Queue("queue",true);
}
}
代码语言:javascript复制发送请求
@Service
@Slf4j
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(Object message){
log.info("发送消息:" message);
rabbitTemplate.convertAndSend("queue",message);
}
}
代码语言:javascript复制接收消息
@Service
@Slf4j
public class MQReceiver {
@RabbitListener(queues = "queue")
public void receive(Object msg){
log.info("接受消息:" msg);
}
}
fanout广播模式
代码语言:javascript复制这个模式将消息广播给所有与交换机绑定的队列。路由键在这里不起作用,只需要绑定队列到交换机即可,所有绑定的队列都会收到相同的消息。
配置类
@Configuration
public class RabbitMQConfig {
private static final String QUEUE1="queue_fanout01";
private static final String QUEUE2="queue_fanout02";
private static final String EXCHANGE="fanoutExchange";
@Bean
public Queue queue(){
return new Queue("queue",true);
}
@Bean
public Queue queue1(){
return new Queue(QUEUE1);
}
@Bean
public Queue queue2(){
return new Queue(QUEUE2);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(EXCHANGE);
}
@Bean
public Binding binding1(){
return BindingBuilder.bind(queue1()).to(fanoutExchange());
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(queue2()).to(fanoutExchange());
}
}
代码语言:javascript复制发送信息
@Service
@Slf4j
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(Object message){
log.info("发送消息:" message);
rabbitTemplate.convertAndSend("fanoutExchange","",message);
}
}
代码语言:javascript复制接受消息
@Service
@Slf4j
public class MQReceiver {
@RabbitListener(queues = "queue")
public void receive(Object msg){
log.info("接受消息:" msg);
}
@RabbitListener(queues = "queue_fanout01")
public void receive01(Object msg){
log.info("QUEUE01_接受消息:" msg);
}
@RabbitListener(queues = "queue_fanout02")
public void receive02(Object msg){
log.info("QUEUE_02接受消息:" msg);
}
}
测试
direct模式
代码语言:javascript复制路由模式 生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。 也就是让消费者有选择性的接收消息。 路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费的。
配置类
@Configuration
public class RabbitMQConfig {
private static final String QUEUE1="queue_direct01";
private static final String QUEUE2="queue_direct02";
private static final String EXCHANGE="directExchange";
private static final String ROUNTINGKEY01="queue.red";
private static final String ROUNTINGKEY02="queue.green";
@Bean
public Queue queue1(){
return new Queue(QUEUE1);
}
@Bean
public Queue queue2(){
return new Queue(QUEUE2);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(EXCHANGE);
}
@Bean
public Binding binding01(){
return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUNTINGKEY01);
}
@Bean
public Binding binding02(){
return BindingBuilder.bind(queue2()).to(directExchange()).with(ROUNTINGKEY02);
}
}
代码语言:javascript复制消息发送者
@Service
@Slf4j
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(Object message){
log.info("发送消息:" message);
rabbitTemplate.convertAndSend("fanoutExchange","",message);
}
public void send01(Object message){
log.info("发送消息:" message);
rabbitTemplate.convertAndSend("directExchange","queue.red",message);
}
public void send02(Object message){
log.info("发送消息:" message);
rabbitTemplate.convertAndSend("directExchange","queue.green",message);
}
}
代码语言:javascript复制消息接收者
@Service
@Slf4j
public class MQReceiver {
@RabbitListener(queues = "queue_direct01")
public void receive01(Object msg){
log.info("QUEUE01_接受消息:" msg);
}
@RabbitListener(queues = "queue_direct02")
public void receive02(Object msg){
log.info("QUEUE_02接受消息:" msg);
}
}
测试
Topic模式
代码语言:javascript复制上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。 符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。 与路由模式相似,但是,主题模式是一种模糊的匹配方式
配置类
@Configuration
public class RabbitMQConfig {
private static final String QUEUE01="queue_topic01";
private static final String QUEUE02="queue_topic02";
private static final String EXCHANGE="topicExchange";
private static final String ROUNTINGKEY01="#.queue.#";
private static final String ROUNTINGKEY02="*.queue.#";
@Bean
public Queue queue1(){
return new Queue(QUEUE01);
}
@Bean
public Queue queue2(){
return new Queue(QUEUE02);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(EXCHANGE);
}
@Bean
public Binding binding01(){
return BindingBuilder.bind(queue1()).to(topicExchange()).with(ROUNTINGKEY01);
}
@Bean
public Binding binding02(){
return BindingBuilder.bind(queue2()).to(topicExchange()).with(ROUNTINGKEY02);
}
}
代码语言:javascript复制生产者
@Service
@Slf4j
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send03(Object msg){
log.info("发送消息queue01接受:" msg);
rabbitTemplate.convertAndSend("topicExchange","queue.red.message",msg);
}
public void send04(Object msg){
log.info("发送消息queue01和queue02接受:" msg);
rabbitTemplate.convertAndSend("topicExchange","message.queue.green",msg);
}
}
代码语言:javascript复制消费者
@Service
@Slf4j
public class MQReceiver {
@RabbitListener(queues="queue_topic01")
public void receive05(Object msg){
log.info("queue01接受消息" msg);
}
@RabbitListener(queues="queue_topic02")
public void receive06(Object msg){
log.info("queue02接受消息" msg);
}
}
测试 @Test public void testTopic(){ mqSender.send03("发消息喽03"); } @Test public void testTopic02(){ mqSender.send04("发消息喽04"); }
测试2 @Test public void testTopic(){ mqSender.send03("发消息喽03"); mqSender.send04("发消息喽04"); }
发现再不同时发送的时候,两个接受队列都可以接收到消息04;
但是为什么在同一个方法中,两个接受队列不能同时接受消息04;