ActiveMQ

2023-11-27 15:41:51 浏览数 (2)

ActiveMQ入门

消息中间件应用场景

异步处理 应用解耦 流量削锋

异步处理

场景说明:用户注册,需要执行三个业务逻辑,分别为写入用户表,发注册邮件以及注册短信。

串行方式

将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客 户端。

并行方式

将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客 户端。与串行的差别是,并行的方式可以提高处理的时间

异步处理

引入消息中间件,将部分的业务逻辑,进行异步处理。改造后的架构如下

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是 50毫秒。注册邮件,发送 短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间 可能是50毫秒。因此架构改变后,系统的吞吐量提高啦,比串行提高了3倍,比并行提高了两倍。

应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。 传统的做法是,订单系统调用库存系统的接口。如下图:

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存 系统耦合。如何解决以上问题呢?引入应用消息队列后的方案,如下图:

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存 系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假 如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再 关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

流量消峰

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。应用场景:秒杀活动,一般 会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。 通过加入消息队列完成如下功能: a、可以控制活动的人数 b、可以缓解短时间内高流量压垮应用

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请 求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理

ActiveMQ简介及JMS

什么是 ActiveMQ?

官网: http://activemq.apache.org/ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和 J2EE 1.4规范的 JMS Provider实现。我们在本次课程中介绍 ActiveMQ的使用。

什么是JMS?

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统 的集成。它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有 Producer(生产者)、Consumer(消费者)。

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高 性能,高可用,可伸缩和最终一致性架构。

JMS( Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的 Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC(java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库 的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够 通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消 息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息 的元数据组成。消息主体则携带着应用程序的数据或有效负载。

JMS消息模型

消息中间件一般有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)。 (1) P2P (Point to Point) 点对点模型(Queue队列模型) (2) Publish/Subscribe(Pub/Sub) 发布/订阅模型(Topic主题模型)

点对点模型

点对点模型( Pointer-to-Pointer):即生产者和消费者之间的消息往来。

每个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或 超时。

点对点模型的特点:

  • 每个消息只有一个消费者( Consumer)(即一旦被消费,消息就不再在消息队列中);
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有 正在运行,它不会影响到消息被发送到队列;
  • 接收者在成功接收消息之后需向队列应答成功

发布/订阅模型

发布/订阅(Publish-Subscribe)

包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发 送到topic,系统将这些消息投递到订阅此topic的订阅者

发布者发送到 topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发 布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的 拷贝。

发布/订阅模型的特点:

  • 每个消息可以有多个消费者;
  • 发布者和订阅者之间有时间上的依赖性(先订阅主题,再来发送消息)。
  • 订阅者必须保持运行的状态,才能接受发布者发布的消息;

JMS编程API

  1. ConnectionFactory 创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和 TopicConnectionFactory两种。
  2. Destination Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来 说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的 Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象: Queue、Topic
  3. Connection Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产 生一个或多个Session
  4. Session Session 是我们对消息进行操作的接口,可以通过session创建生产者、消费者、消息等。Session 提供 了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务 中。
  5. Producter Producter(消息生产者):消息生产者由Session创建,并用于将消息发送到Destination。同样,消 息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish 方法)发送消息。
  6. Consumer Consumer (消息消费者):消息消费者由Session创建,用于接收被发送到Destination的消息。两种 类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或 createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化 的订阅者。
  7. MessageListener 消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的 MDB(Message-Driven Bean)就是一种MessageListener。

ActiveMQ的安装

安装

代码语言:javascript复制
1. 第一步:安装 jdk(略)
2. 第二步:把 activemq的压缩包(apache-activemq-5.14.5-bin.tar.gz)上传到 linux 系统
3. 第三步:解压缩压缩包 
tar -zxvf apache-activemq-5.14.5-bin.tar.gz
4. 第四步:进入apache-activemq-5.14.5的bin目录
cd apache-activemq-5.14.5/bin
5. 第五步:启动 activemq
./activemq start (执行2次:第一次:生成配置信息;第二次:启动)
6. 第六步:停止activemq:./activemq stop

访问 页面控制台: http://ip:8161 (监控) 请求地址:  tcp://ip:61616 (java代码访问消息中间件) 账号: admin 密码:admin

图1:登陆:

图 2:点击Queues队列或者Topics主题消息

代码语言:javascript复制
列表各列信息含义如下:
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。

原生JMS API操作ActiveMQ

PTP 模式(生产者)

  1. 引入依赖
代码语言:javascript复制
<dependencies>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.11.2</version>
    </dependency>
</dependencies>
  1. 编写生产消息的测试类 QueueProducer 步骤:
代码语言:javascript复制
1.创建连接工厂
2.创建连接
3.打开连接
4.创建session
5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
6.创建消息生产者
7.创建消息
8.发送消息
9.释放资源
代码语言:javascript复制
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 演示点对点模式 -- 消息生产者
*/
public class PTP_Producer {
  public static void main(String[] args) throws JMSException {
    //1.创建连接工厂
    ConnectionFactory factory
        = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
    //2.创建连接
    Connection connection = factory.createConnection();
    //3.打开连接
    connection.start();
    //4.创建session
    /**
    * 参数一:是否开启事务操作* 参数二:消息确认机制
    */
    Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
    //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
    Queue queue = session.createQueue("queue01");
    //6.创建消息生产者
    MessageProducer producer = session.createProducer(queue);
    //7.创建消息
    //createTextMessage: 文本类型
    TextMessage textMessage = session.createTextMessage("test message");
    //8.发送消息
    producer.send(textMessage);
    System.out.println("消息发送完成");
    //9.释放资源
    session.close();
    connection.close();
 }
}

