spring整合中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)

2021-03-23 14:47:31 浏览数 (1)

文代码:https://gitee.com/hong99/spring/issues/I1N1DF


注:由于本文比较长,建议还是下载源码学习。


中间件是什么?

中间件是介于应用系统和系统软件之间的一类软件,它使用系统软件所提供的基础服务(功能),衔接网络上应用系统的各个部分或不同的应用,能够达到资源共享、功能共享的目的。---百度百科

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。

参考地址:https://baike.baidu.com/item/中间件/452240?fr=aladdin

中间件解决了什么问题?

使高度耦合的系统解耦;

瞬时高峰的削峰处理;

保证数据安全性和最终一致;

原来系统间的交互是通过JMS或http协议调用,在效率、安全、可靠上面都不是很高,而中间件可以让原有的复杂系统解耦,分为消息端、生产方,队列,并且还可以让数据保存到中间件持久下来,也解决了数据丢失问题,数据也可以异步化,在一些平台大触或高峰时期的时候可以起到削峰作用,避免同步产生的一系列问题。

中间件有哪些?

RocketMQ

Rocket Mqj是阿里巴巴开源,是一个分布式消息和流数据平台,具有低 延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ是2012年阿里巴巴开源的第三代分布式消息中间件,2016年11月21日,阿里巴巴向Apache软件基金会捐赠了RocketMQ;第二年2月20日,Apache软件基金会宣布Apache RocketMQ成为顶级项目。

维基百科:https://zh.wikipedia.org/wiki/Apache_RocketMQ

官网:https://rocketmq.apache.org/

kafka

Kafka最初由领英开发开源,由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,[3]这使它作为企业级基础设施来处理流式数据非常有价值。此外,Kafka可以通过Kafka Connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams——一个Java流式处理库。---维基百科

官网:http://kafka.apache.org/

RabbitMQ

Rabbit科技有限公司开发了RabbitMQ,RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

官网:https://www.rabbitmq.com/

ActiveMQ

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

官网:http://activemq.apache.org/index.html

ZeroMQ

ZeroMQ(也称为ØMQ,0MQ或ZMQ)是一种高性能的异步消息传递库,旨在用于分布式或并发应用程序中。它提供了一个消息队列,但是与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。

官网:https://zeromq.org/

TubeMQ

apache TubeMQ是腾讯开源万亿级分布式消息中间件,专注于海量数据下的数据传输和存储,与许多开源MQ项目相比,TubeMQ在稳定性、性能和低成本方面具有独特的优势。

官网:https://tubemq.apache.org/zh-cn/

NSQ

NSQ 是无中心设计、节点自动注册和发现的开源消息系统。可作为内部通讯框架的基础,易于配置和发布。

官网:https://nsq.io/

....

消息中间件少说也有十几个多的话直的也太多了...不建议每个都深入了解,找一两个合适的就OK...

最后

代码实现

本文代码:https://gitee.com/hong99/spring/issues/I1N1DF

RocketMQ

基本概念

1 消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

2 消息生产者(Producer) 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

3 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

4 主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

5 代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

6 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过 名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

7 拉取式消费(Pull Consumer)

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

8 推动式消费(Push Consumer)

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

9 生产者组(Producer Group)

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

10 消费者组(Consumer Group)

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

11 集群消费(Clustering)

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

12 广播消费(Broadcasting)

广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

13 普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

14 严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

15 消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

16 标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

特性(features)

订阅与发布:消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。

消息顺序:消息有序指的是一类消息消费时,能按照发送的顺序来消费。

消息过滤:RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。

消息可靠性:RocketMQ支持消息的高可靠。

至少一次:至少一次(At least Once)指每个消息必须投递一次。

回溯消费:回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。

事务消息:RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。

定时消息:定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。

消息重试:Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。

消息重投:生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。

流量控制:生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

死信队列:死信队列用于处理无法被正常消费的消息。

架构设计

NameServer 是注册中心,类似于zk,这个是rocketmq自研的注册中心;

Broker是节点服务器,类似于主机

producer是生产者;

Consumer是消费者;

流程:Broker每次启动的时候都会向NameServer注册,而NameServer每隔30秒会自动检测Broker是否存活若Broker死亡则从NameServer中移除,

详细可以参考:https://github.com/apache/rocketmq/blob/master/docs/cn/architecture.md

基于windows 10安装rocketmq

环境要求:

64位 windows10;

64位JDK 1.8 ;

Maven 3.2.x;

Git;

足够的4G磁盘空间

下载reocketmq:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip

配置环境变量

代码语言:javascript复制
ROCKETMQ_HOME="你的目标"
NAMESRV_ADDR="localhost:9876"

运行

遇到问题

代码语言:javascript复制
Please set the ROCKETMQ_HOME variable in your environment!

解决方法

powershell配置环境变量

代码语言:javascript复制
$Env:ROCKETMQ_HOME="你的地址"
$Env:NAMESRV_ADDR="localhost:9876"

如下:

发现The Name Server boot success. 就证明配置是OK的。

启动broker

代码语言:javascript复制
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

