springboot中阿里云消息队列(alimq / RocketMQ)的配置和使用【进阶一】

2019-07-25 14:50:25 浏览数 (1)

1、yml配置

代码语言:javascript复制
 alimq:
    ProducerId: PRODUCER(mq中定义)
    ConsumerId: CONSUMER(mq中定义)
    AccessKey:  
    SecretKey: 
    ONSAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
    SendMsgTimeoutMillis: 3000
    topic: TOPIC
    # MQ switch: 0 = do not start consuming, 1 = start consuming
    mqflag: 1
    tag: ZC_xxx(mq中定义)

2、ali生产者和消费者配置

代码语言:javascript复制
package common.config;

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * Holder for the {@code alimq.*} settings.
 *
 * <p>Every field is injected from the property named in its {@code @Value}
 * placeholder. The two public builder methods turn those values into the
 * {@link Properties} objects used to create a consumer and a producer.
 *
 * @author liuyue
 */
@Configuration
@Data
public class AliMQConfig {
    @Value("${alimq.topic}")
    private String topic;
    @Value("${alimq.ProducerId}")
    private String producerId;
    @Value("${alimq.ConsumerId}")
    private String consumerId;
    @Value("${alimq.AccessKey}")
    private String accesskey;
    @Value("${alimq.SecretKey}")
    private String secretkey;
    @Value("${alimq.ONSAddr}")
    private String onsaddr;
    @Value("${alimq.tag}")
    private String subExpression;

    /**
     * Builds the consumer-side settings.
     *
     * @return a fresh {@link Properties} carrying the consumer id, both keys
     *         and the ONS address
     */
    public Properties getConsumerProperties() {
        Properties settings = sharedProperties();
        settings.setProperty(PropertyKeyConst.ConsumerId, consumerId);
        return settings;
    }

    /**
     * Builds the producer-side settings.
     *
     * @return a fresh {@link Properties} carrying the producer id, both keys
     *         and the ONS address
     */
    public Properties getProducerProperties() {
        Properties settings = sharedProperties();
        settings.setProperty(PropertyKeyConst.ProducerId, producerId);
        return settings;
    }

    /** Creates a new {@link Properties} with the entries both roles need. */
    private Properties sharedProperties() {
        Properties settings = new Properties();
        settings.setProperty(PropertyKeyConst.AccessKey, accesskey);
        settings.setProperty(PropertyKeyConst.SecretKey, secretkey);
        settings.setProperty(PropertyKeyConst.ONSAddr, onsaddr);
        return settings;
    }
}

3、消费者监听器

代码语言:javascript复制
package common.config;

import config.alimq.MQMsgConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * {@link CommandLineRunner} that switches alimq consumption on or off
 * according to the {@code alimq.mqflag} property.
 *
 * <p>A value of {@code "0"} leaves the consumer untouched; any other value
 * registers the subscription and then starts the consumer.
 *
 * @author liuyue
 */
@Component
@Slf4j
public class ListenerConfig implements CommandLineRunner {
    @Resource
    MQMsgConsumer mqConsumer;
    @Value("${alimq.mqflag}")
    private String mqflag;

    /**
     * Starts message consumption unless it is disabled by configuration.
     *
     * @param strings command-line arguments (not used)
     * @throws Exception if subscribing or starting the consumer fails
     */
    @Override
    public void run(String... strings) throws Exception {
        if ("0".equals(mqflag)) {
            log.info("alimq没有开启消费");
            return;
        }
        log.info("=======alimq开始消费=========");
        // Register the subscription BEFORE starting the consumer, so the
        // client never runs (and reports to the broker) without a
        // subscription; this is the order used by the ONS SDK samples.
        mqConsumer.onMessage();
        mqConsumer.start();
    }
}

4、消费者类

代码语言:javascript复制
package config.alimq;

import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import config.SpringContextHolder;
import config.AliMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@SuppressWarnings("all")
@Slf4j
@Component
public class MQMsgConsumer implements InitializingBean, DisposableBean {

