MQ 系列之 ActiveMQ 可靠性

2020-12-11 16:22:09 浏览数 (2)

说到 ActiveMQ 可靠性不可不提持久性、事务以及签收,正是这三个保证了单机版 ActiveMQ 的可靠性

1.1 持久性

1.1.1 非持久

☞ 概述

所谓非持久化就是在 ActiveMQ 凉凉之后,消息不会被保留下来。

☞ 示例
代码语言:javascript复制
/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/12/9
 * @description Queue 持久化消息生产者
 */
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQSend {
    private String URL = "tcp://127.0.0.1:61616";

    @Test
    public void send() throws JMSException {
        // 创建工厂对象
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 由工厂对象创建连接对象
        Connection connection = factory.createConnection();
        // 开启连接
        connection.start();

        // 创建会话, 第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("我是消息生产者");

        // 创建生产者
        Destination destination = session.createQueue("myText");
        MessageProducer producer = session.createProducer(destination);
        // 设置非持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        // 发送消息
        producer.send(textMessage);

        // 关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

1.1.2 持久化

☞ 概述

Queue 持久化与生产者有关,所谓持久化就是在 ActiveMQ 凉凉之后,消息会被保留下来,ActiveMQ 再次启动之后会发给消费者,默认 ActiveMQ 就是持久化的。

☞ 示例
代码语言:javascript复制
/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/12/9
 * @description Queue 消息生产者
 */
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQSend {
    private String URL = "tcp://127.0.0.1:61616";

    @Test
    public void send() throws JMSException {
        // 创建工厂对象
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 由工厂对象创建连接对象
        Connection connection = factory.createConnection();
        // 开启连接
        connection.start();

        // 创建会话, 第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("我是消息生产者");

        // 创建生产者
        Destination destination = session.createQueue("myText");
        MessageProducer producer = session.createProducer(destination);
        // 设置持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 发送消息
        producer.send(textMessage);

        // 关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

1.1.3 持久化 Topic

☞ 概述

Topic 持久化主要针对订阅者,需要注意的是需要先告诉 MQ 我订阅了 XXX 你得把消息给我留着。

☞ 示例

Topic 相较于 Queue 没有太大的变更,需要注意的是设置了持久化之后在开启连接。

代码语言:javascript复制
/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/12/9
 * @description Topic 持久化消息生产者
 */
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQSend {
    private String URL = "tcp://127.0.0.1:61616";

    @Test
    public void send() throws JMSException {
        // 创建工厂对象
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 由工厂对象创建连接对象
        Connection connection = factory.createConnection();

        // 创建会话, 第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("我是消息生产者");

        // 创建生产者
        Destination destination = session.createTopic("myText");
        MessageProducer producer = session.createProducer(destination);
        // 设置持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 开启连接
        connection.start();

        // 发送消息
        producer.send(textMessage);

        // 关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

消费者这里不再创建 consumer 而是 subscriber,由他去订阅主题,注意一定要先启动一次消费者通知 ActiveMQ

代码语言:javascript复制
/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/12/9
 * @description topic 持久化消费者
 */
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQReceive {
    private String URL = "tcp://127.0.0.1:61616";

    @Test
    public void receive() throws JMSException {
        // 创建工厂对象
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 由工厂对象创建连接对象
        Connection connection = factory.createConnection();

        // 创建会话, 第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建消费者
        Topic topic = session.createTopic("myText");
        // 订阅消息
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "remark");

        // 开启连接
        connection.start();

        // 消费消息
        Message message = subscriber.receive();

        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到的内容是:"   textMessage.getText());

            // 等待 10s 接收消息
            message = subscriber.receive(10000);
        }

        // 关闭资源
        subscriber.close();
        session.close();
        connection.close();
    }
}

1.2 事务

1.2.1 概述

  在数据库中我们希望在一组操作中能够公共进退,要么都成功要么都失败。在 MQ 中我们也有这种需求,要么所有消息都发送成功要么都失败。我们使用 Session createSession(boolean transacted, int acknowledgeMode); 创建 session 时会传递两个参数,第一个参数就是是否开启事务,false 代表自动提交,true 代表手动提交。事务一般都偏生产者。

1.2.1 生产者事务

消息发送完毕之后一定要使用 session.commit() 来提交事务,否则消息全部发送失败。

代码语言:javascript复制
/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/12/9
 * @description 消息生产者事务
 */
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQSend {
    private String URL = "tcp://127.0.0.1:61616";

    @Test
    public void send() throws JMSException {
        // 创建工厂对象
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 由工厂对象创建连接对象
        Connection connection = factory.createConnection();

        // 创建会话, 第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 创建消息
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("我是消息生产者");

        // 创建生产者
        Destination destination = session.createQueue("myText");
        MessageProducer producer = session.createProducer(destination);

        // 开启连接
        connection.start();

        // 发送消息
        producer.send(textMessage);
        // 提交事务
        session.commit();

        // 关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

1.2.3 消费者事务

消息消费完毕之后一定要使用 session.commit() 来提交事务,否则 MQ 不知道你是否消费成功,会导致重复消费

代码语言:javascript复制
/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/12/9
 * @description 消费者事务
 */
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQReceive {
    private String URL = "tcp://127.0.0.1:61616";

    @Test
    public void receive() throws JMSException {
        // 创建工厂对象
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 由工厂对象创建连接对象
        Connection connection = factory.createConnection();
        // 开启连接
        connection.start();

        // 创建会话, 第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // 创建消费者
        Destination destination = session.createQueue("myText");
        MessageConsumer consumer = session.createConsumer(destination);

        // 消费消息
        Message message = consumer.receive();

        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到的内容是:"   textMessage.getText());

            // 等待 10s 接收消息
            message = consumer.receive(10000);
        }

        // 关闭资源
        session.commit();
        consumer.close();
        session.close();
        connection.close();
    }
}

1.3 签收

1.3.1 概述

  Active MQ 主要与消费者有关,签收有三种模式 Session.AUTO_ACKNOWLEDGE 自动签收,当客户端从 MQ 获取消息成功时自动签收;Session.CLIENT_ACKNOWLEDGE 手动签收,需要客户端主动调用 acknowledge 方法签收消息;Session.DUPS_OK_ACKNOWLEDGE 可重复签收(不用),不必急于确认收到的消息,允许在收到多个消息之后一次完成确认。

1.3.2 非事务消费者签收

非事务下手动签收模式需要手动签收,否则认为未消费该消息

代码语言:javascript复制
/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/12/9
 * @description 手动签收
 */
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQReceive {
    private String URL = "tcp://127.0.0.1:61616";

    @Test
    public void receive() throws JMSException {
        // 创建工厂对象
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 由工厂对象创建连接对象
        Connection connection = factory.createConnection();
        // 开启连接
        connection.start();

        // 创建会话, 第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        // 创建消费者
        Destination destination = session.createQueue("myText");
        MessageConsumer consumer = session.createConsumer(destination);

        // 消费消息
        Message message = consumer.receive();

        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到的内容是:"   textMessage.getText());
            // 手动签收
            textMessage.acknowledge();

            // 等待 10s 接收消息
            message = consumer.receive(10000);
        }

        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

1.3.3 事务消费者签收

未手动签收的消息在提交事务之后会自动被签收。

代码语言:javascript复制
/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/12/9
 * @description
 */
@SpringJUnitConfig(locations = "classpath:application.properties")
public class ActiveMQReceive {
    private String URL = "tcp://127.0.0.1:61616";

    @Test
    public void receive() throws JMSException {
        // 创建工厂对象
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 由工厂对象创建连接对象
        Connection connection = factory.createConnection();
        // 开启连接
        connection.start();

        // 创建会话, 第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);

        // 创建消费者
        Destination destination = session.createQueue("myText");
        MessageConsumer consumer = session.createConsumer(destination);

        // 消费消息
        Message message = consumer.receive();

        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到的内容是:"   textMessage.getText());
            // 手动签收
            //textMessage.acknowledge();

            // 等待 10s 接收消息
            message = consumer.receive(10000);
        }

        // 关闭资源
        session.commit();
        consumer.close();
        session.close();
        connection.close();
    }
}

0 人点赞