注意:启动 mqbroker.cmd会报找不到主类,需要修改 mqbroker下的%CLASSPATH%添加双引号~

部署管理后台

下载代码:https://github.com/apache/rocketmq-externals

解压后进入:..rocketmq-externals-masterrocketmq-consolesrcmainresources 打开application.properties

rocketmq-externals-masterrocketmq-console 修改pom.xml 版本

进入rocketmq-externals-masterrocketmq-console根目录进行打包:

代码语言:javascript复制
mvn clean package -Dmaven.test.skip=true

进入生成的target目录进得运下命令如下:

代码语言:javascript复制
java -jar rocketmq-console-ng-2.0.0.jar

然后访问管理台 127.0.0.1:端口 ,这里我改的是8081所以就是127.0.0.1:8081

java实现rocketmq

搭好单节点的rocketmq和管理平台,接下来通过maven纯java进行学习测试。

相关参数:https://github.com/apache/rocketmq/blob/master/docs/cn/client/java/API_Reference_DefaultMQProducer.md

官方案例https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

目录结构

代码语言:javascript复制
│ pom.xml
│
└─src
    ├─main
    │ ├─java
    │ │ └─com
    │ │ └─hong
    │ │ └─rocketmq
    │ │ ├─consumer
    │ │ │ Consumer.java
    │ │ │ ScheduledMessageConsumer.java
    │ │ │
    │ │ └─producer
    │ │ AsyncProducer.java
    │ │ BroadcastProducer.java
    │ │ OnewayProducer.java
    │ │ ScheduledMessageProducer.java
    │ │ SyncProducer.java
    │ │
    │ └─resources
    └─test
        └─java

spring_mq/java_rocketmq/pom.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>java_rocketmq</artifactId>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
    </dependencies>

</project>

com.hong.rocketmq.producer.SyncProducer

代码语言:javascript复制
package com.hong.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @author: csh
 * @Date: 2021/3/12 09:50
 * @Description:同步发送消息
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i  ) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ 我是同步发送 "   i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

com.hong.rocketmq.consumer.Consumer

代码语言:javascript复制
package com.hong.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
/**
 *
 * 功能描述: 消费端
 *
 * @param: 
 * @return: 
 * @auther: csh
 * @date: 2021/3/12 11:02
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
         
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);


                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

可以把消息打开放到那里,然后再运行生产端,结果如下:

生产发送数据

接收端(consumer)

到你的本地ip:8081 中Message 查看如下:

写了几个java的这里不一一贴出来,建议下载代码学习。

遇到过的问题

代码语言:javascript复制
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 1.00 CQ: 1.00 INDEX: 1.00], messages are put to the slave, message store has been shut down, etc. BROKER: 10.3.6.59:10911

解决

将 c盘用户下面的store下面全部干掉!

代码语言:javascript复制
C:Users你的用户store

spring整合rocketmq

生产者 通过controller添加用户,然后发送给消费者进行添加。

生产者

项目结构

代码语言:javascript复制
│ pom.xml
│ spring_rocketmq_producer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ Producer.java
│ │ │ │ TopicAll.java
│ │ │ │
│ │ │ └─controller
│ │ │ │ UserController.java
│ │ │ │
│ │ │ └─ao
│ │ │ UserSaveAO.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ log4j2.xml
│ │ logging.properties
│ │ rocketmq.properties
│ │ rocketmq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml

com.hong.spring.config.Producer

代码语言:javascript复制
package com.hong.spring.config;

import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

/**
 * @author: csh
 * @Date: 2021/3/16 09:43
 * @Description:rocketmq生产者
 */
@Log4j2
public class Producer{
    //生产实例
    private DefaultMQProducer defaultMQProducer;
    //生产组
    private String producerGroup;
    //地址
    private String namesrvAddr;

    /**
     * 初始化
     */
    public void init() throws MQClientException {
        // 参数信息
        log.info("DefaultMQProducer 初始化");
        log.info(producerGroup);
        log.info(namesrvAddr);
        // 初始化
        defaultMQProducer = new DefaultMQProducer(producerGroup);
        defaultMQProducer.setNamesrvAddr(namesrvAddr);
        defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        defaultMQProducer.start();
        log.info("rocketmq start success!");
    }


    public void destroy() {
        defaultMQProducer.shutdown();
    }

    public DefaultMQProducer getDefaultMQProducer() {
        return defaultMQProducer;
    }

    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

}

com.hong.spring.config.TopicAll

代码语言:javascript复制
package com.hong.spring.config;

/**
 * @author: csh
 * @Date: 2021/3/16 14:15
 * @Description:存放所有的topic
 */
public class TopicAll {
    //用户topic
    public static final String HONG_TOPIC="user_topic";
}

com.hong.spring.controller.ao.UserSaveAO

代码语言:javascript复制
package com.hong.spring.config;

/**
 * @author: csh
 * @Date: 2021/3/16 14:15
 * @Description:存放所有的topic
 */
public class TopicAll {
    //用户topic
    public static final String HONG_TOPIC="user_topic";
}

com.hong.spring.controller.UserController

