1、yml配置
代码语言:javascript复制 alimq:
ProducerId: PRODUCER(mq中定义)
ConsumerId: CONSUMER(mq中定义)
AccessKey:
SecretKey:
ONSAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
SendMsgTimeoutMillis: 3000
topic: TOPIC
#mq开关 0-不启动消费 1-启动消费
mqflag: 1
tag: ZC_xxx(mq中定义)
2、ali生产者和消费者配置
代码语言:javascript复制package common.config;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
* @Description:
* @Auther: liuyue
* @Date:
*/
@Configuration
@Data
public class AliMQConfig {
@Value("${alimq.topic}")
private String topic;
@Value("${alimq.ProducerId}")
private String producerId;
@Value("${alimq.ConsumerId}")
private String consumerId;
@Value("${alimq.AccessKey}")
private String accesskey;
@Value("${alimq.SecretKey}")
private String secretkey;
@Value("${alimq.ONSAddr}")
private String onsaddr;
@Value("${alimq.tag}")
private String subExpression;
//提供消费者的配置
public Properties getConsumerProperties() {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerId);
consumerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey);
consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey);
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr);
return consumerProperties;
}
//提供生产者的配置
public Properties getProducerProperties() {
Properties producerProperties = new Properties();
producerProperties.setProperty(PropertyKeyConst.ProducerId, producerId);
producerProperties.setProperty(PropertyKeyConst.AccessKey, accesskey);
producerProperties.setProperty(PropertyKeyConst.SecretKey, secretkey);
producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsaddr);
return producerProperties;
}
}
3、消费者监听器
代码语言:javascript复制package common.config;
import config.alimq.MQMsgConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Description:
* @Auther: liuyue
* @Date: 2018/11/14 19:31
*/
@Component
@Slf4j
public class ListenerConfig implements CommandLineRunner {
@Resource
MQMsgConsumer mqConsumer;
@Value("${alimq.mqflag}")
private String mqflag;
@Override
public void run(String... strings) throws Exception {
if("0".equals(mqflag)){
log.info("alimq没有开启消费");
}else{
log.info("=======alimq开始消费=========");
mqConsumer.start();
mqConsumer.onMessage();
}
}
}
4、消费者类
代码语言:javascript复制package config.alimq;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import config.SpringContextHolder;
import config.AliMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
@SuppressWarnings("all")
@Slf4j
@Component
public class MQMsgConsumer implements InitializingBean, DisposableBean {
@Autowired
AliMQConfig busMqConfig;
private Consumer busConsumer;
@Autowired
IPanoramaProService panoramaProServiceImpl;
@Override
public void afterPropertiesSet() throws Exception {
log.info("消费者初始化");
busConsumer = ONSFactory.createConsumer(busMqConfig.getConsumerProperties());
// busConsumer.start();
log.info("消费者初始化完成");
}
public void start() {
busConsumer.start();
}
public void onMessage() {
busConsumer.subscribe(busMqConfig.getTopic(), busMqConfig.getSubExpression(), new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
// System.out.println(JSON.toJSONString(message));
System.out.println("Receive: " message);
System.out.println(new String(message.getBody()));
Action consumer = consumer(message, context);
return consumer;
}
});
}
@Override
public void destroy() throws Exception {
busConsumer.shutdown();
log.info("消费停止");
}
//执行mq消费
public Action consumer(Message message, ConsumeContext context) {
//更新审核时间
if("ZC_xxx".equals(message.getTag()))
{
boolean status = synchroProjectPlanStatus(message, context);
if (!status) {
return Action.CommitMessage;
}
}
return Action.ReconsumeLater;
}
/**
* 更新审核时间
* @author liu
* @since 2018年11月2日 下午2:10:34
* @param message
* @param context
* @return
*/
private boolean synchroProjectPlanStatus( Message message, ConsumeContext context ){
boolean bl = false;
byte[] msgBody = message.getBody();
if( null != msgBody && msgBody.length > 0 ){
try {
String msgBodyStr = new String(msgBody, "UTF-8");
log.info(" THE MQ message body value: " msgBodyStr);
//JSONObject msgJson = JSONObject.parseObject(msgBodyStr);
if( null != msgBodyStr ){
//转化为对象
ProjectPlanParas projectPlanParas =
JSON.parseObject(msgBodyStr, ProjectPlanParas.class);
log.info(" THE ProjectPlanParas value: " projectPlanParas.getZutuanCode());
log.info(" THE ProjectPlanParas value: " projectPlanParas.getFinishDate());
//执行更新的操作
bl = panoramaProServiceImpl.synchroProjectPlanStatus(projectPlanParas);
log.info(" THE MQ synchroProjectPlanStatus status : " bl);
}
} catch (UnsupportedEncodingException e) {
log.info(" THE MQ message UnsupportedEncodingException : " e);
e.printStackTrace();
}
}
return bl;
}
}
5、生产者类
代码语言:javascript复制package config.alimq;
import com.aliyun.openservices.ons.api.*;
import com.config.AliMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.UUID;
@Component
@Slf4j
public class MQMsgProducer implements InitializingBean, DisposableBean {
@Autowired
AliMQConfig busMqConfig;
private Producer producer;
@Override
public void afterPropertiesSet() throws Exception {
log.info("生产者初始化");
producer = ONSFactory.createProducer(busMqConfig.getProducerProperties());
producer.start();
}
public void sentMessage(Message message) {
producer.sendAsync(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(sendResult.getTopic() "-----" sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
log.error(context.getTopic() "-----" context.getMessageId() ":error=" context.getException());
}
});
}
@Override
public void destroy() throws Exception {
producer.shutdown();
}
}
6、生产者调用类,推送消息,业务代码片段
代码语言:javascript复制@Resource
MQMsgProducer mqProducer;
//修改成,使用alimq更新年景计划的时间 edit by liuy at 20181102日
Message msg = new Message(aliMQConfig.getTopic(),
"ZC_xxx", json.getBytes("UTF-8"));
mqProducer.sentMessage(msg);
至此,全部过程结束。