观察发送消息的结果:

PTP模式(消费者)

步骤:

代码语言:javascript复制
1.创建连接工厂
2.创建连接
3.打开连接
4.创建session
5.指定目标地址
6.创建消息的消费者
7.配置消息监听器

第一种消费者写法:

代码语言:javascript复制
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 演示点对点模式- 消息消费者(第一种方案)
*/
public class PTP_Consumer1 {
  public static void main(String[] args) throws JMSException {
    //1.创建连接工厂
    ConnectionFactory factory
        = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
    //2.创建连接
    Connection connection = factory.createConnection();
    //3.打开连接
    connection.start();
    //4.创建session
    Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
    //5.指定目标地址
    Queue queue = session.createQueue("queue01");
    //6.创建消息的消费者
    MessageConsumer consumer = session.createConsumer(queue);
    //7.接收消息
    while(true){
      Message message = consumer.receive();
      //如果已经没有消息了,结束啦
      if(message==null){
        break;
     }
      //如果还有消息,判断什么类型的消息
      if(message instanceof TextMessage){
        TextMessage textMessage = (TextMessage)message;
        System.out.println("接收的消息:" textMessage.getText());
     }
   }
 }
}

第二种消费者写法(推荐):

代码语言:javascript复制
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 演示点对点模式- 消息消费者(第二种方案) -- 更加推荐
*/
public class PTP_Consumer2 {
  public static void main(String[] args) throws JMSException {
    //1.创建连接工厂
    ConnectionFactory factory
        = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
    //2.创建连接
    Connection connection = factory.createConnection();
    //3.打开连接
    connection.start();
    //4.创建session
    Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
    //5.指定目标地址
    Queue queue = session.createQueue("queue01");
    //6.创建消息的消费者
    MessageConsumer consumer = session.createConsumer(queue);
    //7.设置消息监听器来接收消息
    consumer.setMessageListener(new MessageListener() {
      //处理消息
      @Override
      public void onMessage(Message message) {
        if(message instanceof TextMessage){
          TextMessage textMessage = (TextMessage)message;
          try {
            System.out.println("接收的消息
(2):" textMessage.getText());
         } catch (JMSException e) {
            e.printStackTrace();
         }
       }
     }
   });
//注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
 }
}

观察消费消息的结果:

Pub/Sub模式(生成者)

代码语言:javascript复制
1.创建连接工厂
2.创建连接
3.打开连接
4.创建session
5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
6.创建消息生产者
7.创建消息
8.发送消息
9.释放资源
代码语言:javascript复制
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主题消息,消息的发送方
*/
public class TopicProducer {
  public static void main(String[] args) throws Exception {
    //1.创建连接工厂
    ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
    //2.创建连接
    Connection connection = factory.createConnection();
    //3.打开连接
connection.start();
    //4.创建session
    Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
    //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
    Topic topic = session.createTopic("sms");
    //6.创建消息生产者
    MessageProducer producer = session.createProducer(topic);
    //7.创建消息
    TextMessage message = session.createTextMessage("发短信...");
    //8.发送消息
    producer.send(message);
    System.out.println("发送消息:发短信...");
    session.close();;
    connection.close();
 }
}

查看主题消息:

Pub/Sub模式(消费者)

代码语言:javascript复制
1.创建连接工厂
2.创建连接
3.打开连接
4.创建session
5指定目标地址
6.创建消息的消费者
7.配置消息监听器
代码语言:javascript复制
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主题消息,消息的消费方
*/
public class TopicConsumer {
  public static void main(String[] args) throws Exception {
    //1.创建连接工厂
    ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
    //2.创建连接
    Connection connection = factory.createConnection();
    //3.打开连接
    connection.start();
    //4.创建session
    Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
    //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
    Topic topic = session.createTopic("sms");
    //6.创建消息的消费者
    MessageConsumer consumer = session.createConsumer(topic);
    //7.配置消息监听器
    consumer.setMessageListener(new MessageListener() {
      @Override
      public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
          System.out.println("消费消息:"   textMessage.getText());
       } catch (JMSException e) {
          e.printStackTrace();
       }
     }
   });
 }
}

Spring与ActiveMQ整合

消息生产者

  1. 引入依赖
代码语言:javascript复制
<dependencies>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.11.2</version>
    </dependency>
<dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-oxm</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context-support</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>5.0.2.RELEASE</version>
    </dependency>
   <dependency>
	<groupId>javax.jms</groupId>
      <artifactId>javax.jms-api</artifactId>
      <version>2.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.7</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
  </dependencies>
  1. 编写Spring整合ActiveMQ配置:applicationContext-producer.xml
代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amp="http://activemq.apache.org/schema/core"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
  <!--1.创建连接工厂对象-->
  <amp:connectionFactory
    id="connetionFactory"
    brokerURL="tcp://192.168.66.133:61616"
    userName="admin"
    password="admin"
  />
  <!--2.创建缓存连接工厂-->
  <bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
    <!--注入连接工厂-->
    <property name="targetConnectionFactory" ref="connetionFactory"/>
    <!--缓存消息数据-->
    <property name="sessionCacheSize" value="5"/>
  </bean>
  <!--3.创建用于点对点发送的JmsTemplate-->
  <bean id="jmsQueueTemplate"
class="org.springframework.jms.core.JmsTemplate">
    <!--注入缓存连接工厂-->
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
    <!--指定是否为发布订阅模式-->
    <property name="pubSubDomain" value="false"/>
  </bean>
  <!--4.创建用于发布订阅发送的JmsTemplate-->
  <bean id="jmsTopicTemplate"