代码语言:javascript复制
package com.hong.spring.controller;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.config.Producer;
import com.hong.spring.config.TopicAll;
import com.hong.spring.controller.ao.UserSaveAO;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {

    @Autowired
    private Producer hongProducer;


    @RequestMapping("save")
    public DataResponse<Boolean> save(UserSaveAO ao){
        log.info("添加用户入参{}",JSONObject.toJSONString(ao));
        if(null==ao){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
       try {
           User user =new User();
           BeanUtils.copyProperties(ao,user);
           //在正式的时候 可以将通tags再正式区分不同的业务 比如:更新 、新增 等
           Message message = new Message(TopicAll.HONG_TOPIC,"saveTags", RemotingSerializable.encode(user));
           //发送同步消息
           log.info("最终发出去的消息{}", JSONObject.toJSONString(message));
           SendResult send = hongProducer.getDefaultMQProducer().send(message);
           if(null==send || send.getSendStatus()!= SendStatus.SEND_OK){
               return DataResponse.BuildFailResponse("添加用户失败!");
           }
           return DataResponse.BuildFailResponse("添加用户成功!");
       }catch (Exception e){
           log.error("添加出错{}",e);
           return DataResponse.BuildFailResponse("添加出错请重试!");
       }
    }
}

application.properties

代码语言:javascript复制
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

applicationContext.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- 配置组件扫描 -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!--加载配置文件-->
   <context:property-placeholder location="classpath:rocketmq.properties"/>

   <!-- 开启注解 -->
   <context:annotation-config />

   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- 前缀 -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- 后缀 -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>

   </bean>

   <!--开启mvc注解事务-->
   <!-- 定义注解驱动 -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- 设置支持中文 -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>
</beans>

log4j2.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

logging.properties

代码语言:javascript复制
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

rocketmq.properties

代码语言:javascript复制
rocketmq.producerGroup=hong_group
rocketmq.namesrvAddr=127.0.0.1:9876

rocketmq.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

   <bean id="hongProducer"  name="hongProducer" class="com.hong.spring.config.Producer" init-method="init" destroy-method="destroy">
      <property name="producerGroup" value="${rocketmq.producerGroup}" />
      <property name="namesrvAddr" value="${rocketmq.namesrvAddr}" />
   </bean>
</beans>

spring_mq/spring_rocketmq_producer/pom.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring_rocketmq_producer</artifactId>



    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>4.8.0</version>
            <type>pom</type>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.apache.logging.log4j</groupId>-->
            <!--<artifactId>log4j-core</artifactId>-->
            <!--<version>2.9.0</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
            <!--<groupId>org.apache.logging.log4j</groupId>-->
            <!--<artifactId>log4j-api</artifactId>-->
            <!--<version>2.5</version>-->
        <!--</dependency>-->
    </dependencies>

    <!--静态资源导出问题-->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>
</project>

web.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_rocketmq_producer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:rocketmq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_rocketmq_producer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>
代码语言:javascript复制
[2021-03-16 03:21:09,364] Artifact spring_rocketmq_producer:war exploded: Artifact is deployed successfully
[2021-03-16 03:21:09,364] Artifact spring_rocketmq_producer:war exploded: Deploy took 10,004 milliseconds
15:22:17.505 [http-nio-8080-exec-5] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1000,"username":"hong"}
15:22:17.518 [http-nio-8080-exec-5] INFO com.hong.spring.controller.UserController - 最终发出去的消息{"body":"eyJhZ2UiOjEwMDAsInVzZXJuYW1lIjoiaG9uZyJ9","delayTimeLevel":0,"flag":0,"properties":{"WAIT":"true","TAGS":"saveTags"},"tags":"saveTags","topic":"user_topic","waitStoreMsgOK":true}

消息状态

消息者

代码语言:javascript复制
│ pom.xml
│ spring_rocketmq_consumer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ Consumer.java
│ │ │ │
│ │ │ ├─dao
│ │ │ │ UserMapper.java
│ │ │ │
│ │ │ ├─listener
│ │ │ │ UserListener.java
│ │ │ │
│ │ │ ├─mapper
│ │ │ │ UserMapper.xml
│ │ │ │
│ │ │ └─provider
│ │ │ UserServiceImpl.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ jdbc.properties
│ │ log4j2.xml
│ │ logging.properties
│ │ mybatis.xml
│ │ rocketmq.properties
│ │ rocketmq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml

com.hong.spring.config.Consumer

代码语言:javascript复制
package com.hong.spring.config;

import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

/**
 * @author: csh
 * @Date: 2021/3/16 09:43
 * @Description:rocketmq生产者
 */
@Log4j2
public class Consumer {
    //生产实例
    private DefaultMQPushConsumer defaultMQConsumer;
    //生产组
    private String producerGroup;
    //地址
    private String namesrvAddr;
    //topic
    private String topic;
    //tag
    private String tag="*";
    //监听
    private MessageListener messageListener;

