说到 ActiveMQ 可靠性不可不提持久性、事务以及签收,正是这三个保证了单机版 ActiveMQ 的可靠性
1.1 持久性
1.1.1 非持久
☞ 概述
所谓非持久化就是在 ActiveMQ 凉凉之后,消息不会被保留下来。
☞ 示例
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/9
* @description Queue 持久化消息生产者
*/
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQSend {
private String URL = "tcp://127.0.0.1:61616";
@Test
public void send() throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 开启连接
connection.start();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是消息生产者");
// 创建生产者
Destination destination = session.createQueue("myText");
MessageProducer producer = session.createProducer(destination);
// 设置非持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 发送消息
producer.send(textMessage);
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
1.1.2 持久化
☞ 概述
Queue 持久化与生产者有关,所谓持久化就是在 ActiveMQ 凉凉之后,消息会被保留下来,ActiveMQ 再次启动之后会发给消费者,默认 ActiveMQ 就是持久化的。
☞ 示例
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/9
* @description Queue 消息生产者
*/
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQSend {
private String URL = "tcp://127.0.0.1:61616";
@Test
public void send() throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 开启连接
connection.start();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是消息生产者");
// 创建生产者
Destination destination = session.createQueue("myText");
MessageProducer producer = session.createProducer(destination);
// 设置持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 发送消息
producer.send(textMessage);
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
1.1.3 持久化 Topic
☞ 概述
Topic 持久化主要针对订阅者,需要注意的是需要先告诉 MQ 我订阅了 XXX 你得把消息给我留着。
☞ 示例
Topic 相较于 Queue 没有太大的变更,需要注意的是设置了持久化之后在开启连接。
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/9
* @description Topic 持久化消息生产者
*/
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQSend {
private String URL = "tcp://127.0.0.1:61616";
@Test
public void send() throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是消息生产者");
// 创建生产者
Destination destination = session.createTopic("myText");
MessageProducer producer = session.createProducer(destination);
// 设置持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 开启连接
connection.start();
// 发送消息
producer.send(textMessage);
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
消费者这里不再创建 consumer 而是 subscriber,由他去订阅主题,注意一定要先启动一次消费者通知 ActiveMQ
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/9
* @description topic 持久化消费者
*/
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQReceive {
private String URL = "tcp://127.0.0.1:61616";
@Test
public void receive() throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消费者
Topic topic = session.createTopic("myText");
// 订阅消息
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "remark");
// 开启连接
connection.start();
// 消费消息
Message message = subscriber.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的内容是:" textMessage.getText());
// 等待 10s 接收消息
message = subscriber.receive(10000);
}
// 关闭资源
subscriber.close();
session.close();
connection.close();
}
}
1.2 事务
1.2.1 概述
在数据库中我们希望在一组操作中能够公共进退,要么都成功要么都失败。在 MQ 中我们也有这种需求,要么所有消息都发送成功要么都失败。我们使用 Session createSession(boolean transacted, int acknowledgeMode);
创建 session 时会传递两个参数,第一个参数就是是否开启事务,false 代表自动提交,true 代表手动提交。事务一般都偏生产者。
1.2.1 生产者事务
消息发送完毕之后一定要使用 session.commit()
来提交事务,否则消息全部发送失败。
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/9
* @description 消息生产者事务
*/
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQSend {
private String URL = "tcp://127.0.0.1:61616";
@Test
public void send() throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是消息生产者");
// 创建生产者
Destination destination = session.createQueue("myText");
MessageProducer producer = session.createProducer(destination);
// 开启连接
connection.start();
// 发送消息
producer.send(textMessage);
// 提交事务
session.commit();
// 关闭资源
producer.close();
session.close();
connection.close();
}
}
1.2.3 消费者事务
消息消费完毕之后一定要使用 session.commit()
来提交事务,否则 MQ 不知道你是否消费成功,会导致重复消费
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/9
* @description 消费者事务
*/
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQReceive {
private String URL = "tcp://127.0.0.1:61616";
@Test
public void receive() throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 开启连接
connection.start();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建消费者
Destination destination = session.createQueue("myText");
MessageConsumer consumer = session.createConsumer(destination);
// 消费消息
Message message = consumer.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的内容是:" textMessage.getText());
// 等待 10s 接收消息
message = consumer.receive(10000);
}
// 关闭资源
session.commit();
consumer.close();
session.close();
connection.close();
}
}
1.3 签收
1.3.1 概述
Active MQ 主要与消费者有关,签收有三种模式 Session.AUTO_ACKNOWLEDGE
自动签收,当客户端从 MQ 获取消息成功时自动签收;Session.CLIENT_ACKNOWLEDGE
手动签收,需要客户端主动调用 acknowledge 方法签收消息;Session.DUPS_OK_ACKNOWLEDGE
可重复签收(不用),不必急于确认收到的消息,允许在收到多个消息之后一次完成确认。
1.3.2 非事务消费者签收
非事务下手动签收模式需要手动签收,否则认为未消费该消息
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/9
* @description 手动签收
*/
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQReceive {
private String URL = "tcp://127.0.0.1:61616";
@Test
public void receive() throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 开启连接
connection.start();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 创建消费者
Destination destination = session.createQueue("myText");
MessageConsumer consumer = session.createConsumer(destination);
// 消费消息
Message message = consumer.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的内容是:" textMessage.getText());
// 手动签收
textMessage.acknowledge();
// 等待 10s 接收消息
message = consumer.receive(10000);
}
// 关闭资源
consumer.close();
session.close();
connection.close();
}
}
1.3.3 事务消费者签收
未手动签收的消息在提交事务之后会自动被签收。
代码语言:javascript复制/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/9
* @description
*/
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQReceive {
private String URL = "tcp://127.0.0.1:61616";
@Test
public void receive() throws JMSException {
// 创建工厂对象
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 由工厂对象创建连接对象
Connection connection = factory.createConnection();
// 开启连接
connection.start();
// 创建会话, 第一个参数是事务,第二个参数是签收
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
// 创建消费者
Destination destination = session.createQueue("myText");
MessageConsumer consumer = session.createConsumer(destination);
// 消费消息
Message message = consumer.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的内容是:" textMessage.getText());
// 手动签收
//textMessage.acknowledge();
// 等待 10s 接收消息
message = consumer.receive(10000);
}
// 关闭资源
session.commit();
consumer.close();
session.close();
connection.close();
}
}