class="org.springframework.jms.core.JmsTemplate">
<!--注入缓存连接工厂-->
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
    <!--指定是否为发布订阅模式-->
    <property name="pubSubDomain" value="true"/>
  </bean>
</beans>
  1. 编写测试类,实现发送消息
代码语言:javascript复制
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 演示Spring与ActiveMQ整合
*/
@RunWith(SpringJUnit4ClassRunner.class) // junit与spring整合
@ContextConfiguration("classpath:applicationContext-producer.xml") // 加载spring
配置文件
public class SpringProducer {
  //点对点模式
  @Autowired
  @Qualifier("jmsQueueTemplate")
  private JmsTemplate jmsQueueTemplate;
  //发布订阅模式
  @Autowired
  @Qualifier("jmsTopicTemplate")
  private JmsTemplate jmsTopicTemplate;
  /**
  * 点对点发送
  */
  @Test
  public void ptpSender(){
    /**
    * 参数一:指定队列的名称
    * 参数二:MessageCreator接口,我们需要提供该接口的匿名内部实现
    */
    jmsQueueTemplate.send("spring_queue", new MessageCreator() {
//我们只需要返回发送的消息内容即可
      @Override
      public Message createMessage(Session session) throws JMSException {
        //创建文本消息
        TextMessage textMessage = session.createTextMessage("spring test
message");
        return textMessage;
     }
   });
    System.out.println("消息发送已完成");
 }
  /**
  * 发布订阅发送
  */
  @Test
  public void psSender(){
    jmsTopicTemplate.send("spring_topic", new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
        //创建文本消息
        TextMessage textMessage = session.createTextMessage("spring test
message--topic");
        return textMessage;
     }
   });
    System.out.println("消息发送已完成");
 }
}

消息消费者

  1. 编写监听器:监听主题消息、队列消息
代码语言:javascript复制
@Component
 public class EmailMessageListener implements MessageListener {
  @Override
   public void onMessage(Message message) {
     MapMessage mapMessage = (MapMessage) message;
     try {
       String email = mapMessage.getString("email");
       System.out.println("消费消息:"   email);
     } catch (JMSException e) {
       e.printStackTrace();
     }
   }
 }
  1. 编写Spring整合ActiveMQ配置:applicationContext-consumer.xml
代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:aop="http://www.springframework.org/schema/aop"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:tx="http://www.springframework.org/schema/tx"
   xmlns:amq="http://activemq.apache.org/schema/core"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xsi:schemaLocation="
   http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
   http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
   http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
   http://www.springframework.org/schema/jms
    http://www.springframework.org/schema/jms/spring-jms.xsd
   http://activemq.apache.org/schema/core
    http://activemq.apache.org/schema/core/activemq-core.xsd">
  <!-- 1. 创建ActiveMQ连接工厂 -->
  <amq:connectionFactory
      id="amqConnectionFactory"
      userName="admin" password="admin"
      brokerURL="tcp://192.168.12.132:61616"/>
  <!-- 2. 创建缓存工厂 -->
  <bean id="cachingConnectionFactory"
   
 class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- 注入 连接工厂-->
    <property name="targetConnectionFactory" ref="amqConnectionFactory">
</property>
    <!-- session缓存数目 -->
    <property name="sessionCacheSize" value="5"></property>
  </bean>
  <!--开启注解扫描-->
  <context:component-scan base-
package="cn.itcast.spring_activemq_consumer"/>
  <!--
    配置消息监听器类,监听队列或主题消息模型中的消息。从而实现消费消息。
    jms:listener-container
      destination-type 监听的JMS消息类型(queue、topic)
      connection-factory Spring的缓存连接工厂
    jms:listener
      destination 对应MQ中队列名称或主题名称
      rel     消息监听器类(实现MessageListener接口)
  -->
  <!-- 3.1 监听指定名称(email)的队列中的消息-->
  <jms:listener-container destination-type="queue" connection-
factory="cachingConnectionFactory">
    <jms:listener destination="email" ref="emailMessageListener"/>
  </jms:listener-container>
<!-- 3.2 监听指定名称(email)的主题中的消息 -->
  <jms:listener-container destination-type="topic" connection-
factory="cachingConnectionFactory">
    <jms:listener destination="sms" ref="smsMessageListener"/>
  </jms:listener-container>
</beans>

3 . 编写测试类,实现发送消息

代码语言:javascript复制
/**
* Spring整合ActiveMQ消费消息
*/
public class Consumer {
  public static void main(String[] args) throws IOException {
    ApplicationContext ac =
        new ClassPathXmlApplicationContext("applicationContext-
activemq-consumer.xml");
    System.in.read();
 }
}

SpringBoot与ActiveMQ整合

消息生产者