    /**
     * 初始化
     */
    public void init() throws MQClientException {
        // 参数信息
        log.info("DefaultMQProducer 初始化");
        log.info(producerGroup);
        log.info(namesrvAddr);
        // 初始化
        defaultMQConsumer = new DefaultMQPushConsumer(producerGroup);
        defaultMQConsumer.setNamesrvAddr(namesrvAddr);
        defaultMQConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        defaultMQConsumer.subscribe(topic,tag);
        // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
        // 如果非第一次启动,那么按照上次消费的位置继续消费
        defaultMQConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 设置为集群消费(区别于广播消费) CLUSTERING:集群消费 BROADCASTING:广播消费
        defaultMQConsumer.setMessageModel(MessageModel.CLUSTERING);
        defaultMQConsumer.setMessageListener(messageListener);
        defaultMQConsumer.start();
        log.info("rocketmqConsumer start success!");
    }


    public void destroy() {
        defaultMQConsumer.shutdown();
    }


    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public void setDefaultMQConsumer(DefaultMQPushConsumer defaultMQConsumer) {
        this.defaultMQConsumer = defaultMQConsumer;
    }

    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }
}

com.hong.spring.dao.UserMapper

代码语言:javascript复制
package com.hong.spring.dao;

import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description:用户dao层
 */

public interface UserMapper {

    /**
     *
     * 功能描述:查询总条数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:31
     */
    List<User> findAllUserList();
    /**
     *
     * 功能描述:获取总数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int findAllTotal();
    /**
     *
     * 功能描述:更新
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int update(User user);
    /**
     *
     * 功能描述:添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    int save(User user);
    /**
     *
     * 功能描述:批量添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 15:46
     */
    int insertBatch(@Param("list") List <User> list);
    /**
     *
     * 功能描述:通过id查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    User findById(int id);
    /**
     *
     * 功能描述:通过分页查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 16:05
     */
    List<User> findByPage(UserAO ao);
}

com.hong.spring.listener.UserListener

代码语言:javascript复制
package com.hong.spring.listener;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description:用户监听
 */
@Log4j2
public class UserListener implements MessageListenerConcurrently {

    @Autowired
    private IUserService userService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

