Spring Boot整合ActiveMQ

2020-04-03 17:55:45 浏览数 (1)

很多时候,多个系统间要相互集成,那么就避免不了系统的信息交互,例如:我们在上网的过程中需要使用到手机的短信验证码来注册账户等操作,对于一类敏感操作,往往也需要使用到短信业务,对于这一类系统而言,短信系统往往都是通过消息服务集成到主系统中的,当用户使用到短信业务的时候,主系统发送异步消息到短信系统中,通知短信系统完成短信发送的操作。有时候由于业务繁忙,短信系统没有立刻将短信发送出去,往往都是延迟几秒钟后才到达客户手机中。这个场景中用到的异步消息服务,也是本文介绍的主要内容,本文将介绍Spring Boot整合基于JMS服务规范的ActiveMQ,实现异步消息服务。

前期准备

在本地或者虚拟机中安装ActiveMQ并启动ActiveMQ服务,启动完毕之后通过浏览器来访问ActiveMQ的管理页面http://localhost:8161(这是本地安装的ActiveMQ,如果是在虚拟机中,localhost需要替换成虚拟机的IP)来确认一下服务是否正常启动,页面正常显示说明启动成功。 为了让你的Spring Boot应用支持ActiveMQ,需要在pom.xml文件中添加如下依赖:

代码语言:javascript复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- ActiveMQ连接池 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

由于添加了ActiveMQstarter依赖,那么我们只需要在配置文件中配置一些必需属性,那么Spring Boot在初始化Spring bean的时候就会自动装配操作ActiveMQ的相关的类,并存储Spring的上下文环境中,需要的时候直接注入即可使用。对于ActiveMQ而言,常见的配置如下所示:

代码语言:javascript复制
# activemq config
spring.activemq.broker-url=tcp://192.168.171.133:61616
spring.activemq.user=admin
spring.activemq.password=admin
# 是否使用发布订阅模式,默认为false,即默认使用的是点对点模式
spring.jms.pub-sub-domain=true
# 默认目的地址
spring.jms.template.default-destination=activemq.default.destination
# 是否启用连接池
spring.activemq.pool.enabled=true
# 连接池最大连接数配置
spring.activemq.pool.max-connections=50

根据上面的配置,Spring Boot会自动装配许多与ActiveMQ相关的对象,比如JMS连接工厂,连接池以及JmsTemplate对象等等。对于Spring Boot而言,它的自动装配让开发者节约了不少的时间,正常使用到的常用功能,JmsTemplate基本满足日常开发的需求。上面的配置中,我自定义了一个默认的目的地地址,也就是说消息发送的默认地址,以及消息消费者默认监听的地址,它将在后面的代码中直接利用到。本文主要讲解的是ActiveMQ的发布订阅模式,而不是点对点模式。

开发一个ActiveMQ的服务接口

ActiveMQ作为一个消息中间件,往往承载的是两个不同项目之间的消息传递,也就是说消息生产者和消息消费者往往存在于不同的项目中,本文为了演示简便,提供了一个接口,这个接口既可以发送消息,又可以消费消息。

代码语言:javascript复制
package cn.itlemon.springboot.activemq.service;

/**
 * @author jiangpingping
 * @date 2018/10/30 21:55
 */
public interface ActiveMqService {

    /**
     * 发送消息
     *
     * @param message 消息体
     */
    void sendMsg(String message);

    /**
     * 接收消息
     *
     * @param message 消息体
     */
    void receiveMsg(String message);

}

接口中,第一个方法是用来发送消息,第二个方法用来接收消息。它的实现类是:

代码语言:javascript复制
package cn.itlemon.springboot.activemq.service.impl;

import cn.itlemon.springboot.activemq.service.ActiveMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

/**
 * 这里仅仅是案例,发送消息和接收消息在同一个类中,实际开发过程中都是分布在不同的项目中
 *
 * @author jiangpingping
 * @date 2018/10/30 21:56
 */
@Service
@Slf4j
public class ActiveMqServiceImpl implements ActiveMqService {

