1、主流的消息中间件简单介绍哦。
1)、ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线,并且它一个完全支持jms(java message service)规范的消息中间件。其丰富的api,多种集群构建模式使得他成为业界老牌消息中间件,在中小企业中应用广泛。 如果不是高并发的系统,对于ActiveMQ,是一个不错的选择的,丰富的api,让你开发的很愉快哟。 注意:MQ衡量指标:服务性能,数据存储,集群架构。
2)、kafka是LinkedIn开源的分布式发布/订阅消息系统,目前归属于Apache顶级项目。kafka主要特点是基于Pull的模式来处理消息消费,最求高吞吐量,一开始的目的就是用于日志收集和传输,0.8版本开始支持复制,不支持事务,对消息的重复,丢失,错误没有严格要求,适量产生大量数据的互联网服务的数据收集业务。
3)、RocketMQ是阿里开源的消息中间件,目前也已经孵化为了Apache顶级项目,它是纯java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于kafka,它对消息的可靠传输以及事务性做了优化,目前在阿里集团被广泛用于交易,充值,流计算、消息推送、日志流式处理,binglog分发等场景。
4)、RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现的。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅模式)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
2、RabbitMQ的简单介绍。
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据(即RabbitMQ可以实现跨语言、跨平台操作),RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。
3、RabbitMQ高性能的原因所在是什么呢? 答:RabbitMQ所使用的开发语言是ErLang语言,ErLang其最初在于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的。Erlang的优点,Erlang有着和原生Socket一样的延迟。性能十分优越。
4、AMQP高级消息队列协议是什么? 答:AMQP全称是Advanced Message Queuing Protocol(高级消息队列协议)。AMQP定义是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
5、AMQP协议模型。
详细介绍如下所示:
1)、Server,又称为Broker,接受客户端的连接,实现AMQP实体服务。 2)、Connection,连接,应用程序与Broker的网络连接。 3)、Channel,网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。 4)、Message,消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Propertie可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体内容。 5)、Virtual Host,虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue。 6)、Exchange,交换机,接受消息,根据路由键转发消息到绑定的队列。 7)、Binding,Exchange和Queue之间的虚拟连接,binding中可以包含routing key。 8)、Routing key,一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。 9)、Queue,也称为Message Queue,消息队列,保存消息并将它们转发给消费者。
6、RabbitMQ的架构设计如下所示:
7、RabbitMQ的安装。RabbitMQ的官方网址:https://www.rabbitmq.com/
可以选择自己RabbitMQ的版本,以及对应的Erlang的版本。这里使用rabbitmq-server-3.6.5-1.noarch.rpm一键安装方式进行安装RabbitMQ的方式。一定要注意RabbitMQ的版本和Erlang的版本对应哦。点进去Erlang version可以自己对照版本。
搭建RabbitMQ所需包:
a)、erlang-18.3-1.el7.centos.x86_64.rpm这个是erlang语言基础安装包。
b)、rabbitmq-server-3.6.5-1.noarch.rpm这个是rabbitmq服务端安装包。
c)、socat-1.7.3.2-1.1.el7.x86_64.rpm这个是socat密钥。
可以下载安装包,然后进行安装即可:
代码语言:javascript复制1 wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
2 wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
3 wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
首先安装Erlang的语言基础安装包,安装过程如下所示:
代码语言:javascript复制1 [root@slaver4 package]# ls
2 erlang-18.3-1.el7.centos.x86_64.rpm haproxy-1.6.5.tar.gz keepalived-1.2.18.tar.gz rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq-server-3.6.5-1.noarch.rpm socat-1.7.3.2-1.1.el7.x86_64.rpm
3 [root@slaver4 package]# rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
4 Preparing... ################################# [100%]
5 Updating / installing...
6 1:erlang-18.3-1.el7.centos ################################# [100%]
7 [root@slaver4 package]#
开始安装密钥包,如下所示:
代码语言:javascript复制1 [root@slaver4 package]# rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
2 warning: socat-1.7.3.2-1.1.el7.x86_64.rpm: Header V4 RSA/SHA1 Signature, key ID 87e360b8: NOKEY
3 Preparing... ################################# [100%]
4 Updating / installing...
5 1:socat-1.7.3.2-1.1.el7 ################################# [100%]
6 [root@slaver4 package]#
开始安装rabbitmq服务器端,如下所示:
代码语言:javascript复制1 [root@slaver4 package]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
2 warning: rabbitmq-server-3.6.5-1.noarch.rpm: Header V4 RSA/SHA1 Signature, key ID 6026dfca: NOKEY
3 Preparing... ################################# [100%]
4 Updating / installing...
5 1:rabbitmq-server-3.6.5-1 ################################# [100%]
6 [root@slaver4 package]#
8、rpm安装方式已经帮助你配置好了环境这些东西,比解压缩安装好点,因为解压缩安装还需要手动配置环境变量的。接下来,配置一下RabbitMQ。配置如下所示:
代码语言:javascript复制 1 [root@slaver4 package]# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/
2 [root@slaver4 ebin]# ls
3 background_gc.beam rabbit_epmd_monitor.beam rabbit_plugins_main.beam
4 delegate.beam rabbit_error_logger.beam rabbit_plugins_usage.beam
5 delegate_sup.beam rabbit_error_logger_file_h.beam rabbit_policies.beam
6 dtree.beam rabbit_exchange.beam rabbit_policy.beam
7 file_handle_cache.beam rabbit_exchange_parameters.beam rabbit_prelaunch.beam
8 file_handle_cache_stats.beam rabbit_exchange_type_direct.beam rabbit_prequeue.beam
9 gatherer.beam rabbit_exchange_type_fanout.beam rabbit_priority_queue.beam
10 gm.beam rabbit_exchange_type_headers.beam rabbit_queue_consumers.beam
11 lqueue.beam rabbit_exchange_type_invalid.beam rabbit_queue_index.beam
12 mirrored_supervisor_sups.beam rabbit_exchange_type_topic.beam rabbit_queue_location_client_local.beam
13 mnesia_sync.beam rabbit_file.beam rabbit_queue_location_min_masters.beam
14 mochinum.beam rabbit_framing.beam rabbit_queue_location_random.beam
15 pg2_fixed.beam rabbit_guid.beam rabbit_queue_location_validator.beam
16 pg_local.beam rabbit_hipe.beam rabbit_queue_master_location_misc.beam
17 rabbit_access_control.beam rabbit_limiter.beam rabbit_recovery_terms.beam
18 rabbit_alarm.beam rabbit_log.beam rabbit_registry.beam
19 rabbit_amqqueue_process.beam rabbit_memory_monitor.beam rabbit_resource_monitor_misc.beam
20 rabbit_amqqueue_sup.beam rabbit_mirror_queue_coordinator.beam rabbit_restartable_sup.beam
21 rabbit_amqqueue_sup_sup.beam rabbit_mirror_queue_master.beam rabbit_router.beam
22 rabbit.app rabbit_mirror_queue_misc.beam rabbit_runtime_parameters.beam
23 rabbit_auth_mechanism_amqplain.beam rabbit_mirror_queue_mode_all.beam rabbit_sasl_report_file_h.beam
24 rabbit_auth_mechanism_cr_demo.beam rabbit_mirror_queue_mode.beam rabbit_ssl.beam
25 rabbit_auth_mechanism_plain.beam rabbit_mirror_queue_mode_exactly.beam rabbit_sup.beam
26 rabbit_autoheal.beam rabbit_mirror_queue_mode_nodes.beam rabbit_table.beam
27 rabbit.beam rabbit_mirror_queue_slave.beam rabbit_trace.beam
28 rabbit_binding.beam rabbit_mirror_queue_sync.beam rabbit_upgrade.beam
29 rabbit_boot_steps.beam rabbit_mnesia.beam rabbit_upgrade_functions.beam
30 rabbit_channel_sup.beam rabbit_mnesia_rename.beam rabbit_variable_queue.beam
31 rabbit_channel_sup_sup.beam rabbit_msg_file.beam rabbit_version.beam
32 rabbit_cli.beam rabbit_msg_store.beam rabbit_vhost.beam
33 rabbit_client_sup.beam rabbit_msg_store_ets_index.beam rabbit_vm.beam
34 rabbit_connection_helper_sup.beam rabbit_msg_store_gc.beam supervised_lifecycle.beam
35 rabbit_connection_sup.beam rabbit_node_monitor.beam tcp_listener.beam
36 rabbit_control_main.beam rabbit_parameter_validation.beam tcp_listener_sup.beam
37 rabbit_ctl_usage.beam rabbit_password.beam truncate.beam
38 rabbit_dead_letter.beam rabbit_password_hashing_md5.beam vm_memory_monitor.beam
39 rabbit_diagnostics.beam rabbit_password_hashing_sha256.beam worker_pool.beam
40 rabbit_direct.beam rabbit_password_hashing_sha512.beam worker_pool_sup.beam
41 rabbit_disk_monitor.beam rabbit_plugins.beam worker_pool_worker.beam
42 [root@slaver4 ebin]# vim rabbit.app
修改内容如是:{loopback_users, <<"guest">>},修改为{loopback_users, [guest]}。这个是用户的设置。必须修改的。
9、RabbitMQ安装成功以后,就可以进行RabbitMQ的服务启动和停止。
代码语言:javascript复制 1 [root@slaver4 ~]# rabbitmq-server start &
2 [1] 14092
3 [root@slaver4 ~]#
4 RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
5 ## ## Licensed under the MPL. See http://www.rabbitmq.com/
6 ## ##
7 ########## Logs: /var/log/rabbitmq/rabbit@slaver4.log
8 ###### ## /var/log/rabbitmq/rabbit@slaver4-sasl.log
9 ##########
10 Starting broker...
11 completed with 0 plugins.
12
13 [root@slaver4 ~]#
启动完成以后,如何验证启动是否正常呢,使用如下命令可以查看RabbitMQ启动是否正常。可以看到RabbitMQ的进程号,以及协议名称等等。
代码语言:javascript复制1 [root@slaver4 ~]# lsof -i:5672
2 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
3 beam 14206 rabbitmq 48u IPv6 70172 0t0 TCP *:amqp (LISTEN)
4 [root@slaver4 ~]#
如何停止RabbitMQ呢,可以使用如下所示停止方式,如下所示:
代码语言:javascript复制1 [root@slaver4 ~]# rabbitmqctl stop
2 Stopping and halting node rabbit@slaver4 ...
3 Gracefully halting Erlang VM
可以使用[root@slaver4 ~]# rabbitmq-plugins list命令查看默认提供了什么样的插件。
代码语言:javascript复制 1 [root@slaver4 ~]# rabbitmq-plugins list
2 Configured: E = explicitly enabled; e = implicitly enabled
3 | Status: * = running on rabbit@slaver4
4 |/
5 [ ] amqp_client 3.6.5
6 [ ] cowboy 1.0.3
7 [ ] cowlib 1.0.1
8 [ ] mochiweb 2.13.1
9 [ ] rabbitmq_amqp1_0 3.6.5
10 [ ] rabbitmq_auth_backend_ldap 3.6.5
11 [ ] rabbitmq_auth_mechanism_ssl 3.6.5
12 [ ] rabbitmq_consistent_hash_exchange 3.6.5
13 [ ] rabbitmq_event_exchange 3.6.5
14 [ ] rabbitmq_federation 3.6.5
15 [ ] rabbitmq_federation_management 3.6.5
16 [ ] rabbitmq_jms_topic_exchange 3.6.5
17 [ ] rabbitmq_management 3.6.5
18 [ ] rabbitmq_management_agent 3.6.5
19 [ ] rabbitmq_management_visualiser 3.6.5
20 [ ] rabbitmq_mqtt 3.6.5
21 [ ] rabbitmq_recent_history_exchange 1.2.1
22 [ ] rabbitmq_sharding 0.1.0
23 [ ] rabbitmq_shovel 3.6.5
24 [ ] rabbitmq_shovel_management 3.6.5
25 [ ] rabbitmq_stomp 3.6.5
26 [ ] rabbitmq_top 3.6.5
27 [ ] rabbitmq_tracing 3.6.5
28 [ ] rabbitmq_trust_store 3.6.5
29 [ ] rabbitmq_web_dispatch 3.6.5
30 [ ] rabbitmq_web_stomp 3.6.5
31 [ ] rabbitmq_web_stomp_examples 3.6.5
32 [ ] sockjs 0.3.4
33 [ ] webmachine 1.10.3
那么安装RabbitMQ成功以后,如何安装管理台或者管控台的插件呢,如下所示操作:
代码语言:javascript复制 1 [root@slaver4 ~]# rabbitmq-plugins enable rabbitmq_management
2 The following plugins have been enabled:
3 mochiweb
4 webmachine
5 rabbitmq_web_dispatch
6 amqp_client
7 rabbitmq_management_agent
8 rabbitmq_management
9
10 Applying plugin configuration to rabbit@slaver4... started 6 plugins.
11 [root@slaver4 ~]#
安装好管控台插件以后就可以使用浏览器进行验证(管控台的默认端口号是15672,5672是java端通信的端口号,25672是集群进行通信的端口号),访问地址如是:http://192.168.110.133:15672/。账号和密码默认就是guest哟。
10、命令行和管控台的基本操作。
代码语言:javascript复制 1 常用命令如下所示:
2 # 关闭应用
3 [root@slaver4 ~]# rabbitmqctl stop_app
4 # 启动应用
5 [root@slaver4 ~]# rabbitmqctl start_app
6 # 节点状态,查看集群节点状态是什么样子的
7 [root@slaver4 ~]# rabbitmqctl status
8 # 添加用户
9 [root@slaver4 ~]# rabbitmqctl add_user username password
10 # 列出所有用户
11 [root@slaver4 ~]# rabbitmqctl list_users
12 # 删除用户
13 [root@slaver4 ~]# rabbitmqctl delete_user username
14 # 清除用户权限
15 [root@slaver4 ~]# rabbitmqctl clear_permissions -p vhostpath username
16 # 列出用户权限
17 [root@slaver4 ~]# rabbitmqctl list_user_permissions username
18 # 修改用户密码
19 [root@slaver4 ~]# rabbitmqctl change_password username newpassword
20 # 设置用户权限
21 [root@slaver4 ~]# rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
22
23 RabbitMQ支持对虚拟主机,交换机,队列这些进行操作。常用命令如下所示:
24 # 创建虚拟主机
25 [root@slaver4 ~]# rabbitmqctl add_vhost vhostpath
26 # 列出所有虚拟主机
27 [root@slaver4 ~]# rabbitmqctl list_vhosts
28 # 列出虚拟主机上所有权限
29 [root@slaver4 ~]# rabbitmqctl list_permissions -p vhostpath
30 # 删除虚拟主机
31 [root@slaver4 ~]# rabbitmqctl delete_vhosts vhostpath
32 # 列出所有队列信息
33 [root@slaver4 ~]# rabbitmqctl list_queues
34 # 清除队列里的信息
35 [root@slaver4 ~]# rabbitmqctl -p vhostpath purge_queue blue
36
37 命令行和管控台的高级操作。
38 # 移除所有数据,要在rabbitmqctl stop_app之后使用
39 [root@slaver4 ~]# rabbitmqctl reset
40 # 组成集群命令,ram是加入节点的时候可以指定存储模式。
41 [root@slaver4 ~]# rabbitmqctl join_cluster <clusternode> [--ram]
42 # 查看集群的状态
43 [root@slaver4 ~]# rabbitmqctl cluster_status
44 # 修改集群节点的存储形式
45 [root@slaver4 ~]# rabbitmqctl change_cluster_node_type disc | ram
46 # 忘记节点(摘除节点)
47 [root@slaver4 ~]# rabbitmqctl forget_cluster_node [--offline]
48 # 修改节点名称
49 [root@slaver4 ~]# rabbitmqctl rename_cluster_node oladnode1 newnode1 [oldnode2] [newnode2...]
命令行可以操作的命令,在管控台也可以进行响应的操作,下面是管控台的菜单栏介绍:
11、RabbitMQ的消息生产和消费。生产者Producer发送一条消息,将消息投递到Rabbitmq的集群中即Broker中。消费端进行监听,监听Rabbitmq队列,获取到数据进行消费。 1)、ConnectionFactory,获取连接工厂,需要配置相关信息ip地址、端口号port,虚拟主机vhost。 2)、Connection,通过连接工厂获取到一个连接。 3)、Channel,通过连接创建一个Channel,网络通信信道,可以发送和接收消息。Channel是Rabbitmq所有进行数据交互的关键。 4)、Queue,创建一个队列,具体的消息存储队列。真正的物理的队列,存在于RabbitMQ的Broker上面。进行存储消息的功能。 5)、Producer生产者,生产者生产消息和Consumer消费者,消费者消费消息。
方式一,由于使用的maven构建的springboot2.x版本的项目,引入的依赖包如下所示:
代码语言:javascript复制 1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
5 https://maven.apache.org/xsd/maven-4.0.0.xsd">
6 <modelVersion>4.0.0</modelVersion>
7 <parent>
8 <groupId>org.springframework.boot</groupId>
9 <artifactId>spring-boot-starter-parent</artifactId>
10 <version>2.1.1.RELEASE</version>
11 <relativePath /> <!-- lookup parent from repository -->
12 </parent>
13 <groupId>com.bie</groupId>
14 <artifactId>rabbitmq</artifactId>
15 <version>0.0.1-SNAPSHOT</version>
16 <name>rabbitmq</name>
17 <description>Demo project for Spring Boot</description>
18
19 <properties>
20 <java.version>1.8</java.version>
21 </properties>
22
23 <dependencies>
24 <dependency>
25 <groupId>org.springframework.boot</groupId>
26 <artifactId>spring-boot-starter</artifactId>
27 </dependency>
28 <dependency>
29 <groupId>org.springframework.boot</groupId>
30 <artifactId>spring-boot-starter-web</artifactId>
31 </dependency>
32 <dependency>
33 <groupId>org.springframework.boot</groupId>
34 <artifactId>spring-boot-starter-test</artifactId>
35 <scope>test</scope>
36 </dependency>
37 <dependency>
38 <groupId>org.springframework.boot</groupId>
39 <artifactId>spring-boot-starter-amqp</artifactId>
40 </dependency>
41 </dependencies>
42
43 <build>
44 <plugins>
45 <plugin>
46 <groupId>org.springframework.boot</groupId>
47 <artifactId>spring-boot-maven-plugin</artifactId>
48 </plugin>
49 </plugins>
50 </build>
51
52 </project>
配置application.properties的配置文件,将rabbitmq所在的服务器地址,端口号,账号,密码,以及队列的名称。
代码语言:javascript复制 1 # 给当前项目起名称.
2 spring.application.name=rabbitmq
3
4 # 配置rabbitmq的参数.
5 # rabbitmq服务器的ip地址.
6 spring.rabbitmq.host=192.168.110.133
7 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
8 spring.rabbitmq.port=5672
9 # rabbitmq的账号.
10 spring.rabbitmq.username=guest
11 # rabbitmq的密码.
12 spring.rabbitmq.password=guest
13
14 # 队列的名称
15 rabbitmq.queue=queue001
首先创建一个队列,在项目启动的时候,就进行加载,方便生产者生产的消息保存到队列里面。
代码语言:javascript复制 1 package com.example.bie.config;
2
3 import org.springframework.amqp.core.Queue;
4 import org.springframework.beans.factory.annotation.Value;
5 import org.springframework.context.annotation.Bean;
6 import org.springframework.context.annotation.Configuration;
7
8 /**
9 *
10 * @author biehl
11 *
12 * @Configuration项目启动加载本类
13 *
14 */
15 @Configuration
16 public class RabbitMqQueueConfig {
17
18 @Value("${rabbitmq.queue}")
19 private String queueName;
20
21 /**
22 * 创建一个队列
23 *
24 * @return
25 */
26 @Bean
27 public Queue createQueue() {
28 return new Queue(this.queueName);
29 }
30
31 }
然后,创建好生产者和消费者以后,可以使用web项目的请求,创建一个控制类,来发送消息,触发生产者生产消息,触发消费者消费消息。
代码语言:javascript复制 1 package com.example.bie.controller;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.stereotype.Controller;
5 import org.springframework.web.bind.annotation.RequestMapping;
6 import org.springframework.web.bind.annotation.ResponseBody;
7
8 import com.example.bie.rabbitmq.producer.RabbitmqProducer;
9
10 /**
11 *
12 * @author biehl
13 *
14 */
15 @Controller
16 public class RabbitmqController {
17
18 @Autowired
19 private RabbitmqProducer rabbitmqProducer;
20
21 @RequestMapping(value = "/sendMessage")
22 @ResponseBody
23 public void rabbitmqSendMessage() {
24 String msg = "消息产===>生者<===消息message: ";
25 for (int i = 0; i < 10000; i ) {
26 rabbitmqProducer.producer(msg i);
27 }
28 }
29
30 }
生产者生产消息的,实现类,如下所示:
代码语言:javascript复制 1 package com.example.bie.rabbitmq.producer;
2
3 import org.springframework.amqp.core.AmqpTemplate;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.beans.factory.annotation.Value;
6 import org.springframework.stereotype.Component;
7
8 /**
9 *
10 * @author biehl
11 *
12 * RabbitmqProducer消息发送者
13 *
14 * @Component加入到容器中.
15 *
16 */
17 @Component
18 public class RabbitmqProducer {
19
20 @Autowired
21 private AmqpTemplate rabbitmqAmqpTemplate;
22
23 @Value("${rabbitmq.queue}")
24 private String queueName;
25
26 /**
27 * 发送消息的方法
28 */
29 public void producer(String msg) {
30 // 向消息队列中发送消息
31 // 参数1,队列的名称
32 // 参数2,发送的消息
33 this.rabbitmqAmqpTemplate.convertAndSend(this.queueName, msg);
34 }
35
36 }
消费者消费消息的实现类,如下所示:
代码语言:javascript复制 1 package com.example.bie.rabbitmq.consumer;
2
3 import org.springframework.amqp.rabbit.annotation.RabbitListener;
4 import org.springframework.beans.factory.annotation.Value;
5 import org.springframework.stereotype.Component;
6
7 /**
8 *
9 * @author biehl
10 *
11 * RabbitmqConsumer消息消费者
12 *
13 * 消费者是根据消息队列的监听器,进行消息的接收和消费。
14 *
15 * 消息队列发生变化,消息事件就会产生,触发方法进行消息的接收。
16 *
17 */
18 @Component
19 public class RabbitmqConsumer {
20
21 @Value("${rabbitmq.queue}")
22 private String queueName;
23
24 /**
25 * 消费者消费消息,接受消息的方法,采用消息队列监听机制.
26 *
27 * @RabbitListener
28 *
29 * 意思是当队列发生变化,消息事件产生了或者生产者发送消息了。
30 *
31 * 马上就会触发这个方法,进行消息的消费。
32 */
33 @RabbitListener(queues = "queue001")
34 public void consumer(String msg) {
35 // 打印消息
36 System.out.println("消费者===>消费<===消息message: " msg);
37 }
38
39 }
springboot2.x版本的主启动类,如下所示:
代码语言:javascript复制 1 package com.example;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5
6 @SpringBootApplication
7 public class RabbitmqApplication {
8
9 public static void main(String[] args) {
10 SpringApplication.run(RabbitmqApplication.class, args);
11 }
12
13 }
效果如下所示:
方式二,或者使用下面这种方式,直接进行生产者生产消息和消费者消费消息的测试,生产者生产消息的代码如下所示:
代码语言:javascript复制 1 package com.example.bie;
2
3 import java.io.IOException;
4 import java.util.concurrent.TimeoutException;
5
6 import com.rabbitmq.client.AMQP.BasicProperties;
7 import com.rabbitmq.client.Channel;
8 import com.rabbitmq.client.Connection;
9 import com.rabbitmq.client.ConnectionFactory;
10
11 /**
12 *
13 * @author biehl
14 *
15 * ConnectionFactory,获取连接工厂。
16 *
17 * Connection,一个连接。
18 *
19 * Channel,数据通信信道,可以发送和接受消息。
20 *
21 * Queue,具体的消息存储队列。
22 *
23 * Producer和Consumer,生产和消费者。
24 */
25 public class RabbitMqProducer {
26
27 public static void main(String[] args) {
28 try {
29 // 1、创建一个ConnectionFactory
30 ConnectionFactory connectionFactory = new ConnectionFactory();
31 // 配置服务器ip地址,端口号,虚拟主机
32 connectionFactory.setHost("192.168.110.133");
33 connectionFactory.setPort(5672);
34 connectionFactory.setVirtualHost("/");
35
36 // 2、创建连接工厂创建连接
37 Connection connection = connectionFactory.newConnection();
38
39 // 3、通过connection创建一个Channel
40 Channel channel = connection.createChannel();
41
42 // 4、通过Channel发送数据。消息组成部分就是props(即消息的附加属性)和body(消息实体)。
43 // 生产者发送消息,只需要指定exchange和routingKey。
44 String exchange = "";// 数据通信信道,交换机,接受消息,根据路由键转发消息到绑定的队列。
45 // 一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
46 String routingKey = "queue001";
47 BasicProperties props = null;// 消息的附加属性
48 // 循环发送消息
49 System.out.println("开始生产消息......");
50 for (int i = 0; i < 100; i ) {
51 // 消息实体
52 // String msg = "Hello RabbitMQ!";
53 byte[] body = (String.valueOf(i) " hello RabbitMQ!!!").getBytes();
54 // 如果exchange是空的话,会使用AMQP default这个Exchange。
55 // 然后会根据routingKey的名称去队列里面找到名称对应的,然后将消息路由过去。
56 channel.basicPublish(exchange, routingKey, props, body);
57 }
58
59 // 5、关闭连接,原则,由小到大进行关闭
60 channel.close();
61 connection.close();
62 } catch (IOException e) {
63 e.printStackTrace();
64 } catch (TimeoutException e) {
65 e.printStackTrace();
66 }
67 }
68
69 }
消费者消费消息的代码如下所示:
代码语言:javascript复制 1 package com.example.bie;
2
3 import java.io.IOException;
4 import java.util.HashMap;
5 import java.util.Map;
6 import java.util.concurrent.TimeoutException;
7
8 import com.rabbitmq.client.AMQP.Queue.DeclareOk;
9 import com.rabbitmq.client.Channel;
10 import com.rabbitmq.client.Connection;
11 import com.rabbitmq.client.ConnectionFactory;
12 import com.rabbitmq.client.Consumer;
13 import com.rabbitmq.client.ConsumerCancelledException;
14 import com.rabbitmq.client.QueueingConsumer;
15 import com.rabbitmq.client.QueueingConsumer.Delivery;
16 import com.rabbitmq.client.ShutdownSignalException;
17
18 /**
19 *
20 * @author biehl
21 *
22 */
23 public class RabbitMqConsumer {
24
25 public static void main(String[] args) {
26 try {
27 // 1、创建一个ConnectionFactory
28 ConnectionFactory connectionFactory = new ConnectionFactory();
29 // 配置服务器ip地址,端口号,虚拟主机
30 connectionFactory.setHost("192.168.110.133");
31 connectionFactory.setPort(5672);
32 connectionFactory.setVirtualHost("/");
33
34 // 2、创建连接工厂创建连接
35 Connection connection = connectionFactory.newConnection();
36
37 // 3、通过connection创建一个Channel
38 Channel channel = connection.createChannel();
39
40 // 4、创建(声明)一个队列
41 String queue = "queue001";// 队列
42 boolean durable = true;// 是否持久化,true是持久化,false是不持久化
43 // 独占的方式,只有一个channel可以去监听,其他channel不能进行监听。保证了顺序消费。
44 boolean exclusive = false;
45 boolean autoDelete = false;// 队列没有和Exchange绑定,就进行自动删除
46 // 扩展参数
47 Map<String, Object> arguments = new HashMap<String, Object>();
48 DeclareOk declareOk = channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
49 System.out.println("consumerCount: " declareOk.getConsumerCount());
50
51 // 5、创建消费者,指定参数,消费者建立在那个channel连接之上
52 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
53
54 // 6、对channel进行设置。queue是设置要消费的队列名称。
55 boolean autoAck = true;// 是否自动签收。
56 Consumer callback = queueingConsumer;//
57 channel.basicConsume(queue, autoAck, callback);
58
59 // 7、获取消息
60 // 消费者创建起来了,消费者监听的队列创建起来了。接下来就获取消息。
61 // delivery是消息封装的对象
62 System.out.println("等待消费......");
63 while (true) {
64 // 获取消息
65 Delivery delivery = queueingConsumer.nextDelivery();
66 String body = new String(delivery.getBody());
67 System.out.println("消费端body: " body);
68 System.out.println("envelope" delivery.getEnvelope().toString());
69 }
70
71 // 8、关闭连接,原则,由小到大进行关闭
72 // channel.close();
73 // connection.close();
74 } catch (IOException e) {
75 e.printStackTrace();
76 } catch (TimeoutException e) {
77 e.printStackTrace();
78 } catch (ShutdownSignalException e) {
79 e.printStackTrace();
80 } catch (ConsumerCancelledException e) {
81 e.printStackTrace();
82 } catch (InterruptedException e) {
83 e.printStackTrace();
84 }
85 }
86
87 }
实现的效果,除了控制台的输出,你也可以在管控台里面查看对应的效果,如连接Connection的个数、Channel的个数、Exchange的个数、Queue的个数、Consumer的个数、以及主页折线图展示的最新消息个数、消费速率等等信息。观察这些变化以达到监控的目的。
12、RabbitMQ的Exchange交换机。Exchange接受消息(即生产者生产的消息,将消息投递到交换机Exchange上面),并且根据路由键转发消息所绑定的队列。
RabbitMQ架构图,概述,如下所示:
1)、蓝色的框,主要表示,生产者客户端将消息投递(Send Message)到交换机Exchange上面,通过路由关系,将生产者生产的消息路由到指定的队列里面。 2)、绿色的框,主要表示,消费者客户端监听队列里面的消息(Receive Message),进行消费。消费者客户端和队列建立了监听,然后接收消息。 3)、红色的虚线框,主要表示,RabbitMQ Server服务器。 4)、黄色的框,主要表示,路由键Routing key,一个绑定的关系,即交换机Exchange和队列Queue建立一个绑定Binding。交换机Exchange上面的消息到达那个队列Queue的规则主要是由路由键Routing key来指定的。
13、交换机的属性,如下所示:
1)、Name:交换机的名称,可以自己指定交换机的名称。 2)、Type:交换机的类型direct,topic,fanout,headers。 a)、Direct Exchange(即直连交换机,路由键Routing key必须一致性),所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。路由规则:Direct Exchange直连交换机,Routing key的名称必须完全匹配(即生产者生产消息携带的路由键和将交换机和队列绑定的路由键必须一致),就会将交换机Exchange上面的消息发送到(路由到)队列Queue上面。 注意:Direct模式可以使用RabbitMQ自带的Exchange,即default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。
b)、Topic Exchange(即话题交换机,路由键Routing key规则匹配或者成为模糊匹配),所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上。Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic。注意:可以使用通配符进行模糊匹配。符号,"#"匹配一个或者多个词,符号"*"匹配不多不少一个词(即*号仅仅可以匹配一个词)。路由规则:生产者生产的消息携带的路由键Routing key,如果交换机与队列Queue绑定的路由键,和生产者生产消息携带的路由键规则匹配上,就可以将交换机上面的消息发送到该队列上。
c)、Fanout Exchange(即广播交换机,没有路由键Routing key的概念),不处理路由键,只需要简单的将队列绑定到交换机上面。发送到交换机的消息都会被转发到与该交换机绑定的所有队列上面(即,一个或者多个队列绑定交换机,那么交换机会将消息转发到一个或者多个队列上面)。Fanout交换机转发消息是最快的(性能最好,因为广播交换机,不做匹配,没有路由规则)。
d)、Headers Exchange,根据消息头进行路由,不是很常用。
3)、Durability:是否需要持久化,true为持久化,false表示非持久化。 4)、Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange。值为true表示自动删除,值为false表示不进行自动删除。 5)、Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false。基本不使用该属性。 6)、Arguments:扩展参数,用户扩展AMQP协议自制定化使用。
14、RabbitMQ的绑定Binding。
答:Binding绑定,是Exchange和Exchange、Queue之间的连接关系。即交换机和交换机可以绑定,交换机和队列可以进行绑定。Binding中可以包含Routing key或者参数。
15、RabbitMQ的消息队列Queue。
答:消息队列Queue,实际存储消息数据,在实际的物理磁盘中有一块空间创建队列。包含的属性有,Durability是否持久化,Durable是持久化,Transient是不进行持久化。Auto delete,如果选择yes代表当最后一个监听被移除之后,该Queue会自动被删除。
16、RabbitMQ的消息Message。
答:消息Message,服务器和应用程序之间传送的数据。消息本质就是一段数据,由Properties和Payload(即Body)组成。常用属性,delivery mode(消息到Broker上,可以做持久化,也可以做内存级别的非持久化),headers(自定义属性)。content_type,content_encoding(字符集),priority(优先级)。 correlation_id(唯一id),reply_to(消息失败了返回到那个队列),expiration(消息的过期时间),message_id(消息的id)。timestamp,type,user_id,app_id,cluster_id。
17、RabbitMQ的虚拟主机Virtual host。
答:虚拟主机Virtual host,用于进行逻辑隔离,最上层的消息路由。虚拟主机不是物理的概念。一个Virtual Host里面可以有若个干Exchange和Queue。同一个Virtual Host里面不能有相同名称的Exchange或者Queue。