            try {
                for (MessageExt msg : msgs) {
                User user = RemotingSerializable.decode(msg.getBody(), User.class);
                    log.info("获取的用户信息{}", JSONObject.toJSONString(user));
                    DataResponse <Boolean> save = userService.save(user);
                    if(save==null || save.getData()==null || !save.getData()){
                        log.info("添加失败,原因{}",JSONObject.toJSONString(save));
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }catch (Exception e){
                log.error("添加用户异常{}",e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
    }
}

com/hong/spring/mapper/UserMapper.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hong.spring.dao.UserMapper">
    <resultMap type="com.hong.spring.entity.User" id="user">
        <id column="id" property="id" />
        <result column="user_name" property="username" />
        <result column="age" property="age" />
    </resultMap>

    <select id="findById" resultType="com.hong.spring.entity.User">
      SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
    </select>

    <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
        select * from user where 1=1 limit #{page},#{pageSize}
    </select>

    <select id="findAllUserList" resultMap="user">
      SELECT * FROM user
    </select>

    <select id="findAllTotal" resultType="int">
      SELECT count(*) FROM user
    </select>

    <insert id="save" >
         INSERT INTO user ( user_name, age)
        VALUES (#{username,jdbcType=VARCHAR},
        #{age,jdbcType=INTEGER})
    </insert>

    <insert id="insertBatch">
        insert into user
        ( user_name, age)
        values
        <foreach collection="list" item="user" index="index"
                 separator=",">
            (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
        </foreach>
    </insert>

    <update id="update" >
        update user
        <set>
            <if test="username !=null">
                user_name=#{username,jdbcType=VARCHAR},
            </if>
            <if test="age !=null">
                age =#{age,jdbcType=INTEGER}
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
</mapper>

com.hong.spring.provider.UserServiceImpl

代码语言:javascript复制
package com.hong.spring.provider;

import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description:用户实现
 */
@Service("userService")
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList,allTotal);
    }
    @Override
    @Transactional
    public DataResponse <Boolean> save(User user) {
        if(null==user){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save>0?true:false);
    }

    @Override
    public DataResponse <Boolean> insertBatch(List <User> list) {
        if(null==list){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave>0?true:false);
    }

    @Override
    @Transactional
    public DataResponse <Boolean> update(User user) {
        if(null==user || user.getId()==null){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update>0?true:false);
    }
    @Override
    public DataResponse <User> findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    @Override
    public DataResponse <List <User>> findByPage(UserAO ao) {
        if(ao==null){
            ao.setPage(0);
            ao.setPageSize(10);
        }else{
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List <User> byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage,allTotal);
    }
}

application.properties

代码语言:javascript复制
logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR

applicationContext.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- 配置组件扫描 -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!--加载配置文件-->
   <context:property-placeholder location="classpath:jdbc.properties,classpath:rocketmq.properties"/>

   <!-- 开启注解 -->
   <context:annotation-config />
   <!--开启注解事务-->
   <tx:annotation-driven transaction-manager="transactionManager" />
   <!--放行静态资源-->
   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- 前缀 -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- 后缀 -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>

   </bean>

   <!--开启mvc注解事务-->
   <!-- 定义注解驱动 -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- 设置支持中文 -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>


   <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
      <!-- 基础配置 -->
      <property name="url" value="${jdbc.url}"></property>
      <property name="driverClassName" value="${jdbc.driver}"></property>
      <property name="username" value="${jdbc.user}"></property>
      <property name="password" value="${jdbc.password}"></property>

      <!-- 关键配置 -->
      <!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
      <property name="initialSize" value="3" />
      <!-- 最小连接池数量 -->
      <property name="minIdle" value="2" />
      <!-- 最大连接池数量 -->
      <property name="maxActive" value="15" />
      <!-- 配置获取连接等待超时的时间 -->
      <property name="maxWait" value="10000" />

      <!-- 性能配置 -->
      <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
      <property name="poolPreparedStatements" value="true" />
      <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />

      <!-- 其他配置 -->
      <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
      <property name="timeBetweenEvictionRunsMillis" value="60000" />
      <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
      <property name="minEvictableIdleTimeMillis" value="300000" />
      <!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
                  执行validationQuery检测连接是否有效。-->
      <property name="testWhileIdle" value="true" />
      <!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
      <property name="testOnBorrow" value="true" />
      <!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
      <property name="testOnReturn" value="false" />
   </bean>

   <!--事务管理器-->
   <!-- sqlSessionFactory -->
   <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
      <!-- 加载 MyBatis 的配置文件 -->
      <property name="configLocation" value="classpath:mybatis.xml"/>
      <!-- 数据源 -->
      <property name="dataSource" ref="dataSource"/>
      <!-- 所有配置的mapper文件 -->
      <property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
   </bean>

   <!-- Mapper 扫描器 -->
   <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
      <!-- 扫描 包下的组件 -->
      <property name="basePackage" value="com.hong.spring.dao" />
      <!-- 关联mapper扫描器 与 sqlsession管理器 -->
      <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
   </bean>
   <!--事务配置-->
   <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
      <property name="dataSource" ref="dataSource" />
   </bean>
</beans>

jdbc.properties

代码语言:javascript复制
config.properties:
#数据库驱动
jdbc.driver=com.mysql.jdbc.Driver
#数据库连接url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
#数据库用户名
jdbc.user=root
#数据库密码
jdbc.password=123456

log4j2.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>

logging.properties

代码语言:javascript复制
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

rocketmq.properties

代码语言:javascript复制
rocketmq.producerGroup=hong_group
rocketmq.namesrvAddr=127.0.0.1:9876
rocketmq.user_tocke=user_topic

mybatis.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>

    <!-- settings -->
    <settings>
        <!-- 打开延迟加载的开关 -->
        <setting name="lazyLoadingEnabled" value="true"/>
        <!-- 将积极加载改为消极加载(即按需加载) -->
        <setting name="aggressiveLazyLoading" value="false"/>
        <!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
        <setting name="cacheEnabled" value="true"/>
        <!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) -->
        <setting name="mapUnderscoreToCamelCase" value="true"/>
        <!-- 使用列别名代替列名 默认:true seslect name as title from table -->
        <setting name="useColumnLabel" value="true"/>
        <!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
        <setting name="useGeneratedKeys" value="true"/>
    </settings>

    <!-- 别名定义 -->
    <typeAliases>
        <package name="com.hong.spring.entity"/>
    </typeAliases>

</configuration>

rocketmq.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

   <bean id="userConsumer" name="userConsumer" class="com.hong.spring.config.Consumer" init-method="init" destroy-method="destroy">
      <property name="producerGroup" value="${rocketmq.producerGroup}" />
      <property name="namesrvAddr" value="${rocketmq.namesrvAddr}" />
      <property name="topic" value="${rocketmq.user_tocke}"/>
      <property name="messageListener" ref="userListener"></property>
   </bean>
   <bean id="userListener" class="com.hong.spring.listener.UserListener" />
</beans>

WEB-INF/web.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_rocketmq_consumer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:rocketmq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_rocketmq_consumer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>

spring_mq/spring_rocketmq_consumer/pom.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring_rocketmq_consumer</artifactId>
    <!--静态资源导出问题-->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>4.8.0</version>
            <type>pom</type>
        </dependency>