    private final JmsTemplate jmsTemplate;

    @Autowired
    public ActiveMqServiceImpl(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @Override
    public void sendMsg(String message) {
        log.info("开始发送消息:"   message);
        // 这里在配置文件中定义了默认地址,所以这里无需再次指定地址
        jmsTemplate.convertAndSend(message);
    }

    @JmsListener(destination = "${spring.jms.template.default-destination}")
    @Override
    public void receiveMsg(String message) {
        log.info("开始接收消息:"   message);
    }
}

上述的实现类中,sendMsg方法一旦被调用,那么jmsTemplate就会发送一条消息到默认的目的地地址,也就是上面的配置文件中的spring.jms.template.default-destination的值,而receiveMsg方法上面添加了一个@JmsListener注解,这个注解的属性destination就指向了spring.jms.template.default-destination的值,也就是消息发送的地址。对于消息的发送,调用的是jmsTemplateconvertAndSend方法,这个方法涉及到两点,一个是转换,另一个是发送,对于转换,默认情况下,jmsTemplate会提供一个SimpleMessageConverter去提供转换规则,它是一个简单的文本消息转换器,能很方便地处理文本消息,根据项目需要,还可以使用序列化消息转换器(SerializerMessageConverter)或者Json消息转换器(Jackson2JsonMessageConverter)以及其他实现MessageConverter的转换器等。

验证简单消息发送

这里写一个Controller,检验一下从前端传递过来的数据的发送与接收情况。

代码语言:javascript复制
package cn.itlemon.springboot.activemq.controller;

import cn.itlemon.springboot.activemq.pojo.User;
import cn.itlemon.springboot.activemq.service.ActiveMqService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author jiangpingping
 * @date 2018/10/30 22:02
 */
@RestController
@RequestMapping("/activemq")
public class ActiveMqController {

    private final ActiveMqService activeMqService;

    @Autowired
    public ActiveMqController(ActiveMqService activeMqService) {
        this.activeMqService = activeMqService;
    }

    @GetMapping("/{message}")
    public String send(@PathVariable String message) {
        activeMqService.sendMsg(message);
        return "SUCCESS";
    }

}

启动Spring Boot应用,在浏览器地址输入:http://localhost:8080/activemq/test,观察日志,可以看到,在日志中记录了消息的发送与接收。

代码语言:javascript复制
2018-11-01 20:30:20.854  INFO 18116 --- [nio-8080-exec-1] c.i.s.a.s.impl.ActiveMqServiceImpl       : 开始发送消息:test
2018-11-01 20:30:20.917  INFO 18116 --- [enerContainer-1] c.i.s.a.s.impl.ActiveMqServiceImpl       : 开始接收消息:test

当然,我们也可以观察ActiveMQ的管理页面的相关变化,进入它的管理界面,我们看到如下图所示的内容:

Name一列的activemq.default.destination正是我们之前在application.properties中配置的spring.jms.template.default-destination的值。

验证复杂类型消息发送

前面的案例发送的是一个字符串消息,对于复杂类型的对象,如何进行发送呢?这里我们自定义一个POJO类,然后尝试使用jmsTemplate进行发送。

代码语言:javascript复制
package cn.itlemon.springboot.activemq.pojo;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.io.Serializable;

/**
 * @author jiangpingping
 * @date 2018/10/30 22:29
 */
@Getter
@Setter
@Builder
@ToString
public class User implements Serializable {

    private Long id;

    private String username;

    private String note;

}

这个POJO实例对象是要作为消息进行发送的,所以它必须能够进行序列化,也即是实现Serializable接口。这里我们再单独创建一个ActiveMQ的服务接口以及它的实现类。

代码语言:javascript复制
package cn.itlemon.springboot.activemq.service;

import cn.itlemon.springboot.activemq.pojo.User;

/**
 * @author jiangpingping
 * @date 2018/10/30 22:32
 */
public interface ActiveMqUserService {

    /**
     * 发送user对象
     *
     * @param user 对象
     */
    void sendUser(User user);

