MQ 系列之 ActiveMQ 消息持久化机制

2020-12-14 11:24:20 浏览数 (1)

1.1 简介

1.1.1 概述

  为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ 的消息持久化机制有 JDBC,AMQ,KahaDB 和 LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。

1.1.2 配置文件

1.2 持久化方式

1.2.1 AMQ【了解】

☞ 概述

  AMQ 是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为 32M,当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ 适用于 ActiveMQ 5.3 之前的版本,主要的缺点是 AMQ Message 会为每一个 Destination 创建一个索引,如果使用了大量的 Queue,索引文件的大小会占用很多磁盘空间。而且由于索引巨大,一旦 Broker 崩溃,重建索引的速度会非常慢。

☞ 配置
代码语言:javascript复制
<persistenceAdapter>
     <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

1.2.2 KahaDB【默认】

☞ 概述

  KahaDB 是从 ActiveMQ 5.4 开始默认的持久化存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。KahaDB 是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。数据被追加到 data logs 中。当不再需要 log 文件中的数据的时候,log 文件会被丢弃。

☞ 存储原理

Kahadb 在消息保存目录中只有 4类文件和一个 lock,跟 ActiveMQ 的其他几种文件存储引擎相比这就非常简洁了。  ♞ db-<Number>.log:KahaDB 存储消息到预定义大小的数据记录文件中,文件命名为 db-xxx.log。当数据文件已满时,一个新的文件会随之创建,number 数值也会随之递增,它随着消息数量的增多,如每 32M 一个文件,文件名按照数字进行编号,如 db-1.log、db-2.log、db-3.log ···。当不再有引用到数据文件中的任何消息时,文件会被删除或归档。  ♞ db.data:该文件包含了持久化的 B-Tree 索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是 B-Tree(B树),使用 B-Tree 作为索引指向 db-xxx.log 里面存储的消息。  ♞ db.free:当前 db.data 文件里哪些页面是空闲的,文件具体内容是所有空闲页的 ID  ♞ db.redo:用来进行消息恢复,如果 KahaDB 消息存储在强制退出后启动,用于恢复 B-Tree 索引。  ♞ lock:文件锁,表示当前获得 KahaDB 读写权限的 broker。

☞ 配置文件
代码语言:javascript复制
<persistenceAdapter>
	<!-- directory: 指定持久化消息的存储目录; journalMaxFileLength: 指定保存消息的日志文件大小 -->
    <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>
</persistenceAdapter>

1.2.3 LevelDB【了解】

☞ 概述

  这种文件系统是从 ActiveMQ 5.8 之后引进的,它和 KahaDB 非常相似,也是基于文件的本地数据库储存形式,但是它提供比 KahaDB 更快的持久性。但它不使用自定义 B-Tree 实现来索引预写日志,而是使用基于 LevelDB 的索引。目前默认的持久化方式仍然是 KahaDB,不过 LevelDB 持久化性能高于 KahaDB,可能是以后的趋势。

☞ 配置文件
代码语言:javascript复制
<persistenceAdapter>
	<levelDB directory="activemq-data"/>
</persistenceAdapter>

1.2.4 JDBC

☞ 配置文件
代码语言:javascript复制
<broker ···>
	···

	<persistenceAdapter> 
		<!--
			dataSource 指定将要引用的持久化数据库的 bean 名称,很显然我们还要添加一个 bean 配置数据源
			createTablesOnStartup 默认 true,MQ 启动的时候都重新创建数据表,一般首次设置为 true,之后设置为 false
		-->
		<jdbcPersistenceAdapter dataSource="#my-ds" createTablesOnStartup="false" /> 
	</persistenceAdapter>

	···
</broker>

<!-- 注意这个需要在 </broker> 之后,<import resource="jetty.xml"/> 之前配置 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
  <property name="username" value="activemq"/>
  <property name="password" value="activemq"/>
  <property name="poolPreparedStatements" value="true"/>
</bean>
☞ 添加 jar

  从配置文件可以看出我们使用了 MySQL,所以我们需要在 /lib 目录中添加 MySQL 驱动包,ActiveMQ 默认的数据库连接池是 dbcp,如果要更改也是需要将数据库连接池的 jar 包添加到库中。

☞ 数据库

  新建一个名为 activemq 的数据库,如果上述配置没有问题,启动时会自动创建三张表,其中 activemq_msgs 用于存储消息,Queue和Topic都存储在这个表中;activemq_acks 用于存储订阅关系。如果是持久化 Topic,订阅者和服务器的订阅关系在这个表保存;activemq_lock 在集群环境中才有用,只有一个 Broker 可以获得消息,称为 Master Broker,其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。这个表用于记录哪个 Broker 是当前的 Master Broker。

☞ 编码及数据库情况

  注意编码时一定要加上 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 开启持久化,才会保存到数据库中,否则保存到内存中。Queue 模式会将每一条消息保存到数据库 activemq_msgs 表中,等消息被消费者签收后会删除消息。Topic 模式中先启动消费者订阅,在启动生产者,可以在数据库 activemq_acks 表中看到订阅者,该模式的消息依旧会被保存到数据库 activemq_msgs 表中,但是消息被订阅者签收后不会从数据库中删除。

☞ 可能存在的问题

在配置关系型数据库作为 ActiveMQ 的持久化存储方案时,可能会出现以下问题  ♞ 需要使用到的相关 jar 文件放置到 ActiveMQ 安装路径下的 lib 目录,否则会找不到 jar。  ♞ 启动时报 java.lang.llegalStateException:BeanFactory not initialized or already closed 这是因为操作系统的机器名中有 _,修改机器名并且重启后即可解决问题。  ♞ ActiveMQ 第一次启动完成后需要去掉 jdbcPersistenceAdapter 标签中 createTablesOnStartup 这个属性,或者更改为 false,否则会重新创建相关表。

1.2.5 JDBC With Journal

☞ 概述

  JDBC With Journal 克服了 JDBC Store 的不足,JDBC 每次消息过来,都需要去写库和读库。ActiveMQ Journal 使用高速缓存写入技术,大大提高了性能。当消费者的消费速度能够及时跟上生产者消息的生产速度时,Journal 文件能够大大减少需要写入到 DB 中的消息。使用 JDBC With Journal 后,发送出来的消息会在内存中告诉缓存,接收端若在没有接收情况下 7~10 分钟后再写入数据库,这样接收端就不用等到数据库操作完了之后再接收消息。举个例子,生产者生产了 1000 条消息,这 1000 条消息会保存到 Journal 文件,如果消费者的消费速度很快的情况下,在 Journal 文件还没有同步到 DB 之前,消费者已经消费了 90% 的以上的消息,那么这个时候只需要同步剩余的 10% 的消息到 DB。如果消费者的消费速度很慢,这个时候 Journal 文件可以使消息以批量方式写到 DB。

☞ 配置文件
代码语言:javascript复制
<persistenceFactory>
	<journalPersistenceAdapterFactory
	        journalLogFiles="4"
	        journalLogFileSize="32768"
	        useJournal="true"
	        useQuickJournal="true"
	        dataSource="#mysql-ds"
	        dataDirectory="activemq-data"/>
</persistenceFactory>

<!-- 省略 mysql-ds bean -->

0 人点赞