    </dependencies>

</project>

结果

代码语言:javascript复制
16:33:49.387 [NettyClientPublicExecutor_3] INFO com.hong.spring.listener.UserListener - 获取的用户信息{"age":1000,"username":"hong"}
16:33:49.845 [NettyClientPublicExecutor_3] INFO com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
16:33:49.897 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
16:33:49.913 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.932 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.transaction.SpringManagedTransaction - JDBC Connection [com.mysql.jdbc.JDBC4Connection@6b13936b] will be managed by Spring
16:33:49.937 [NettyClientPublicExecutor_3] DEBUG com.hong.spring.dao.UserMapper.save - ==> Preparing: INSERT INTO user ( user_name, age) VALUES (?, ?) 
16:33:49.973 [NettyClientPublicExecutor_3] DEBUG com.hong.spring.dao.UserMapper.save - ==> Parameters: hong(String), 1000(Integer)
16:33:49.980 [NettyClientPublicExecutor_3] DEBUG com.hong.spring.dao.UserMapper.save - <== Updates: 1
16:33:49.983 [NettyClientPublicExecutor_3] DEBUG com.alibaba.druid.pool.PreparedStatementPool - stmt enter cache
16:33:49.984 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.985 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.986 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.986 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:55.558 [ConsumeMessageThread_1] INFO com.hong.spring.listener.UserListener - 获取的用户信息{"age":1000,"username":"hong"}
16:33:55.560 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
16:33:55.560 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.560 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.transaction.SpringManagedTransaction - JDBC Connection [com.mysql.jdbc.JDBC4Connection@6b13936b] will be managed by Spring
16:33:55.560 [ConsumeMessageThread_1] DEBUG com.hong.spring.dao.UserMapper.save - ==> Preparing: INSERT INTO user ( user_name, age) VALUES (?, ?) 
16:33:55.561 [ConsumeMessageThread_1] DEBUG com.hong.spring.dao.UserMapper.save - ==> Parameters: hong(String), 1000(Integer)
16:33:55.562 [ConsumeMessageThread_1] DEBUG com.hong.spring.dao.UserMapper.save - <== Updates: 1
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]

springboot整合rocketmq

springboot_rocketmq_api 入口 端口:8186 dubbo端口:20880

springboot_rocketmq_consumer 消费者 端口:8188 dubbo端口:20882

springboot_rocketmq_producer 生产者 端口:8187 dubbo端口:20881

框架技术:springboot2.x dubbo zk rocketmq4.8

关于zk参考另一篇文章:spring整合各种服务注册中心(zk、eureka、nacos、consul)

实现结果:通过 用户请求api,api通过dubbo rpc协议调用rpoducer生产者,若调用是查询则直接查库,如果调用是添加或修改数据,则通过rocketmq发送给生产者consumer,consumer监听到对应的topic后再调用producer的rpc接口最终实现消息异步化。

springboot_rocketmq_api

结构

代码语言:javascript复制
│ pom.xml
│
└─src
    └─main
        ├─java
        │ └─com
        │ └─hong
        │ └─springboot
        │ │ Application.java
        │ │
        │ └─controller
        │ IndexController.java
        │ UserController.java
        │
        └─resources
                application.properties

com.hong.springboot.controller.IndexController

代码语言:javascript复制
package com.hong.springboot.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: csh
 * @Date: 2021/1/12 10:16
 * @Description:首页
 */
@RestController
public class IndexController {
    @RequestMapping("/")
    public String index(){
        return "成功!";
    }
}

com.hong.springboot.controller.UserController

代码语言:javascript复制
package com.hong.springboot.controller;


import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.annotation.Reference;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@Slf4j
public class UserController {
    @Reference
    private IUserService userService;

    @GetMapping("/findByAll")
    public DataResponse<List<User>> findByAll(){
        try {
            return userService.findByAll();
        } catch (Exception e){
            log.error("查询出错{}",e);
        }
        return DataResponse.BuildFailResponse("查询出错!");
    }

    @PostMapping("/save")
    public DataResponse<Boolean> save(User ao){
        if(null==ao || ao.getAge()==null || StringUtils.isBlank(ao.getUsername())){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        DataResponse <Boolean> save = userService.save(ao);
        return save;
    }
}

com.hong.springboot.Application

代码语言:javascript复制
package com.hong.springboot;


import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:springboot dubbo消费端
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

application.properties

代码语言:javascript复制
#dubbo configuration
#服务名称
dubbo.application.name=springboot_dubbo_consumer
#注册中心协议
dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20880
#协议名称
dubbo.protocol.name=dubbo
#扫包
dubbo.scan.basePackages=com.hong.springboot.controller

#避免端口冲突
server.port=8186

springboot_all/springboot_rocketmq_api/pom.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_rocketmq_api</artifactId>


    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

    <!--静态资源导出问题-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

springboot_rocketmq_producer

代码语言:javascript复制
│ pom.xml
│
└─src
    └─main
        ├─java
        │ └─com
        │ └─hong
        │ └─springboot
        │ │ Application.java
        │ │
        │ ├─config
        │ │ DruidConfig.java
        │ │ ExtRocketMQTemplate.java
        │ │ TopicAll.java
        │ │
        │ ├─dao
        │ │ UserMapper.java
        │ │
        │ └─provider
        │ UserServiceImpl.java
        │
        └─resources
                application.properties
                rocketmq.properties

com.hong.springboot.config.DruidConfig

代码语言:javascript复制
package com.hong.springboot.config;

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * @author: csh
 * @Date: 2021/1/8 18:08
 * @Description:数据源配置
 */
@Configuration
public class DruidConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource(){
        return new DruidDataSource();
    }
}

com.hong.springboot.config.ExtRocketMQTemplate