  1. 引入依赖
代码语言:javascript复制
   <dependency>
	<groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
  1. 配置:
代码语言:javascript复制
server:
port: 9001 #端口
spring:
application:
 name: activemq-producer # 服务名称
# springboot与activemq整合配置
activemq:
 broker-url: tcp://192.168.66.133:61616 # 连接地址
 user: admin # activemq用户名
 password: admin  # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
jms:
 pub-sub-domain: false
  1. 编写生产者
代码语言:javascript复制
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* 演示SpringBoot与ActiveMQ整合- 消息生产者
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringBootProducer {
  //JmsMessagingTemplate: 用于工具类发送消息
  @Autowired
  private JmsMessagingTemplate jmsMessagingTemplate;
  @Test
  public void ptpSender(){
    /**
    * 参数一:队列的名称或主题名称
    * 参数二:消息内容
    */
    jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot
message");
 }
}

消息消费者

代码语言:javascript复制
    <dependency>
	<groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

配置:

代码语言:javascript复制
server:
port: 9002 #端口
spring:
application:
 name: activemq-consumer # 服务名称
# springboot与activemq整合配置
activemq:
 broker-url: tcp://192.168.66.133:61616 # 连接地址
 user: admin # activemq用户名
 password: admin  # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
jms:
 pub-sub-domain: false
activemq:
name: springboot_queue

编写消费者

代码语言:javascript复制
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
* 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
*/
@Component // 放入IOC容器
public class MsgListener {
  /**
  * 用于接收消息的方法
  * destination: 队列的名称或主题的名称
  */
  @JmsListener(destination = "${activemq.name}")
  public void receiveMessage(Message message){
    if(message instanceof TextMessage){
      TextMessage textMessage = (TextMessage)message;
      try {
        System.out.println("接收消息:" textMessage.getText());
     } catch (JMSException e) {
        e.printStackTrace();
     }
   }
 }
}

ActiveMQ消息组成与高级特性

JMS消息组成详解

JMS消息组成格式

整个JMS协议组成结构如下:

JMS Message消息由三部分组成:

  • 消息头
  • 消息体
  • 消息属性
JMS消息头

JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:

  • 红色 为重要的消息头

不过需要注意的是,在传送消息时,消息头的值由JMS提供者来设置,因此开发者使用以上 setJMSXXX()方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的: JMSCorrelationID,JMSReplyTo,JMSType

JMS消息体

在消息体中,JMS API定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供 了对已有消息格式的兼容。不同的消息类型如下:

JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据, 提供现有消息格式的一些级别的兼容性。

代码语言:javascript复制
· TextMessage--一个字符串对象  *
· MapMessage--一套名称-值对
· ObjectMessage--一个序列化的 Java 对象  *
· BytesMessage--一个字节的数据流    *
· StreamMessage -- Java原始值的数据流
TextMessage
写出
代码语言:javascript复制
/**
  * 发送TextMessage消息
  */
  @Test
  public void testMessage(){
    jmsTemplate.send(name, new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
        TextMessage textMessage = session.createTextMessage("文本消息");
        return textMessage;
     }
   });
 }
读取
代码语言:javascript复制
/**
  * 接收TextMessage的方法
  */
  @JmsListener(destination = "${activemq.name}")
  public void receiveMessage(Message message){
    if(message instanceof TextMessage){
      TextMessage textMessage = (TextMessage)message;
      try {
        System.out.println("接收消息:" textMessage.getText());
     } catch (JMSException e) {
        e.printStackTrace();
     }
   }
 }
MapMessage
发送
代码语言:javascript复制
/**
  * 发送MapMessage消息
  */
  @Test
  public void mapMessage(){
    jmsTemplate.send(name, new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setString("name","张三");
        mapMessage.setInt("age",20);
        return mapMessage;
     }
   });
 }
接收
代码语言:javascript复制
@JmsListener(destination = "${activemq.name}")
  public void receiveMessage(Message message){
    if(message instanceof MapMessage){
      MapMessage mapMessage = (MapMessage)message;
      try {
System.out.println("名称:" mapMessage.getString("name"));
        System.out.println("年龄:" mapMessage.getString("age"));
     } catch (JMSException e) {
        e.printStackTrace();
     }
   }
 }
ObjectMessage
写入
代码语言:javascript复制
//发送ObjectMessage消息
  @Test
  public void test2(){
    jmsTemplate.send(name, new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
        User user = new User();
        user.setName("小苍");
        user.setAge(18);
        ObjectMessage objectMessage = session.createObjectMessage(user);
        return objectMessage;
     }
   });
 }
接收
代码语言:javascript复制
@JmsListener(destination = "${activemq.name}")
  public void receiveMessage(Message message){
    if(message instanceof ObjectMessage){
      ObjectMessage objectMessage = (ObjectMessage)message;
      try {
        User user = (User)objectMessage.getObject();
        System.out.println(user.getUsername());
        System.out.println(user.getPassword());
     } catch (JMSException e) {
        e.printStackTrace();
     }
   }
 }

注意: ActiveMQ5.12后 ,为了安全考虑,ActiveMQ默认不接受自定义的序列化对象,需要将自定义的 加入到受信任的列表。

代码语言:javascript复制
spring:
activemq:
 broker-url: tcp://192.168.66.133:61616
 user: admin
 password: admin
 packages:
  trust-all: true # 添加所有包到信任列表
BytesMessage
写出
代码语言:javascript复制
//发送BytesMessage消息
  @Test
  public void test3(){
    jmsTemplate.send(name, new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
        BytesMessage bytesMessage = session.createBytesMessage();
        try {
          File file = new File("d:/spring.jpg");
          FileInputStream in = new FileInputStream(file);
          byte[] bytes = new byte[(int)file.length()];
          in.read(bytes);
          bytesMessage.writeBytes(bytes);
       } catch (Exception e) {
          e.printStackTrace();
       }
        return bytesMessage;
     }
   });
 }
读取
代码语言:javascript复制
@JmsListener(destination="${activemq.name}")
  public void receiveMessage(Message message) throws Exception {
    BytesMessage bytesMessage = (BytesMessage)message;
    FileOutputStream out = new FileOutputStream("d:/abc.jpg");
    byte[] buf = new byte[(int)bytesMessage.getBodyLength()];
    bytesMessage.readBytes(buf);
    out.write(buf);
    out.close();
 }