    /**
     * 接收user对象
     *
     * @param user 对象
     */
    void receiveUser(User user);

}

实现类:

代码语言:javascript复制
package cn.itlemon.springboot.activemq.service.impl;

import cn.itlemon.springboot.activemq.pojo.User;
import cn.itlemon.springboot.activemq.service.ActiveMqUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

/**
 * @author jiangpingping
 * @date 2018/10/30 22:33
 */
@Service
@Slf4j
public class ActiveMqUserServiceImpl implements ActiveMqUserService {

    private final JmsTemplate jmsTemplate;

    private static final String DESTINATION = "my-destination";

    @Autowired
    public ActiveMqUserServiceImpl(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @Override
    public void sendUser(User user) {
        log.info("发送user对象:"   user.toString());
        jmsTemplate.convertAndSend(DESTINATION, user);
    }

    @JmsListener(destination = DESTINATION)
    @Override
    public void receiveUser(User user) {
        log.info("接收user对象:"   user.toString());
    }
}

这里我们重新自定义了一个消息发送的目的地地址,而没有使用原来的地址,原因是原来的消费者只能消费字符串消息,而这个User类型的消息,它并不能消费。我们再在Controller中添加一个方法,用来验证User类型的消息发送与消费。

代码语言:javascript复制
@Autowired
private ActiveMqUserService activeMqUserService;

@GetMapping("/user/{id}/{username}/{note}")
public User send(@PathVariable("id") Long id, @PathVariable("username") String username, @PathVariable("note") String note) {
    User user = User.builder().id(id).username(username).note(note).build();
    activeMqUserService.sendUser(user);
    return user;
}

再次启动Spring Boot应用,在浏览器地址栏输入http://localhost:8080/activemq/user/1/lemon/test,浏览器接收到了正常的返回结果,也就是User的实体类对象转换后的json数据,但是观察控制台,却发现发生了异常,原因是消息消费者抛出了一个异常。

代码语言:javascript复制
Caused by: java.lang.ClassNotFoundException: Forbidden class cn.itlemon.springboot.activemq.pojo.User! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
	at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112) ~[activemq-client-5.15.6.jar:5.15.6]
	at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57) ~[activemq-client-5.15.6.jar:5.15.6]
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1866) ~[na:1.8.0_171]
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749) ~[na:1.8.0_171]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040) ~[na:1.8.0_171]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[na:1.8.0_171]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[na:1.8.0_171]
	at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211) ~[activemq-client-5.15.6.jar:5.15.6]
	... 21 common frames omitted

发生异常最终原因是:类cn.itlemon.springboot.activemq.pojo.User没有被信任,所以不允许作为ObjectMessage进行发送。查看后面的地址:http://activemq.apache.org/objectmessage.html ,可以从中获取到解决办法,解决的办法是需要将发送的对象所在的包要配置到信任列表中,经过各方面查阅资料,被发送的实体类对象中包含其他的复杂类型属性,其他复杂类型属性也需要加入到信任列表中,所以这里需要将Long类型的包也要进入到信任列表中,所以在application.properties中加入以下配置:

代码语言:javascript复制
# 如果需要发送Object对象消息,那么就需要配置信任的包,包内的所有对象都可以发送
spring.activemq.packages.trusted=cn.itlemon.springboot.activemq.pojo,java.lang

如果需要加入所有的包到信任列表,可以将以上配置换成如下:

代码语言:javascript复制
spring.activemq.packages.trust-all=true

修改完之后,再次重启Spring Boot应用,查看日志:

代码语言:javascript复制
2018-11-01 22:59:23.111  INFO 19651 --- [nio-8080-exec-1] c.i.s.a.s.impl.ActiveMqUserServiceImpl   : 发送user对象:User(id=1, username=lemon, note=test)
2018-11-01 22:59:23.149  INFO 19651 --- [enerContainer-1] c.i.s.a.s.impl.ActiveMqUserServiceImpl   : 接收user对象:User(id=1, username=lemon, note=test)

结果显示,对象的发送和接收都已经成功了。

0 人点赞