代码语言:javascript复制
package com.hong.springboot.config;

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration(nameServer = "${hong.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

com.hong.springboot.config.TopicAll

代码语言:javascript复制
package com.hong.springboot.config;

/**
 * @author: csh
 * @Date: 2021/3/16 14:15
 * @Description:存放所有的topic
 */
public class TopicAll {
    //用户topic
    public static final String USER_TOPIC ="springboot_user_topic";
}

com.hong.springboot.dao.UserMapper

代码语言:javascript复制
package com.hong.springboot.dao;

import com.hong.springboot.entity.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description:用户dao层
 */

public interface UserMapper {
    @Select("select id,user_name,age from user")
    List<User> findAllUser();

    @Insert("insert into user (user_name,age) values(#{userName},#{age})")
    int insert(User user);
}

com.hong.springboot.provider.UserServiceImpl

代码语言:javascript复制
package com.hong.springboot.provider;


import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.config.TopicAll;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description:用户实现
 */
@Service(interfaceClass = IUserService.class,timeout = 6000)
@Slf4j
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUser();
        return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
    }

    @Override
    public DataResponse <Boolean> save(User userAO) {
        log.info("需要rocketmq添加的用户信息{}",JSONObject.toJSONString(userAO));
        SendResult sendResult = rocketMQTemplate.syncSend(TopicAll.USER_TOPIC, JSONObject.toJSONString(userAO));
        if(null==sendResult || sendResult.getSendStatus()!= SendStatus.SEND_OK){
            return DataResponse.BuildFailResponse("添加用户失败!");
        }
        return DataResponse.BuildFailResponse("添加用户成功!");
    }

    @Override
    public DataResponse <Boolean> reallySave(User user) {
        log.info("要添加的用户信息{}",JSONObject.toJSONString(user));
        if(null==user || user.getAge()==null || StringUtils.isEmpty(user.getUsername())){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int insert = userDao.insert(user);
        return insert>0?DataResponse.BuildSuccessResponse(true):DataResponse.BuildFailResponse("失败",false);
    }
}

com.hong.springboot.Application

代码语言:javascript复制
package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@MapperScan("com.hong.springboot.dao")
@EnableDubbo
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

application.properties

代码语言:javascript复制
rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000

#dubbo configuration
#服务名称
dubbo.application.name=springboot_dubbo_provider

dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20881
#协议名称
dubbo.protocol.name=dubbo

#避免端口冲突
server.port=8187
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456


#mybatis配置
mybatis.typeAliasesPackage=com.hong.springboot.entity

rocketmq.properties

代码语言:javascript复制
# properties used in the application
rocketmq.topic=string-topic
hong.rocketmq.orderTopic=order-paid-topic
hong.rocketmq.msgExtTopic=message-ext-topic
hong.rocketmq.transTopic=spring-transaction-topic
hong.rocketmq.topic.user=user-topic

hong.rocketmq.bytesRequestTopic=bytesRequestTopic:tagA
hong.rocketmq.stringRequestTopic=stringRequestTopic:tagA
hong.rocketmq.objectRequestTopic=objectRequestTopic:tagA
hong.rocketmq.genericRequestTopic=genericRequestTopic:tagA

hong.rocketmq.extNameServer=127.0.0.1:9876

springboot_all/springboot_rocketmq_producer/pom.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.hong.springboot</groupId>
        <artifactId>springboot_all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.springboot</groupId>
    <artifactId>springboot_rocketmq_producer</artifactId>


    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>


        <dependency>
            <groupId>com.hong.springboot</groupId>
            <artifactId>springboot_mq_api</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>
        <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.4-beta</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!--<build>-->
        <!--<plugins>-->
            <!--<plugin>-->
                <!--<groupId>org.springframework.boot</groupId>-->
                <!--<artifactId>spring-boot-maven-plugin</artifactId>-->
                <!--<configuration>-->
                    <!--<skip>true</skip>-->
                <!--</configuration>-->
            <!--</plugin>-->
        <!--</plugins>-->
    <!--</build>-->
    <!--静态资源导出问题-->
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

结果

springboot_rocketmq_consumer

代码语言:javascript复制
│ pom.xml
│
└─src
    └─main
        ├─java
        │ └─com
        │ └─hong
        │ └─springboot
        │ │ Application.java
        │ │
        │ ├─config
        │ │ ExtRocketMQTemplate.java
        │ │
        │ └─listener
        │ UserListener.java
        │ UserListenerList.java
        │ UserTransactionListener.java
        │
        └─resources
                application.properties
                rocketmq.properties

rocketmq.properties

代码语言:javascript复制
# properties used in the application
rocketmq.topic=string-topic
hong.rocketmq.orderTopic=order-paid-topic
hong.rocketmq.msgExtTopic=message-ext-topic
hong.rocketmq.transTopic=spring-transaction-topic
hong.rocketmq.topic.user=user-topic

hong.rocketmq.bytesRequestTopic=bytesRequestTopic:tagA
hong.rocketmq.stringRequestTopic=stringRequestTopic:tagA
hong.rocketmq.objectRequestTopic=objectRequestTopic:tagA
hong.rocketmq.genericRequestTopic=genericRequestTopic:tagA

hong.rocketmq.extNameServer=127.0.0.1:9876

application.properties

代码语言:javascript复制
rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000

#避免端口冲突
server.port=8188

