1.1 异步投递
1.1.1 概述
ActiveMQ 支持同步、异步两种发送的模式将消息发送到 Broker,模式的选择对发送延时有巨大的影响。producer 能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。ActiveMQ 默认使用异步发送通的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。 如果没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞 producer 直到 Broker 返回一个确认,表示消息己经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。 异步投递可以最大化 produer 端的发送效率。通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升 producer 性能;不过这也带来了额外的问题,就是需要消耗较多的 Client 端内存同时也会导致 Broker 端性能消耗增加;此外它不能有效的确保消息的发送成功。
1.1.2 设置异步投递
代码语言:javascript复制// 官网推荐,在连接中添加参数
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
// 在 ConnectionFactory 级别配置异步发送
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
// 在连接级别配置异步发送
((ActiveMQConnection)connection).setUseAsyncSend(true);
1.1.3 确认投递成功
异步发送丢失消息的场景是:生产者设置 jms.useAsyncSend=true,使用 producer.send(msg) 持续发送消息。由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。如果 MQ 突然宕机,此时生产者端内存中尚未被发送至 MQ 的消息都会丢失。所以,正确的异步发送方法是需要接收回调的。同步发送和异步发送的区别就在此,同步发送等 send 不阻塞了就表示一定发送成功了,异步发送需要接收回执并由客户端再判断一次是否发送成功。
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/9
* @description 异步投递回调
*/
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQSend {
private String URL = "tcp://127.0.0.1:61618";
@Test
public void send() throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 开启异步投递
factory.setUseAsyncSend(true);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 开启连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是消息生产者");
// 添加一个标识
String msgID = UUID.randomUUID().toString();
textMessage.setJMSMessageID(msgID);
// 创建生产者
Destination destination = session.createQueue("myText");
MessageProducer producer = session.createProducer(destination);
// 发送消息,回调
producer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println(msgID "已经发送成功");
}
@Override
public void onException(JMSException e) {
System.out.println(msgID "发送失败了");
}
});
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
1.2 延时/定时投递
1.2.1 概述
ActiveMQ 对消息延时和定时投递做了很好的支持,其内部启动 Scheduled 来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage
,只需要把几个描述消息定时调度方式的参数作为属性添加到消息,Broker 端的调度器就会按照我们想要的行为去处理消息。注意下面 corn 表达式并非 Quartz 框架中的 corn 表达式,而是 linux 中 corntab 中的表达 式,基本顺序是 分(0-59)
时(0-23)
日(1-31)
月(1-12)
星期(1-7)
属性 | 类型 | 描述 |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟投递的时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数 |
AMQ_SCHEDULED_CRON | String | Cron 表达式 |
1.2.2 配置文件
代码语言:javascript复制<!-- 开启延时投递 -->
<broker xmlns="http://activemq.apache.org/schema/core" ··· schedulerSupport="true" >
···
</broker>
1.2.3 示例
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/11
* @description 延时投递
*/
public class Demo {
private static String URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
// 延迟投递时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 50 * 1000);
// 重试时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);
// 投递次数
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 6);
textMessage.setText("我是消息生产者");
// 创建生产者
Destination destination = session.createQueue("myText");
MessageProducer producer = session.createProducer(destination);
// 开启连接
connection.start();
// 发送消息
producer.send(textMessage);
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/11
* @description 定时投递
*/
public class Demo {
private static String URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
// 定时投递
textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
textMessage.setText("我是消息生产者");
// 创建生产者
Destination destination = session.createQueue("myText");
MessageProducer producer = session.createProducer(destination);
// 开启连接
connection.start();
// 发送消息
producer.send(textMessage);
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
1.3 重试机制
1.3.1 概述
消费者收到消息,之后出现异常了,没有告诉 Broker 确认收到该消息,Broker 会尝试再将该消息发送给消费者。默认间隔 1s 重试 6次,一个消息被重发给消费者端超过默认的最大重发次数时,消费者端会给 MQ 发一个 poison ack
表示这个消息有毒,告诉 Broker 不要再发了。这个时候 Broker 会把这个消息放到 DLQ(死信队列),以下情况会引起重发
♞ Client 用了 transactions 且在 session 中调用了 rollback
♞ Client 用了 transactions 且在调用 commit 之前关闭或者没有 commit
♞ Client 在 CLIENT_ACKNOWLEDGE 的签收模式下,session 中调用了 recover()
1.3.2 相关参数
参数 | 描述 |
---|---|
collisionAvoidanceFactor | 设置防止冲突范围的正负百分比,只有启用 useCollisionAvoidance 参数时才生效。也就是在延迟时间上再加一个时间波动范围。默认值为 0.15 |
maximumRedeliveries | 最大重试次数,达到最大重试次数后抛出异常。为 -1 时不限制次数,为 0 时表示不进行重试。默认值为 6 |
maximumRedeliveryDelay | 最大传送延迟,只在 useExponentialBackOf 为 true 时有效(V5.5),假设首次重连间隔为 10ms,倍数为 2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为 40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为 -1。 |
initialRedeliveryDelay | 初始重发延迟时间,默认1000L |
redeliveryDelay | 重发延迟时间,当 initialRedeliveryDelay=0 时生效,默认 1000L |
useCollisionAvoidance | 启用防止冲突功能,默认 false |
useExponentialBackOff | 启用指数倍数递增的方式增加延迟时间,默认 false |
backOffMultiplier | 重连时间间隔递增倍数,只有值大于 1 和启用 useExponentialBackOff 参数时才生效。默认是 5 |
1.3.3 示例
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/11
* @description 消息重试机制
*/
public class Demo {
private static String URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 设置重试参数,相关参数参考 1.3.2
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
factory.setRedeliveryPolicy(redeliveryPolicy);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是消息生产者");
// 创建生产者
Destination destination = session.createQueue("myText");
MessageProducer producer = session.createProducer(destination);
// 开启连接
connection.start();
// 发送消息
producer.send(textMessage);
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
1.4 死信队列
1.4.1 概述
ActiveMQ 中引入了死信队列(Dead Letter Queue)的概念。即一条消息再被重发了多次后(默认为重发 6次),将会被 ActiveMQ 移入死信队列。开发人员可以在这个 Queue 中查看处理出错的消息,进行人工干预。
1.4.2 死信队列配置
☞ 默认配置文件
☞ sharedDeadLetterStrategy
将所有的 DeadLetter 保存在一个共享的队列中,这是 ActiveMQ broker 端默认的策略。共享队列默认为 ActiveMQ.DLQ
,可以通过 deadLetterQueue 属性来设定。
<!-— 设置所有队列,使用 '>' ,否则用队列名称 -->
<policyEntry queue=">">
<deadLetterStrategy>
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE" />
</deadLetterStrategy>
</policyEntry>
☞ individualDeadLetterStrategy
把 DeadLetter 放入各自的死信通道中,对于 Queue 而言,死信通道的前缀默认为 ActiveMQ.DLQ.Queue;对于 Topic 而言,死信通道的前缀默认为 ActiveMQ.DLQ.Topic;比如队列 Order,那么它对应的死信通道为 A ctiveMQ.DLQ.Queue.Order。可以使用 queuePrefix、topicPrefix 来指定上述前缀。 默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定为Topic。将队列 Order 中出现的 DeadLeter 保存在 DLQ.Order 中,不过此时 DLQ.Order 为 Topic。属性 useQueueForTopicMessages,此值表示是否将 Topic 的 DeadLetter 保存在 Queue 中,默认为 true。
代码语言:javascript复制<policyEntry queue=">">
<deadLetterStrategy>
<!--
queuePrefix: 设置死信队列前缀
useQueueForQueueMessages: 设置使用队列保存死信,还可以设置 useQueueForTopicMessages,使用 Topic 来保存死信
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
☞ 过期消息不保存到死信队列
有时需要直接删除过期的消息而不需要发送到死队列中,processExpired 表示是否将过期消息放入死信队列,默认为 true;
代码语言:javascript复制<policyEntry queue=">">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
</policyEntry>
☞ 非持久消息保存到死信队列
默认情况下,ActiveMQ 不会把非持久的死消息发送到死信队列中。processNonPersistent 表示是否将“非持久化”消息放入死信队列,默认为 false。非持久性如果你想把非持久的消息发送到死队列中,需要设置属性为 true。
代码语言:javascript复制<policyEntry queue=">">
<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent="true" />
</deadLetterStrategy>
</policyEntry>
1.5 幂等性
1.5.1 MQ 内部幂等
对于每条消息,MQ 内部生成一个全局唯一、与业务无关的消息 ID:inner-msg-id。当 MQ-server 接收到消息时,先根据 inner-msg-id 判断消息是否重复发送,再决定是否将消息持久化到 DB 中。这样,有了这个 inner-msg-id 作为去重的依据就能保证一条消息只能一次持久化到 DB。
1.5.2 消费者幂等
Ⅰ 生成一个唯一 ID 标记每一条消息,将消息处理成功和去重日志通过事物的形式写入去重表。 Ⅱ 生成一个唯一 ID 标记每一条消息,只要消费过该消息,将 ID 写入 Redis。消费者开始消费前,先去 Redis 中查询有没消费记录 Ⅲ 如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。