StreamMessage
写出
代码语言:javascript复制
//发送StreamMessage消息
  @Test
  public void test4(){
    jmsTemplate.send(name, new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
        StreamMessage streamMessage = session.createStreamMessage();
        streamMessage.writeString("你好,ActiveMQ");
        streamMessage.writeInt(20);
        return streamMessage;
     }
   });
 }
读取
代码语言:javascript复制
@JmsListener(destination="${activemq.name}")
  public void receiveMessage(Message message) throws Exception {
    StreamMessage streamMessage = (StreamMessage)message;
    String str = streamMessage.readString();
    int i = streamMessage.readInt();
    System.out.println(str);
    System.out.println(i);
 }
JMS消息属性

我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属 性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择性的提供部分标准属性。

代码语言:javascript复制
message.setStringProperty("Property",Property);   //自定义属性

消息持久化

消息持久化是保证消息不丢失的重要方式!!! ActiveMQ提供了以下三种的消息存储方式:

  1. Memory 消息存储-基于内存的消息存储。
  2. 基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它提供了容量的提升和恢复 能力。
  3. 基于JDBC的消息存储方式-数据存储于数据库(例如:MySQL)中。

ActiveMQ持久化机制流程图:

JDBC消息存储
  1. application.yml
代码语言:javascript复制
server:
port: 9001
spring:
activemq:
 broker-url: tcp://192.168.66.133:61616
 user: admin
 password: admin
jms:
 pub-sub-domain: false # false:点对点队列模式, true:发布/订阅模式
 template:
  delivery-mode: persistent # 持久化
activemq:
name: springboot-queue01
  1. 修改activemq.xml
代码语言:javascript复制
<!--配置数据库连接池-->
  <bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource"
destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url"
value="jdbc:mysql://192.168.66.133:3306/db_activemq" />
    <property name="username" value="root" />
    <property name="password" value="123456"/>
  </bean>
<!--JDBC Jdbc用于master/slave模式的数据库分享 -->
<persistenceAdapter>
  <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
  1. 拷贝mysql及durid数据源的jar包到activemq的lib目录下
  2. 重启activemq

消息事务

消息事务,是保证消息传递原子性的一个重要特征,和JDBC的事务特征类似。

一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。

生产者、消费者与消息服务器直接都支持事务性;

ActionMQ的事务主要偏向在生产者的应用。

ActionMQ 消息事务流程图:

生产者事务

方式一:

代码语言:javascript复制
/**
  * 事务性发送--方案一
  */
  @Test
  public void sendMessageTx(){
    //获取连接工厂
    ConnectionFactory connectionFactory =
jmsMessagingTemplate.getConnectionFactory();
    Session session = null;
    try {
      //创建连接
      Connection connection = connectionFactory.createConnection();
      /**
      * 参数一:是否开启消息事务
      */
      session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
      //创建生产者
      MessageProducer producer =
session.createProducer(session.createQueue(name));
      for(int i=1;i<=10;i  ){
        //模拟异常
        if(i==4){
          int a = 10/0;
       }
        TextMessage textMessage = session.createTextMessage("消息--"  
i);
        producer.send(textMessage);
     }
      //注意:一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达
MQ服务器
      session.commit();
} catch (JMSException e) {
      e.printStackTrace();
      //消息事务回滚
      try {
        session.rollback();
     } catch (JMSException e1) {
        e1.printStackTrace();
     }
   }
 }

方式二: 配置类:

代码语言:javascript复制
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
  @Bean
  public PlatformTransactionManager transactionManager(ConnectionFactory
connectionFactory) {
    return new JmsTransactionManager(connectionFactory);
 }
}

生产者业务类

代码语言:javascript复制
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 消息发送的业务类
*/
@Service
public class MessageService {
  @Autowired
  private JmsMessagingTemplate jmsMessagingTemplate;
  @Value("${activemq.name}")
  private String name;
  @Transactional // 对消息发送加入事务管理(同时也对JDBC数据库的事务生效)
  public void sendMessage(){
    for(int i=1;i<=10;i  ) {
      //模拟异常
      if(i==4){
        int a = 10/0;
     }
      jmsMessagingTemplate.convertAndSend(name, "消息---" i);
   }
 }
}

测试发送方法:

代码语言:javascript复制
@Autowired
  private MessageService messageService;
  /**
  * 事务性发送--方案二: Spring的JmsTransactionManager功能
  */
  @Test
  public void sendMessageTx2(){
    messageService.sendMessage();
 }
消费者事务
代码语言:javascript复制
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息消费者
*/
@Component
public class Consumer {
  /**
  * 接收消息的方法
  */
  @JmsListener(destination="${activemq.name}",containerFactory =
"jmsQueryListenerFactory")
  public void receiveMessage(TextMessage textMessage,Session session) throws
JMSException {
    try {
      System.out.println("消息内容:"   textMessage.getText()   ",是否重发:"
  textMessage.getJMSRedelivered());
      int i = 100/0; //模拟异常
     
      session.commit();//提交事务
   } catch (JMSException e) {
      try {
        session.rollback();//回滚事务
     } catch (JMSException e1) {
     }
      e.printStackTrace();
   }
 }
}

消息确认机制

JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接 收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在 非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参 数有以下三个可选值:

注意:消息确认机制与事务机制是冲突的,只能选其中一种。所以演示消息确认前,先关闭事务。

auto_acknowledge 自动确认

添加配置类