#dubbo configuration
#服务名称
dubbo.application.name=springboot_rocketmq_consumer

dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20882
#协议名称
dubbo.protocol.name=dubbo
#扫包
dubbo.scan.basePackages=com.hong.springboot.listener
user_topic=springboot_user_topic
user_group_consumer=user_group_consumer

com.hong.springboot.Application

代码语言:javascript复制
package com.hong.springboot;

import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: csh
 * @Date: 2020/11/21 11:37
 * @Description:
 */
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application  {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

}

com.hong.springboot.listener.UserListener

代码语言:javascript复制
package com.hong.springboot.listener;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description:用户监听
 */
@Service
@RocketMQMessageListener(consumerGroup ="${user_group_consumer}" , topic = "${user_topic}")
@Log4j2
public class UserListener implements RocketMQListener<User> {

    @Reference
    private IUserService userService;

    @Override
    public void onMessage(User user) {

        log.info("springboot获取的用户信息{}", JSONObject.toJSONString(user));
        DataResponse<Boolean> save = userService.reallySave(user);
        log.info("添加结果{}",JSONObject.toJSONString(save));
        if(save==null || !save.getData()){
            log.info("添加失败,原因{}",JSONObject.toJSONString(save));
        }
    }

}

com.hong.springboot.config.ExtRocketMQTemplate

代码语言:javascript复制
package com.hong.springboot.config;

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration(nameServer = "${hong.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

启动消费者 (开始消费)

以上就基本完成了rocketmq4.x dubbo zk springboot2.x 的整合让耦合业务解耦,在项目中rcp起到跨服务远程调用,而mq起到削峰使消息异步化。

rocketmq幂等性问题

为了防止消息重复消费导致业务处理异常,消息队列RocketMQ版的消费者在接收到消息后,有必要根据业务上的唯一Key对消息做幂等处理。

通过数据发现,rocketmq有一个无法避免的问题,那就是消息重发,这是因为rocketmq在设计之初就这样设计,导致,这个问题无法避免,所以消息重发或重堆就会导致多次消息,可能会造成不一致幂等情况,那解决问题就是去重。

解决方案:

1.针对分布式集群场景可以通过redis缓存来存放多节点的消息id状态,使重复消费的时候判断是否消费过,若是则直接返回。

2.针对单机场景可以通过google guava的cache进行拦截判断,若消息一致则同redis缓存一样,默认过期时间3天,因为rocketmq存放时间为3天建议于配置一样。

由于本次采用的是第2 种采用的解决方案如下:

代码语言:javascript复制
package com.hong.springboot.listener;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description:用户监听
 */
@Service
@RocketMQMessageListener(consumerGroup ="${user_group_consumer}" , topic = "${user_topic}")
@Log4j2
public class UserListener implements RocketMQListener<MessageExt> {

    @Reference
    private IUserService userService;

    //存放缓存三天 分布式集群场景可以
    private static Cache<String,ConsumeConcurrentlyStatus> cache = CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.DAYS).build();

    @Override
    public void onMessage(MessageExt msg) {
        if(null==msg || msg.getBody()==null || StringUtils.isEmpty(msg.getMsgId())){
            return;
        }
        ConsumeConcurrentlyStatus status = cache.getIfPresent(msg.getMsgId());
        //这两种消息才进行消费
        if(null==status || status.equals(ConsumeConcurrentlyStatus.RECONSUME_LATER)){
            String msgId = msg.getMsgId();
            log.info("msgId" msgId);
            User user = RemotingSerializable.decode(msg.getBody(), User.class);
            log.info("springboot获取的用户信息{}", JSONObject.toJSONString(user));
            DataResponse<Boolean> save = userService.reallySave(user);
            log.info("添加结果{}",JSONObject.toJSONString(save));
            if(save==null || !save.getData()){
                log.info("添加失败,原因{}",JSONObject.toJSONString(save));
                cache.put(msgId, ConsumeConcurrentlyStatus.RECONSUME_LATER);
            }
            cache.put(msgId, ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
        }else{
            log.info("该消息{}已消费过{}",JSONObject.toJSONString(msg),JSONObject.toJSONString(status));
        }

    }

}

然后重新验证如下:新生产一条消息,若添加成功默认进入cache中有效期为3天,然后在管理后台重推送该消息。RESEND MESSAGE

相关学习资料

《RocketMQ技术内幕》

推荐学习资料 官网:https://rocketmq.apache.org/

最后

通过rocketmq dubbo再搭配集群方式,在实际企业中用得是相当多的,而且这种架构模式在很多生产中得到特别多的验证,可以说屡试不爽。可以说在高性能、高可用、高并发,在做后端这块是一块不错的架构选择,并且dubbo和rocketmq都可以动态水平增缩,性能上就更不用说了,都是10万起步的。普通百万级的应用,一个3节点标配的集群基本搞定了。

参考文章:

https://www.cnblogs.com/lifeibai/p/9167701.html

https://www.cnblogs.com/weifeng1463/p/12889300.html

https://cloud.tencent.com/developer/article/1630183

https://www.cnblogs.com/chx9832/p/12325871.html

0 人点赞