持久化消费和非持久化消费的发送策略
消息同步发送和异步发送 ActiveMQ支持同步、异步两种发送模式将消息发送到broker上。 同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消息,表示消息已经被broker处理。这个机 制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能 异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高。但是可能会出现消息丢失的情况。所 以使用异步发送的前提是在某些情况下允许出现数据丢失的情况。 默认情况下,非持久化消息是异步发送的,持久化消息并且是在非事务模式下是同步发送的。 但是在开启事务的情况下,消息都是异步发送。由于异步发送的效率会比同步发送性能更高。所以在发送持久化消 息的时候,尽量去开启事务会话。 除了持久化消息和非持久化消息的同步和异步特性以外,我们还可以通过以下几种方式来设置异步发送
代码语言:javascript复制1.ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?
jms.useAsyncSend=true");
2.((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
3.((ActiveMQConnection)connection).setUseAsyncSend(true);
消息的发送原理分析图解
消息发送的流程图
ProducerWindowSize的含义
producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的 确认,才能继续发送。 代码在:ActiveMQSession的1957行 主要用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的大小,且只对异步发送有意义。每次发送消 息之后,都将会导致memoryUsage大小增加( message.size),当broker返回producerAck时,memoryUsage尺 寸减少(producerAck.size,此size表示先前发送消息的大小)。 可以通过如下2种方式设置:
- 在brokerUrl中设置: “tcp://localhost:61616?jms.producerWindowSize=1048576”,这种设置将会对所有的producer生效。
- 在destinationUri中设置: “test-queue?producer.windowSize=1048576”,此参数只会对使用此Destination实例的producer失效,将会覆盖brokerUrl中的producerWindowSize值。 注意:此值越大,意味着消耗Client端的内存就越大
消息发送的源码分析
以producer.send为入口
ActiveMQMessageProducer send()方法
代码语言:javascript复制public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
//检查session的状态,如果session以关闭则抛异常
this.checkClosed();
if (destination == null) {
if (this.info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
} else {
throw new InvalidDestinationException("Don't understand null destinations");
}
} else {
ActiveMQDestination dest;
检查destination的类型,如果符合要求,就转变为ActiveMQDestination
if (destination.equals(this.info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else {
if (this.info.getDestination() != null) {
throw new UnsupportedOperationException("This producer can only send messages to: " this.info.getDestination().getPhysicalName());
}
dest = ActiveMQDestination.transform(destination);
}
if (dest == null) {
throw new JMSException("No destination specified");
} else {
if (this.transformer != null) {
Message transformedMessage = this.transformer.producerTransform(this.session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
if (this.producerWindow != null) {
//如果发送窗口大小不为空,则判断发送窗口的大小决定是否阻塞
try {
this.producerWindow.waitForSpace();
} catch (InterruptedException var10) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
//发送消息到broker的topic 这里=======================
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);
this.stats.onMessage();
}
}
}
ActiveMQSession send方法
代码语言:javascript复制protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
this.checkClosed();
if (destination.isTemporary() && this.connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " destination);
} else {
//互斥锁,如果一个session的多个producer发送消息到这里,会保证消息发送的有序性
synchronized(this.sendMutex) {
//告诉broker开始一个新事务,只有事务型会话中才会开启
this.doStartTransaction();
//从事务上下文中获取事务id
TransactionId txid = this.transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//在JMS协议头中设置是否持久化标识
message.setJMSDeliveryMode(deliveryMode);
//计算消息过期时间
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0L) {
expiration = timeToLive timeStamp;
}
}
//设置消息过期时间
message.setJMSExpiration(expiration);
//设置消息的优先级
message.setJMSPriority(priority);
//设置消息为非重发
message.setJMSRedelivered(false);
//将不通的消息格式统一转化为ActiveMQMessage
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, this.connection);
//设置目的地
msg.setDestination(destination);
//生成并设置消息id
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
//如果消息是经过转化的,则更新原来的消息id和目的地
if (msg != message) {
message.setJMSMessageID(msg.getMessageId().toString());
message.setJMSDestination(destination);
}
msg.setBrokerPath((BrokerId[])null);
msg.setTransactionId(txid);
if (this.connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(this.connection);
//把消息属性和消息体都设置为只读,防止被修改
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(this.getSessionId() " sending message: " msg);
}
//如果onComplete没有设置,且发送超时时间小于0,且消息不需要反馈,且连接器不是同步发送模式,且消息非持久化或者连接器是异步发送模式
//或者存在事务id的情况下,走异步发送,否则走同步发送
if (onComplete != null || sendTimeout > 0 || msg.isResponseRequired() || this.connection.isAlwaysSyncSend() || msg.isPersistent() && !this.connection.isUseAsyncSend() && txid == null) {
if (sendTimeout > 0 && onComplete == null) {
//带超时时间的同步发送
this.connection.syncSendPacket(msg, sendTimeout);
} else {
//带回调的同步发送
this.connection.syncSendPacket(msg, onComplete);
}
} else {
//这里=================
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
//异步发送的情况下,需要设置producerWindow的大小
int size = msg.getSize();
producerWindow.increaseUsage((long)size);
}
}
}
}
}
ActiveMQConnection. doAsyncSendPacket
代码语言:javascript复制private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException var3) {
throw JMSExceptionSupport.create(var3);
}
}
connection = connectionFactory.createConnection();
ActiveMQConnectionFactory. createActiveMQConnection
代码语言:javascript复制protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
if (this.brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
} else {
ActiveMQConnection connection = null;
try {
//创建transport 这里===============
Transport transport = this.createTransport();
connection = this.createActiveMQConnection(transport, this.factoryStats);
connection.setUserName(userName);
connection.setPassword(password);
this.configureConnection(connection);
transport.start();
if (this.clientID != null) {
connection.setDefaultClientID(this.clientID);
}
return connection;
} catch (JMSException var8) {
try {
connection.close();
} catch (Throwable var7) {
}
throw var8;
} catch (Exception var9) {
try {
connection.close();
} catch (Throwable var6) {
}
throw JMSExceptionSupport.create("Could not connect to broker URL: " this.brokerURL ". Reason: " var9, var9);
}
}
}
createTransport
调用ActiveMQConnectionFactory.createTransport方法,去创建一个transport对象。
- 构建一个URI
- 根据URL去创建一个连接TransportFactory.connect
默认使用的是tcp的协议
代码语言:javascript复制protected Transport createTransport() throws JMSException {
try {
URI connectBrokerUL = this.brokerURL;
String scheme = this.brokerURL.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" this.brokerURL "]");
} else {
if (scheme.equals("auto")) {
connectBrokerUL = new URI(this.brokerURL.toString().replace("auto", "tcp"));
} else if (scheme.equals("auto ssl")) {
connectBrokerUL = new URI(this.brokerURL.toString().replace("auto ssl", "ssl"));
} else if (scheme.equals("auto nio")) {
connectBrokerUL = new URI(this.brokerURL.toString().replace("auto nio", "nio"));
} else if (scheme.equals("auto nio ssl")) {
connectBrokerUL = new URI(this.brokerURL.toString().replace("auto nio ssl", "nio ssl"));
}
//这里=========================
return TransportFactory.connect(connectBrokerUL);
}
} catch (Exception var3) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " var3, var3);
}
}
TransportFactory connect
代码语言:javascript复制public static Transport connect(URI location) throws Exception {
//返回TcpTransportFactory
TransportFactory tf = findTransportFactory(location);
return tf.doConnect(location);
}
1. TransportFactory. findTransportFactory
- 从TRANSPORT_FACTORYS这个Map集合中,根据scheme去获得一个TransportFactory指定的实例对象
- 如果Map集合中不存在,则通过TRANSPORT_FACTORY_FINDER去找一个并且构建实例
这个地方又有点类似于我们之前所学过的SPI的思想吧?他会从META-INF/services/org/apache/activemq/transport/ 这个路径下,根据URI组装的scheme去找到匹配的class对象并且实例化,所以根据tcp为key去对应的路径下可以找到TcpTransportFactory
代码语言:javascript复制private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
public static TransportFactory findTransportFactory(URI location) throws IOException {
String scheme = location.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" location "]");
} else {
TransportFactory tf = (TransportFactory)TRANSPORT_FACTORYS.get(scheme);
if (tf == null) {
try {
//构建实例
tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
TRANSPORT_FACTORYS.put(scheme, tf);
} catch (Throwable var4) {
throw IOExceptionSupport.create("Transport scheme NOT recognized: [" scheme "]", var4);
}
}
return tf;
}
}
2. tf.doConnect(location)
TransportFactory doConnect
代码语言:javascript复制public Transport doConnect(URI location) throws Exception {
try {
Map<String, String> options = new HashMap(URISupport.parseParameters(location));
if (!options.containsKey("wireFormat.host")) {
options.put("wireFormat.host", location.getHost());
}
WireFormat wf = this.createWireFormat(options);
//创建一个Transport,创建一个socket连接 -> 终于找到真相了
Transport transport = this.createTransport(location, wf);
//配置configure,这个里面是对Transport做链路包装 这里==============
Transport rc = this.configure(transport, wf, options);
IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " options);
} else {
return rc;
}
} catch (URISyntaxException var6) {
throw IOExceptionSupport.create(var6);
}
}
TransportFactory configure
代码语言:javascript复制public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
//组装一个复合的transport,这里会包装两层,一个是IactivityMonitor.另一个是WireFormatNegotiator
transport = this.compositeConfigure(transport, wf, options);
//再做一层包装,MutexTransport
Transport transport = new MutexTransport(transport);
//包装ResponseCorrelator
Transport transport = new ResponseCorrelator(transport);
return transport;
}
到目前为止,这个transport实际上就是一个调用链了,他的链结构为 ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport())) 每一层包装表示什么意思呢? ResponseCorrelator 用于实现异步请求。 MutexTransport 实现写锁,表示同一时间只允许发送一个请求 WireFormatNegotiator 实现了客户端连接broker的时候先发送数据解析相关的协议信息,比如解析版本号,是否 使用缓存等 InactivityMonitor 用于实现连接成功成功后的心跳检查机制,客户端每10s发送一次心跳信息。服务端每30s读取 一次心跳信息。
同步发送和异步发送的区别
ResponseCorrelator 的request
代码语言:javascript复制 public Object request(Object command, int timeout) throws IOException {
//也是调用异步的方法
FutureResponse response = this.asyncRequest(command, (ResponseCallback)null);
// 从future方法阻塞等待返回
return response.getResult(timeout);
}
在ResponseCorrelator的request方法中,需要通过response.getResult去获得broker的反馈,否则会阻塞
同步不阻塞发送 阻塞获取结果
持久化消息和非持久化消息的存储原理
正常情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。能够存储的最大消息数据在 ${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点 SystemUsage配置设置了一些系统内存和硬盘容量
代码语言:javascript复制 <systemUsage>
<systemUsage>
<memoryUsage>
//该子标记设置整个ActiveMQ节点的“可用内存限制”。这个值不能超过ActiveMQ本身设置的最大内存大小。其中的percentOfJvmHeap属性表示百分比。占用70%的堆内存
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
//该标记设置整个ActiveMQ节点,用于存储“持久化消息”的“可用磁盘空间”。该子标记的limit属性必须要进行设置
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
//一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,非持久化消息就会被转储到 temp store区域,虽然我们说过非持久化消息不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时非持久化消息大量堆积致使内存耗尽的情况出现,还是会将非持久化消息写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个tempstore区域的“可用磁盘空间限制”
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
从上面的配置我们需要get到一个结论,当非持久化消息堆积到一定程度的时候,也就是内存超过指定的设置阀值时,ActiveMQ会将内存中的非持久化消息写入到临时文件,以便腾出内存。但是它和持久化消息的区别是,重启之后,持久化消息会从文件中恢复,非持久化的临时文件会直接删除
消息的持久化策略分析
消息持久性对于可靠消息传递来说是一种比较好的方法,即时发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重启后仍然可以将消息发送出去。消息持久性的原理很简单,就是在发送消息出去后,消息中心首先将消息存储在本地文件、内存或者远程数据库,然后把消息发送给接受者,发送成功后再把消息从存储中删除,失败则继续尝试。接下来我们来了解一下消息在broker上的持久化存储实现方式
持久化存储支持类型
ActiveMQ支持多种不同的持久化方式,主要有以下几种,不过,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
- KahaDB存储(默认存储方式)
- JDBC存储
- Memory存储
- LevelDB存储
- JDBC With ActiveMQ Journal
KahaDB存储
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个 索引文件来存储它所有的地址。 KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到 data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
KahaDB的配置方式
代码语言:javascript复制 <persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
KahaDB的存储原理
在data/kahadb这个目录下,会生成四个文件
- db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-.log里面存储的消息
- db.redo 用来进行消息恢复
- db-.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较快的。默认是32M,达到阀值会自动递增
- lock文件 锁,表示当前获得kahadb读写权限的broker
JDBC存储
使用JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。 ACTIVEMQ_MSGS 消息表,queue和topic都存在这个表中 ACTIVEMQ_ACKS 存储持久订阅的信息和最后一个持久订阅接收的消息ID ACTIVEMQ_LOCKS 锁表,用来确保某一时刻,只能有一个ActiveMQ broker实例来访问数据库
JDBC存储实践
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="# MySQL-DS " createTablesOnStartup="true" />
</persistenceAdapter>
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这 样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false
- Mysql持久化Bean配置
<bean id="Mysql-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.11.156:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
</bean>
- 添加Jar包依赖 放在lib目录下
LevelDB存储
LevelDB持久化性能高于KahaDB,虽然目前默认的持久化方式仍然是KahaDB。并且,在ActiveMQ 5.9版本提供 了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。 不过,据ActiveMQ官网对LevelDB的表述:LevelDB官方建议使用以及不再支持,推荐使用的是KahaDB
代码语言:javascript复制<persistenceAdapter>
<levelDBdirectory="activemq-data"/>
</persistenceAdapter>
Memory 消息存储
基于内存的消息存储,内存消息存储主要是存储所有的持久化的消息在内存中。broker标签 persistent=”false”,表示不设置持久化存储,直接存储到内存中
代码语言:javascript复制<beans>
<broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61635"/>
</transportConnectors>
</broker>
</beans>
JDBC Message store with ActiveMQ Journal
这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库。 ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。 当消费者的消费速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。举个例子,生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上的消息,那么这个时候只需要同步剩余的10%的消息到DB。 如果消费者的消费速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
配置方法:
将原来的标签注释掉 添加如下标签
代码语言:javascript复制<persistenceFactory>
<journalPersistenceAdapterFactory dataSource="#Mysql-DS" dataDirectory="activemq-data"/>
</persistenceFactory>
在服务端循环发送消息。可以看到数据是延迟同步到数据库的