代码语言:javascript复制
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
  @Bean(name="jmsQueryListenerFactory")
  public DefaultJmsListenerContainerFactory
 jmsListenerContainerFactory(ConnectionFactory connectionFactory){
    DefaultJmsListenerContainerFactory factory=new
DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setSessionTransacted(false); // 不开启事务操作
    factory.setSessionAcknowledgeMode(1); //自动确认
    return factory;
 }
}
消费者
代码语言:javascript复制
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息消费者
*/
@Component
public class Consumer {
  /**
  * 接收消息的方法
  */	
@JmsListener(destination="${activemq.name}",containerFactory =
"jmsQueryListenerFactory")
  public void receiveMessage(TextMessage textMessage){
    try {
      System.out.println("消息内容:"   textMessage.getText()   ",是否重发:"
  textMessage.getJMSRedelivered());
      throw new RuntimeException("test");
   } catch (JMSException e) {
      e.printStackTrace();
   }
 }
}

如果消费方接收消息失败, JMS服务器会重发消息,默认重发6次。

dups_ok_acknowledge

类似于 auto_acknowledge 确认机制,为自动批量确认而生,而且具有“延迟”确认的特点,ActiveMQ 会根据内部算法,在收到一定数量的消息自动进行确认。 在此模式下,可能会出现重复消息,如果消费 方不允许重复消费,不建议使用!

client_acknowledge 手动确认

配置类

代码语言:javascript复制
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
  @Bean(name="jmsQueryListenerFactory")
public DefaultJmsListenerContainerFactory
 jmsListenerContainerFactory(ConnectionFactory connectionFactory){
    DefaultJmsListenerContainerFactory factory=new
DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setSessionTransacted(false); // 不开启事务操作
    factory.setSessionAcknowledgeMode(4); //手动确认
    return factory;
 }
}
消费者
代码语言:javascript复制
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息消费者
*/
@Component
public class Consumer {
  /**
  * 接收消息的方法
  */
  @JmsListener(destination="${activemq.name}",containerFactory =
"jmsQueryListenerFactory")
  public void receiveMessage(TextMessage textMessage){
    try {
      System.out.println("消息内容:"   textMessage.getText()   ",是否重发:"
  textMessage.getJMSRedelivered());
      textMessage.acknowledge(); // 确认收到消息,一旦消息确认,消息不会重新发送
      throw new RuntimeException("test");
   } catch (JMSException e) {
      e.printStackTrace();
   }
 }
}

消息投递方式

异步投递
  1. 异步投递 vs 同步投递

同步发送: 消息生产者使用持久(Persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把 消息保存到二级存储中。

异步发送: 如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到broker的确认之 前一直阻塞 Producer.send方法。

想要使用异步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true属性

  1. 如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步
  2. 当alwaysSyncSend=false时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送”
  3. 当alwaysSyncSend=false时,如果指定了useAsyncSend=true,“PERSISTENT”类型的消息使用异步发 送。如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。

总结: 默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用 异步发送;对于持久化消息采用同步发送!!!

  1. 配置异步投递的方式
代码语言:javascript复制
1.在连接上配置
new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
2.通过ConnectionFactory
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
3.通过connection
((ActiveMQConnection)connection).setUseAsyncSend(true);

注意:如果是Spring或SpringBoot项目,通过修改JmsTemplate的默认参数实现异步或同步投递

代码语言:javascript复制
@Configuration
public class ActiveConfig {
/**
  * 配置用于异步发送的非持久化JmsTemplate
  */
  @Autowired
  @Bean
  public JmsTemplate asynJmsTemplate(PooledConnectionFactory
pooledConnectionFactory) {
    JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
    template.setExplicitQosEnabled(true);
    template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    return template;
 }
  /**
  * 配置用于同步发送的持久化JmsTemplate
  */ 
  @Autowired
  @Bean
  public JmsTemplate synJmsTemplate(PooledConnectionFactory
pooledConnectionFactory) {
    JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
    return template;
 }
异步投递如何确认发送成功?

异步投递丢失消息的场景是:生产者设置 UseAsyncSend=true,使用 producer.send(msg)持续发 送消息。 由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。如果 MQ 突然宕机,此时生产 者端内存中尚未被发送至 MQ 的消息都会丢失。 这时,可以给异步投递方法接收回调,以确认消息是否发送成功!

代码语言:javascript复制
/**
  * 异步投递,回调函数
  * @return
  */
  @RequestMapping("/send")
  public String sendQueue(){
    Connection connection = null;
    Session session = null;
    ActiveMQMessageProducer producer = null;
    // 获取连接工厂
    ConnectionFactory connectionFactory =
jmsMessagingTemplate.getConnectionFactory();
    try {
      connection = connectionFactory.createConnection();
      session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
延迟投递
生产者提供两个发送消息的方法,一个是即时发送消息,一个是延时发送消息。
1、修改activemq.xml
      Queue queue = session.createQueue(name);
      int count = 10;
      producer = session.createProducer(queue);
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
      long start = System.currentTimeMillis();
      for (int i = 0; i < count; i  ) {
        //创建需要发送的消息
        TextMessage textMessage = session.createTextMessage("Hello");
        //设置消息唯一ID
        String msgid = UUID.randomUUID().toString();
        textMessage.setJMSMessageID(msgid);
        producer.send(textMessage, new AsyncCallback() {
          @Override
          public void onSuccess() {
            // 使用msgid标识来进行消息发送成功的处理
            System.out.println(msgid " 消息发送成功");
         }
          @Override
          public void onException(JMSException exception) {
            // 使用msgid表示进行消息发送失败的处理
            System.out.println(msgid " 消息发送失败");
            exception.printStackTrace();
         }
       });
     }
      session.commit();
   } catch (Exception e) {
      e.printStackTrace();
   }
    return "ok";
 }
延迟投递

生产者提供两个发送消息的方法,一个是即时发送消息,一个是延时发送消息。

  1. 修改activemq.xml
代码语言:javascript复制
<broker xmlns="http://activemq.apache.org/schema/core" ...
 schedulerSupport="true" >
 ......
</broker>

注意:添加 schedulerSupport="true"配置 2. 在代码中设置延迟时长

代码语言:javascript复制
/**
  * 延时投递
  *
  * @return
  */
 @Test
  public String sendQueue() {
    Connection connection = null;
    Session session = null;
    ActiveMQMessageProducer producer = null;
    // 获取连接工厂
    ConnectionFactory connectionFactory =
jmsMessagingTemplate.getConnectionFactory();
    try {
      connection = connectionFactory.createConnection();
      session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
      Queue queue = session.createQueue(name);
      int count = 10;
      producer = (ActiveMQMessageProducer) session.createProducer(queue);
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
      //创建需要发送的消息
      TextMessage textMessage = session.createTextMessage("Hello");
      //设置延时时长(延时10秒)
      textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,
10000);
      producer.send(textMessage);
      session.commit();
   } catch (Exception e) {
      e.printStackTrace();
   }
    return "ok";
 }
定时投递
  1. 启动类添加定时注解
代码语言:javascript复制
@SpringBootApplication
@EnableScheduling // 开启定时功能
public class MyActiveMQApplication {
  public static void main(String[] args) {
    SpringApplication.run(MyActiveMQApplication.class,args);
 }
}
  1. 在生产者添加@Scheduled设置定时
代码语言:javascript复制
/**
* 消息生产者
*/
@Component
public class ProducerController3 {
  @Value("${activemq.name}")
  private String name;
  @Autowired
  private JmsMessagingTemplate jmsMessagingTemplate;
  /**
  * 延时投递
  *
  * @return
  */
  //每隔3秒定投
  @Scheduled(fixedDelay = 3000)
  public void sendQueue() {
    jmsMessagingTemplate.convertAndSend(name, "消息ID:"  
UUID.randomUUID().toString().substring(0,6));
    System.out.println("消息发送成功...");
 }
}

死信队列

DLQ-Dead Letter Queue,死信队列,用来保存处理失败或者过期的消息

代码语言:javascript复制
出现以下情况时,消息会被重发:
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.
当一个消息被重发超过6(缺省为6次)次数时,会给broker发送一个"Poison ack",这个消息被认为是a
poison pill,这时broker会将这个消息发送到死信队列,以便后续处理。
注意两点:
1)缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ
2)缺省的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。
可以通过配置文件(activemq.xml)来调整死信发送策略。
  1. 修改activemq.xml 为每个队列建立独立的死信队列