    @Autowired
    AliMQConfig busMqConfig;
    private Consumer busConsumer;

    @Autowired
    IPanoramaProService panoramaProServiceImpl;

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("消费者初始化");
        busConsumer = ONSFactory.createConsumer(busMqConfig.getConsumerProperties());
        // busConsumer.start();
        log.info("消费者初始化完成");
    }

    public void start() {
        busConsumer.start();
    }

    public void onMessage() {
        busConsumer.subscribe(busMqConfig.getTopic(), busMqConfig.getSubExpression(), new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                // System.out.println(JSON.toJSONString(message));
                System.out.println("Receive: "   message);
                System.out.println(new String(message.getBody()));
                Action consumer = consumer(message, context);
                return consumer;
            }
        });
    }

    @Override
    public void destroy() throws Exception {
        busConsumer.shutdown();
        log.info("消费停止");
    }

    //执行mq消费
    public Action consumer(Message message, ConsumeContext context) {
        //更新审核时间 
        if("ZC_xxx".equals(message.getTag()))
        {
            boolean status = synchroProjectPlanStatus(message, context);
            if (!status) {
                return Action.CommitMessage;
            }
        }
        return Action.ReconsumeLater;
    }

    /**
     * 更新审核时间 
     * @author liu
     * @since 2018年11月2日 下午2:10:34
     * @param message
     * @param context
     * @return
     */
    private boolean synchroProjectPlanStatus( Message message, ConsumeContext context ){
        boolean bl = false;
        byte[] msgBody = message.getBody();
        if( null != msgBody && msgBody.length > 0 ){
            try {
                String msgBodyStr = new String(msgBody, "UTF-8");
                log.info(" THE MQ message body value: "   msgBodyStr);
                //JSONObject msgJson = JSONObject.parseObject(msgBodyStr);
                
                if( null != msgBodyStr ){
                    //转化为对象
                    ProjectPlanParas projectPlanParas =
                            JSON.parseObject(msgBodyStr, ProjectPlanParas.class);
                    log.info(" THE ProjectPlanParas   value: "   projectPlanParas.getZutuanCode());
                    log.info(" THE ProjectPlanParas   value: "   projectPlanParas.getFinishDate());
                    //执行更新的操作
                    bl = panoramaProServiceImpl.synchroProjectPlanStatus(projectPlanParas);
                    log.info(" THE MQ synchroProjectPlanStatus status : "   bl);
                }
            } catch (UnsupportedEncodingException e) {
                log.info(" THE MQ message UnsupportedEncodingException : "   e);
                e.printStackTrace();
            }
            
        }
        return bl;
    }
    
}

5、生产者类

代码语言:javascript复制
package config.alimq;

import com.aliyun.openservices.ons.api.*;
import com.config.AliMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
@Slf4j
public class MQMsgProducer implements InitializingBean, DisposableBean {

    @Autowired
    AliMQConfig busMqConfig;
    private Producer producer;

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("生产者初始化");
        producer = ONSFactory.createProducer(busMqConfig.getProducerProperties());
        producer.start();
    }

    public void sentMessage(Message message) {
        producer.sendAsync(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(sendResult.getTopic()   "-----"   sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                log.error(context.getTopic()   "-----"   context.getMessageId()   ":error="   context.getException());
            }
        });
    }

    @Override
    public void destroy() throws Exception {
        producer.shutdown();
    }
}

6、生产者调用类,推送消息,业务代码片段

代码语言:javascript复制
@Resource
    MQMsgProducer mqProducer;
    // Changed to use alimq to update the annual plan's time (edit by liuy, 2018-11-02).
    // Business-code fragment: aliMQConfig and json are defined outside this excerpt.
     Message msg = new Message(aliMQConfig.getTopic(),
                "ZC_xxx", json.getBytes("UTF-8"));
     // Asynchronous send; the result is only logged by the producer's callback.
     mqProducer.sentMessage(msg);

至此,全部过程结束。

0 人点赞