基于SpringBoot通过注解实现对mqtt消息处理的异步调用
使用背景
生产环境下, 由于mqtt 生产者生产的消息逐渐增多, 可能会导致消息堆积. 因此需要消费者去快速的消费. 而其中的一个方案便是使用异步线程去加速消费消息. 下面介绍下思路
我们可以在原来的 mqtt 工具类上进行改造: 首先创建一个类 MqttMessageListener, 实现 IMqttMessageListener 接口的 messageArrived 方法, 用于处理这些消息(编写业务逻辑); 然后改写 mqtt 客户端的订阅方法, 注入 MqttMessageListener, 并在订阅时把它作为参数传入; 最后在启动类上开启异步支持, 编写一个配置类配置线程池参数, 并在 messageArrived 上加 @Async 注解, 使其在异步线程中执行.
代码实现
基础代码
指没有开启线程池的代码
- MqttPushClient 主要定义了连接参数
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author
* @Date
* @Description 连接至EMQ X 服务器,获取mqtt连接,发布消息
*/
@Component
public class MqttPushClient{
private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
public static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
public static MqttClient getClient() {
return client;
}
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List<String> topicList) {
MqttClient client;
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
if (username != null) {
options.setUserName(username);
}
if (password != null) {
options.setPassword(password.toCharArray());
}
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
MqttPushClient.setClient(client);
try {
//设置回调类
client.setCallback(pushCallback);
//client.connect(options);
IMqttToken iMqttToken = client.connectWithResult(options);
boolean complete = iMqttToken.isComplete();
log.info("MQTT连接" (complete?"成功":"失败"));
/** 订阅主题 **/
for (String topic : topicList) {
log.info("连接订阅主题:{}", topic);
client.subscribe(topic, 0);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
- PushCallback 回调类, 实现重连, 消息发送监听, 消息接收监听
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author
* @Date
* @Description 消息回调,处理接收的消息
*/
@Component
public class PushCallback implements MqttCallback {
private static final Logger log = LoggerFactory.getLogger(PushCallback.class);
@Autowired
private MqttConfiguration mqttConfiguration;
@Autowired
private MqttTopic mqttTopic;
@Override
public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连
log.info("连接断开,正在重连");
MqttPushClient mqttPushClient = mqttConfiguration.getMqttPushClient();
if (null != mqttPushClient) {
mqttPushClient.connect(mqttConfiguration.getHost(), mqttConfiguration.getClientid(), mqttConfiguration.getUsername(),
mqttConfiguration.getPassword(), mqttConfiguration.getTimeout(), mqttConfiguration.getKeepalive(), mqttConfiguration.getTopic());
log.info("已重连");
}
}
/**
* 发送消息,消息到达后处理方法
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
int messageId = token.getMessageId();
String[] topics = token.getTopics();
log.info("消息发送完成,messageId={},topics={}",messageId,topics.toString());
}
/**
* 订阅主题接收到消息处理方法
* @param topic
* @param message
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
// subscribe后得到的消息会执行到这里面,这里在控制台有输出
String messageStr = new String(message.getPayload());
// messageDistribute.distribute(topic, messageStr);
log.info("接收的主题:" topic ";接收到的信息:" messageStr);
}
}
- MqttConfiguration 配置了mqtt相关参数, 并初始化连接(mqtt在这里启动)
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * MQTT configuration and connection bootstrap.
 *
 * <p>Holds the connection settings declared under the {@link #PREFIX} prefix
 * (via {@code @ConfigurationProperties}) and uses them in {@link #getMqttPushClient()}
 * to connect the injected {@code MqttPushClient}.
 *
 * <p>NOTE(review): the accessor names use lower-case spellings (getClientid, setUsername,
 * setKeepalive) that differ from the field names; presumably they mirror the property keys
 * (clientid, username, keepalive) - confirm before renaming any of them.
 */
@Slf4j
@Component
@Configuration
@ConfigurationProperties(MqttConfiguration.PREFIX)
public class MqttConfiguration {
// Injected client component; it is connected in getMqttPushClient().
@Autowired
private MqttPushClient mqttPushClient;
/**
 * Property-name prefix for this configuration (the original note names
 * application-local.properties as the configuration file).
 */
public static final String PREFIX = "std.mqtt";
// The fields below are passed, in this order, to MqttPushClient.connect(...).
private String host;
private String clientId;
private String userName;
private String password;
private int timeout;
private int keepAlive;
private List<String> topic;
public String getClientid() {
return clientId;
}
public void setClientid(String clientid) {
this.clientId = clientid;
}
public String getUsername() {
return userName;
}
public void setUsername(String username) {
this.userName = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getKeepalive() {
return keepAlive;
}
public void setKeepalive(int keepalive) {
this.keepAlive = keepalive;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public List<String> getTopic() {
return topic;
}
public void setTopic(List<String> topic) {
this.topic = topic;
}
/**
 * Connects the injected client to the MQTT server using the configured values and
 * exposes it as a bean.
 *
 * <p>NOTE(review): PushCallback.connectionLost also calls this method to obtain the
 * client before reconnecting - confirm whether that call is meant to trigger a connect.
 *
 * @return the injected {@code MqttPushClient} instance
 */
@Bean
public MqttPushClient getMqttPushClient() {
// Connect to the MQTT server with the configured parameters.
mqttPushClient.connect(host, clientId, userName, password, timeout, keepAlive, topic);
return mqttPushClient;
}
}
properties.yml 配置文件
std.mqtt:
host: tcp://x.x.x.x:1883
username: your_username
password: your_password
#MQTT-连接服务器默认客户端ID
clientid: your_clientid
#连接超时
timeout: 1000
# deviceId
deviceId: your_deviceId
# mqtt-topic
topic[0]: your_topic
- TopicOperation 定义了发布订阅的方法
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Static helpers for subscribing to and publishing on MQTT topics through the client held by
 * {@link MqttPushClient}.
 *
 * @author chy
 */
public class TopicOperation {
    private static final Logger log = LoggerFactory.getLogger(TopicOperation.class);

    /**
     * Subscribes to a topic at QoS 0. Does nothing when no client is available;
     * subscription failures are logged and not rethrown.
     *
     * @param topic topic name
     */
    public static void subscribe(String topic) {
        MqttClient client = MqttPushClient.getClient();
        if (client == null) {
            return;
        }
        try {
            client.subscribe(topic, 0);
            log.info("订阅主题:{}", topic);
        } catch (MqttException e) {
            log.error("订阅主题失败:{}", topic, e);
        }
    }

    /**
     * Publishes a non-retained QoS 0 message. Does nothing when no client is available;
     * publish failures are logged and not rethrown.
     *
     * @param topic       topic to publish to
     * @param pushMessage message body
     */
    public static void publish(String topic, String pushMessage) {
        log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage);
        MqttMessage message = new MqttMessage();
        message.setQos(0);
        // Not retained.
        message.setRetained(false);
        message.setPayload(pushMessage.getBytes());
        MqttClient client = MqttPushClient.getClient();
        if (client == null) {
            return;
        }
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            // Log the requested name (the topic object is null here) and stop, instead of
            // falling through to a NullPointerException on publish.
            log.error("主题不存在:{}", topic);
            return;
        }
        try {
            mTopic.publish(message);
        } catch (Exception e) {
            log.error("mqtt发送消息异常:", e);
        }
    }
}
- MqttTopic 定义了发布和订阅的相关主题
import com.sxd.onlinereservation.exception.BusinessException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Builds the MQTT topic names used for subscribing and publishing.
 */
@Component
public class MqttTopic {
    /** Device ids read from the {@code std.mqtt.deviceId} property. */
    @Value("${std.mqtt.deviceId}")
    private String[] deviceId;

    /**
     * Returns the topic to subscribe to for the given type, built from the first device id.
     *
     * @param type topic type; only {@code "appointTopic"} is supported
     * @return the subscribe topic
     * @throws BusinessException when the type is not supported
     */
    public String getSubscribeTopic(String type) {
        if (type.equals("appointTopic")) {
            return "/v1/" + deviceId[0] + "/service/appointTopic";
        }
        throw new BusinessException("mqtt 订阅主题获取错误");
    }

    /**
     * Returns the topic to publish to for the given type, built from the second device id.
     *
     * @param type topic type; only {@code "appointTopic"} is supported
     * @return the publish topic
     * @throws BusinessException when the type is not supported
     */
    public String getPublishTopic(String type) {
        // Publish topic of the v1.0 "take a number immediately" interface.
        if (type.equals("appointTopic")) {
            return "/v1/" + deviceId[1] + "/service/appointTopic";
        }
        throw new BusinessException("mqtt 发布主题获取错误");
    }
}
ps: 如果想要使用该工具类进行消息发送和接收看下面demo
// Publish a message
TopicOperation.publish(mqttTopic.getPublishTopic("appointTopic"), "消息体");
// Subscribe to a topic (subscribe takes only the topic name)
TopicOperation.subscribe(mqttTopic.getSubscribeTopic("appointTopic"));
异步线程处理实现
总结
- 创建消息监听类 , 用于监听消息并进行业务处理
- 在原来订阅时, 注入并使用第一步创建的监听类
- 通过注解开启异步线程并配置处理方式
创建消息监听类 , 用于监听消息并进行业务处理
代码语言:javascript复制@Slf4j
@Component
public class MqttMessageListener implements IMqttMessageListener {
@Resource
private BusinessService businessService;
@Autowired
private MqttTopic mqttTopic;
@Autowired
private ThreeCallmachineService threeCallmachineService;
@Autowired
private BusinessHallService businessHallService;
@Autowired
private BusinessMaterialService businessMaterialService;
@Autowired
private BusinessWaitService businessWaitService;
@Autowired
private AppointmentService appointmentService;
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageStr = new String(message.getPayload());
log.info("接收的主题:" topic ";接收到的信息:" messageStr);
//进行 业务处理
}
}
在原来订阅时, 注入并使用第一步创建的监听类
注入了 MqttMessageListener, 并且在订阅时改为调用 client.subscribe(topic, mqttMessageListener);
- 修改MqttPushClient (必须)
@Component
public class MqttPushClient{
private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
@Autowired //这里进行了注入操作
private MqttMessageListener mqttMessageListener;
private static MqttClient client;
public static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
public static MqttClient getClient() {
return client;
}
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, List<String> topicList) {
MqttClient client;
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
if (username != null) {
options.setUserName(username);
}
if (password != null) {
options.setPassword(password.toCharArray());
}
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
MqttPushClient.setClient(client);
try {
//设置回调类
client.setCallback(pushCallback);
//client.connect(options);
IMqttToken iMqttToken = client.connectWithResult(options);
boolean complete = iMqttToken.isComplete();
log.info("MQTT连接" (complete?"成功":"失败"));
/** 订阅主题 **/
for (String topic : topicList) {
log.info("连接订阅主题:{}", topic);
//client.subscribe(topic, 0);
client.subscribe(topic, mqttMessageListener);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
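- 如果业务还使用了手动订阅, 则也需要在订阅的类上面注入MqttMessageListener, 并且在订阅方法中作为参数使用. 注意: @Autowired 只对 Spring 管理的 bean 生效, 因此需要把 subscribe 改成非静态方法, 并把该类交给 Spring 管理(例如加上 @Component), 使用时通过注入获取该对象, 而不是自己 new (new 出来的对象不会被注入, mqttMessageListener 为 null). 手动订阅很少用到, 因此有无此步骤都可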
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Helpers for subscribing to and publishing on MQTT topics through the client held by
 * {@link MqttPushClient}.
 *
 * <p>Registered as a Spring component so that the {@code @Autowired} listener is actually
 * injected; an instance created with {@code new} would have a {@code null} listener.
 *
 * @author chy
 */
@Component
public class TopicOperation {
    private static final Logger log = LoggerFactory.getLogger(TopicOperation.class);

    /** Listener passed to manual subscriptions. */
    @Autowired
    private MqttMessageListener mqttMessageListener;

    /**
     * Subscribes to a topic with the injected message listener. Does nothing when no client
     * is available; subscription failures are logged and not rethrown.
     *
     * @param topic topic name
     */
    public void subscribe(String topic) {
        MqttClient client = MqttPushClient.getClient();
        if (client == null) {
            return;
        }
        try {
            // The listener is passed so that messages of this topic are delivered to it.
            client.subscribe(topic, mqttMessageListener);
            log.info("订阅主题:{}", topic);
        } catch (MqttException e) {
            log.error("订阅主题失败:{}", topic, e);
        }
    }

    /**
     * Publishes a non-retained QoS 0 message. Does nothing when no client is available;
     * publish failures are logged and not rethrown.
     *
     * @param topic       topic to publish to
     * @param pushMessage message body
     */
    public static void publish(String topic, String pushMessage) {
        log.info("SEND TO MQTT -- topic : {}, message : {}", topic, pushMessage);
        MqttMessage message = new MqttMessage();
        message.setQos(0);
        // Not retained.
        message.setRetained(false);
        message.setPayload(pushMessage.getBytes());
        MqttClient client = MqttPushClient.getClient();
        if (client == null) {
            return;
        }
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            // Log the requested name (the topic object is null here) and stop, instead of
            // falling through to a NullPointerException on publish.
            log.error("主题不存在:{}", topic);
            return;
        }
        try {
            mTopic.publish(message);
        } catch (Exception e) {
            log.error("mqtt发送消息异常:", e);
        }
    }
}
通过注解开启异步线程并配置处理方式
- 启动类开启异步支持: 在启动类上加 @EnableAsync(proxyTargetClass=true) 注解
/**
 * Application entry point. Besides the Spring Boot defaults it enables asynchronous method
 * execution (class-based proxies), transaction management and mapper scanning of
 * {@code com.x.x.mapper}.
 */
@SpringBootApplication
@EnableAsync(proxyTargetClass = true)
@EnableTransactionManagement
@MapperScan(basePackages = "com.x.x.mapper")
public class XXApplication {

    /**
     * Starts the Spring application.
     *
     * @param cliArgs command-line arguments handed to Spring Boot unchanged
     */
    public static void main(String[] cliArgs) {
        SpringApplication.run(XXApplication.class, cliArgs);
    }
}
- 配置类配置线程池参数
/**
 * Declares the thread pool bean that {@code @Async("asyncServiceExecutor")} methods run on.
 */
@Slf4j
@Configuration
public class ExecutorConfig {

    /**
     * Builds the executor: 9 core threads, up to 20 threads, a queue of 200 tasks, and
     * caller-runs as the rejection policy.
     *
     * @return the initialized executor
     */
    @Bean
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Thread names start with this prefix.
        pool.setThreadNamePrefix("sxd-async-service-");

        // Sizing: core threads, maximum threads, queue capacity.
        pool.setCorePoolSize(9);
        pool.setMaxPoolSize(20);
        pool.setQueueCapacity(200);

        // Rejection policy for tasks the pool cannot accept: CallerRunsPolicy executes the
        // task on the submitting thread instead of a pool thread.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Initialize before handing the executor out.
        pool.initialize();
        return pool;
    }
}
- 在 MqttMessageListener 的实现方法 messageArrived 上加 @Async("asyncServiceExecutor") 注解, 开启异步调用
@Slf4j
@Component
public class MqttMessageListener implements IMqttMessageListener {
@Resource
private BusinessService businessService;
@Autowired
private MqttTopic mqttTopic;
@Autowired
private ThreeCallmachineService threeCallmachineService;
@Autowired
private BusinessHallService businessHallService;
@Autowired
private BusinessMaterialService businessMaterialService;
@Autowired
private BusinessWaitService businessWaitService;
@Autowired
private AppointmentService appointmentService;
@Override
@Async("asyncServiceExecutor")
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageStr = new String(message.getPayload());
log.info("接收的主题:" topic ";接收到的信息:" messageStr);
System.out.println("线程名称:【" Thread.currentThread().getName() "】");
//进行 业务处理
}
}