代码语言:javascript复制
<destinationPolicy>
      <policyMap>
       <policyEntries>
         <policyEntry queue=">">
          <deadLetterStrategy>
            <individualDeadLetterStrategy queuePrefix="DLQ."
             useQueueForQueueMessages="true" />
          </deadLetterStrategy>
         </policyEntry>
   
         <policyEntry topic=">" >        
          <pendingMessageLimitStrategy>
           <constantPendingMessageLimitStrategy limit="1000"/>
          </pendingMessageLimitStrategy>
         </policyEntry>
       </policyEntries>
      </policyMap>
</destinationPolicy>
  1. RedeliveryPolicy重发策略设置 修改配置类
代码语言:javascript复制
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
  //RedeliveryPolicy重发策略设置
  @Bean
  public RedeliveryPolicy redeliveryPolicy(){
    RedeliveryPolicy  redeliveryPolicy=  new RedeliveryPolicy();
    //是否在每次尝试重新发送失败后,增长这个等待时间
    redeliveryPolicy.setUseExponentialBackOff(true);
    //重发次数,默认为6次  这里设置为10次
    redeliveryPolicy.setMaximumRedeliveries(10);
    //重发时间间隔,默认为1秒
    redeliveryPolicy.setInitialRedeliveryDelay(1);
    //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
    redeliveryPolicy.setBackOffMultiplier(2);
    //是否避免消息碰撞
    redeliveryPolicy.setUseCollisionAvoidance(false);
    //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生
效
    redeliveryPolicy.setMaximumRedeliveryDelay(-1);
    return redeliveryPolicy;
 }
  @Bean
  public ActiveMQConnectionFactory activeMQConnectionFactory
(@Value("${spring.activemq.broker-url}")String url,RedeliveryPolicy
redeliveryPolicy){
    ActiveMQConnectionFactory activeMQConnectionFactory =
        new ActiveMQConnectionFactory(
            "admin",
            "admin",
            url);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    return activeMQConnectionFactory;
 }
  @Bean
  public PlatformTransactionManager transactionManager(ConnectionFactory
connectionFactory) {
    return new JmsTransactionManager(connectionFactory);
 }
@Bean(name="jmsQueryListenerFactory")
  public DefaultJmsListenerContainerFactory 
jmsListenerContainerFactory(ConnectionFactory
connectionFactory,PlatformTransactionManager transactionManager){
    DefaultJmsListenerContainerFactory  factory=new
DefaultJmsListenerContainerFactory ();
    factory.setTransactionManager(transactionManager);
    factory.setConnectionFactory(connectionFactory);
    factory.setSessionTransacted(true); // 开启事务
    factory.setSessionAcknowledgeMode(1);
    return factory;
 }
}

ActiveMQ企业面试经典问题

问题:ActiveMQ宕机了怎么办?

1)ActiveMQ主从集群方案:Zookeeper集群 Replicated LevelDB ActiveMQ集群 官网链接: http://activemq.apache.org/replicated-leveldb-store 2)集群信息概览

3)先搭建Zookeeper集群

代码语言:javascript复制
1)上传zookeeper-3.4.6.tar.gz到linux
2)解压:tar -xzf zookeeper-3.4.6.tar.gz
3)创建根目录: mkdir /root/zookeeper
4)创建节点目录及数据,日志存放目录:
mkdir -p zookeeper/218{1,2,3}/{data,datalogs}
3个节点的子文件夹为:2181,2182,2183
5)复制Zookeeper到每个节点目录下
cp -r zookeeper-3.4.6 zookeeper/2181
cp -r zookeeper-3.4.6 zookeeper/2182
cp -r zookeeper-3.4.6 zookeeper/2183
6)移除原始目录
rm -rf zookeeper-3.4.14/
7)修改2181节点的zoo.cfg
cd zookeeper/2181/zookeeper-3.4.6/conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
内容如下:
clientPort=2181
dataDir=/root/zookeeper/2181/data
dataLogDir=/root/zookeeper/2181/datalogs
server.1=192.168.66.133:2881:3881
server.2=192.168.66.133:2882:3882
server.3=192.168.66.133:2883:3883
8)相同方式修改2182及2183节点的zoo.cfg
2182配置为:
clientPort=2182
dataDir=/root/zookeeper/2182/data
dataLogDir=/root/zookeeper/2182/datalogs
server.1=192.168.66.133:2881:3881
server.2=192.168.66.133:2882:3882
server.3=192.168.66.133:2883:3883
2183配置为:
clientPort=2183
dataDir=/root/zookeeper/2183/data
dataLogDir=/root/zookeeper/2183/datalogs
server.1=192.168.66.133:2881:3881
server.2=192.168.66.133:2882:3882
server.3=192.168.66.133:2883:3883	
9)每个节点必须有myid配置文件,记录节点的唯一标识,必须放在dataDir文件夹下。而且id值必须与上面
配置的server.x中的x对应
touch 2181/data/myid && echo "1" > 2181/data/myid
touch 2182/data/myid && echo "2" > 2182/data/myid
touch 2183/data/myid && echo "3" > 2183/data/myid
查看是否创建成功:
more 218*/data/myid
10)分别启动三台Zookeeper
启动:
cd 2181/ && zookeeper-3.4.6/bin/zkServer.sh start
cd 2182/ && zookeeper-3.4.6/bin/zkServer.sh start
cd 2183/ && zookeeper-3.4.6/bin/zkServer.sh start
查看状态:
2181/zookeeper-3.4.6/bin/zkServer.sh status
2182/zookeeper-3.4.6/bin/zkServer.sh status
2183/zookeeper-3.4.6/bin/zkServer.sh status
看到Mode: leader的Zookeeper为主节点,其他为从节点。

4)搭建ActiveMQ集群

代码语言:javascript复制
1)上传apache-activemq-5.15.9-bin.tar.gz到linux
2)解压:tar -xzf apache-activemq-5.15.9-bin.tar.gz
3)创建三个节点目录
mkdir activemq
mkdir -p activemq/816{1,2,3}
4)复制activemq到每个节点目录
cp -r apache-activemq-5.15.9 activemq/8161
cp -r apache-activemq-5.15.9 activemq/8162
cp -r apache-activemq-5.15.9 activemq/8163
5)修改每个节点的activemq.xml
必须使用相同的集群名称
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="itheima_mq"
dataDirectory="${activemq.data}">
添加配置:
61616:
<persistenceAdapter>
<replicatedLevelDBdirectory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:61616"
zkAddress="192.168.66.133:2181,192.168.66.133:2182,192.168.66.133:2183"
hostname="192.168.66.133"
zkPath="/activemq/leveldb-stores" />
</persistenceAdapter>
61617:
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:61617"
zkAddress="192.168.66.133:2181,192.168.66.133:2182,192.168.66.133:2183"
hostname="192.168.66.133"
zkPath="/activemq/leveldb-stores" />
</persistenceAdapter>
61618:
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:61618"
zkAddress="192.168.66.133:2181,192.168.66.133:2182,192.168.66.133:2183"
hostname="192.168.66.133"
zkPath="/activemq/leveldb-stores" />
</persistenceAdapter>
6)修改jetty.xml
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-
method="start">
      <!-- the default port number for the web console -->
   <property name="host" value="0.0.0.0"/>
   <property name="port" value="8161"/>
</bean>
分别为8181,8182,8183
7)分别启动每台activemq
可以使用ZooInspector工具查看ActiveMQ是否注册成功

5)生产者和消费者的broker-url需要修改

代码语言:javascript复制
server:
port: 9001
spring:
activemq:
 broker-url: failover:
(tcp://192.168.66.133:61616,tcp://192.168.66.133:61617,tcp://192.168.66.133:6161
68)
 user: admin
 password: admin

问题:如何防止消费方消息重复消费?

解决消费方幂等性问题! 如果因为网络延迟等原因,MQ无法及时接收到消费方的应答,导致MQ重试。在重试过程中造成重复 消费的问题。 解决思路:

  1. 如果消费方是做数据库操作,那么可以把消息的ID作为表的唯一主键,这样在重试的情 况下,会触发主键冲突,从而避免数据出现脏数据。
  2. 如果消费方不是做数据库操作,那么可以借助第三方的应用,例如Redis,来记录消费记录。每次消 息被消费完成时候,把当前消息的ID作为key存入redis,每次消费前,先到redis查询有没有该消息的消 费记录。

问题:如何防止消息丢失?

以下手段可以防止消息丢失:

  1. 在消息生产者和消费者使用事务
  2. 在消费方采用手动消息确认(ACK)
  3. 消息持久化,例如JDBC或日志

0 人点赞