文代码:https://gitee.com/hong99/spring/issues/I1N1DF
注:由于本文比较长,建议还是下载源码学习。
中间件是什么?
中间件是介于应用系统和系统软件之间的一类软件,它使用系统软件所提供的基础服务(功能),衔接网络上应用系统的各个部分或不同的应用,能够达到资源共享、功能共享的目的。---百度百科
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。
参考地址:https://baike.baidu.com/item/中间件/452240?fr=aladdin
中间件解决了什么问题?
使高度耦合的系统解耦;
瞬时高峰的削峰处理;
保证数据安全性和最终一致;
原来系统间的交互是通过JMS或http协议调用,在效率、安全、可靠上面都不是很高,而中间件可以让原有的复杂系统解耦,分为消息端、生产方,队列,并且还可以让数据保存到中间件持久下来,也解决了数据丢失问题,数据也可以异步化,在一些平台大触或高峰时期的时候可以起到削峰作用,避免同步产生的一系列问题。
中间件有哪些?
RocketMQ
Rocket Mqj是阿里巴巴开源,是一个分布式消息和流数据平台,具有低 延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ是2012年阿里巴巴开源的第三代分布式消息中间件,2016年11月21日,阿里巴巴向Apache软件基金会捐赠了RocketMQ;第二年2月20日,Apache软件基金会宣布Apache RocketMQ成为顶级项目。
维基百科:https://zh.wikipedia.org/wiki/Apache_RocketMQ
官网:https://rocketmq.apache.org/
kafka
Kafka最初由领英开发开源,由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,[3]这使它作为企业级基础设施来处理流式数据非常有价值。此外,Kafka可以通过Kafka Connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams——一个Java流式处理库。---维基百科
官网:http://kafka.apache.org/
RabbitMQ
Rabbit科技有限公司开发了RabbitMQ,RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
官网:https://www.rabbitmq.com/
ActiveMQ
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。
官网:http://activemq.apache.org/index.html
ZeroMQ
ZeroMQ(也称为ØMQ,0MQ或ZMQ)是一种高性能的异步消息传递库,旨在用于分布式或并发应用程序中。它提供了一个消息队列,但是与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。
官网:https://zeromq.org/
TubeMQ
apache TubeMQ是腾讯开源万亿级分布式消息中间件,专注于海量数据下的数据传输和存储,与许多开源MQ项目相比,TubeMQ在稳定性、性能和低成本方面具有独特的优势。
官网:https://tubemq.apache.org/zh-cn/
NSQ
NSQ 是无中心设计、节点自动注册和发现的开源消息系统。可作为内部通讯框架的基础,易于配置和发布。
官网:https://nsq.io/
....
消息中间件少说也有十几个多的话直的也太多了...不建议每个都深入了解,找一两个合适的就OK...
最后
代码实现
本文代码:https://gitee.com/hong99/spring/issues/I1N1DF
RocketMQ
基本概念
1 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
2 消息生产者(Producer) 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
3 消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
4 主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
5 代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
6 名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过 名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
7 拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
8 推动式消费(Push Consumer)
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
9 生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
10 消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
11 集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
12 广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
13 普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
14 严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
15 消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
16 标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
特性(features)
订阅与发布:消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
消息顺序:消息有序指的是一类消息消费时,能按照发送的顺序来消费。
消息过滤:RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。
消息可靠性:RocketMQ支持消息的高可靠。
至少一次:至少一次(At least Once)指每个消息必须投递一次。
回溯消费:回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。
事务消息:RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
定时消息:定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
消息重试:Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。
消息重投:生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。
流量控制:生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
死信队列:死信队列用于处理无法被正常消费的消息。
架构设计
NameServer 是注册中心,类似于zk,这个是rocketmq自研的注册中心;
Broker是节点服务器,类似于主机
producer是生产者;
Consumer是消费者;
流程:Broker每次启动的时候都会向NameServer注册,而NameServer每隔30秒会自动检测Broker是否存活若Broker死亡则从NameServer中移除,
详细可以参考:https://github.com/apache/rocketmq/blob/master/docs/cn/architecture.md
基于windows 10安装rocketmq
环境要求:
64位 windows10;
64位JDK 1.8 ;
Maven 3.2.x;
Git;
足够的4G磁盘空间
下载reocketmq:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
配置环境变量
代码语言:javascript复制ROCKETMQ_HOME="你的目标"
NAMESRV_ADDR="localhost:9876"
运行
遇到问题
代码语言:javascript复制Please set the ROCKETMQ_HOME variable in your environment!
解决方法
powershell配置环境变量
代码语言:javascript复制$Env:ROCKETMQ_HOME="你的地址"
$Env:NAMESRV_ADDR="localhost:9876"
如下:
发现The Name Server boot success. 就证明配置是OK的。
启动broker
代码语言:javascript复制start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
注意:启动 mqbroker.cmd会报找不到主类,需要修改 mqbroker下的%CLASSPATH%添加双引号~
部署管理后台
下载代码:https://github.com/apache/rocketmq-externals
解压后进入:..rocketmq-externals-masterrocketmq-consolesrcmainresources 打开application.properties
rocketmq-externals-masterrocketmq-console 修改pom.xml 版本
进入rocketmq-externals-masterrocketmq-console根目录进行打包:
代码语言:javascript复制mvn clean package -Dmaven.test.skip=true
进入生成的target目录进得运下命令如下:
代码语言:javascript复制java -jar rocketmq-console-ng-2.0.0.jar
然后访问管理台 127.0.0.1:端口 ,这里我改的是8081所以就是127.0.0.1:8081
java实现rocketmq
搭好单节点的rocketmq和管理平台,接下来通过maven纯java进行学习测试。
相关参数:https://github.com/apache/rocketmq/blob/master/docs/cn/client/java/API_Reference_DefaultMQProducer.md
官方案例:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
目录结构
代码语言:javascript复制│ pom.xml
│
└─src
├─main
│ ├─java
│ │ └─com
│ │ └─hong
│ │ └─rocketmq
│ │ ├─consumer
│ │ │ Consumer.java
│ │ │ ScheduledMessageConsumer.java
│ │ │
│ │ └─producer
│ │ AsyncProducer.java
│ │ BroadcastProducer.java
│ │ OnewayProducer.java
│ │ ScheduledMessageProducer.java
│ │ SyncProducer.java
│ │
│ └─resources
└─test
└─java
spring_mq/java_rocketmq/pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_mq</artifactId>
<groupId>com.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>java_rocketmq</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>6</source>
<target>6</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>
</project>
com.hong.rocketmq.producer.SyncProducer
代码语言:javascript复制package com.hong.rocketmq.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* @author: csh
* @Date: 2021/3/12 09:50
* @Description:同步发送消息
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i ) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ 我是同步发送 " i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
com.hong.rocketmq.consumer.Consumer
代码语言:javascript复制package com.hong.rocketmq.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
*
* 功能描述: 消费端
*
* @param:
* @return:
* @auther: csh
* @date: 2021/3/12 11:02
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
可以把消息打开放到那里,然后再运行生产端,结果如下:
生产发送数据
接收端(consumer)
到你的本地ip:8081 中Message 查看如下:
写了几个java的这里不一一贴出来,建议下载代码学习。
遇到过的问题
代码语言:javascript复制org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 1.00 CQ: 1.00 INDEX: 1.00], messages are put to the slave, message store has been shut down, etc. BROKER: 10.3.6.59:10911
解决
将 c盘用户下面的store下面全部干掉!
代码语言:javascript复制C:Users你的用户store
spring整合rocketmq
生产者 通过controller添加用户,然后发送给消费者进行添加。
生产者
项目结构
代码语言:javascript复制│ pom.xml
│ spring_rocketmq_producer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ Producer.java
│ │ │ │ TopicAll.java
│ │ │ │
│ │ │ └─controller
│ │ │ │ UserController.java
│ │ │ │
│ │ │ └─ao
│ │ │ UserSaveAO.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ log4j2.xml
│ │ logging.properties
│ │ rocketmq.properties
│ │ rocketmq.xml
│ │
│ └─test
│ └─java
└─web
└─WEB-INF
web.xml
com.hong.spring.config.Producer
代码语言:javascript复制package com.hong.spring.config;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
/**
* @author: csh
* @Date: 2021/3/16 09:43
* @Description:rocketmq生产者
*/
@Log4j2
public class Producer{
//生产实例
private DefaultMQProducer defaultMQProducer;
//生产组
private String producerGroup;
//地址
private String namesrvAddr;
/**
* 初始化
*/
public void init() throws MQClientException {
// 参数信息
log.info("DefaultMQProducer 初始化");
log.info(producerGroup);
log.info(namesrvAddr);
// 初始化
defaultMQProducer = new DefaultMQProducer(producerGroup);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
defaultMQProducer.start();
log.info("rocketmq start success!");
}
public void destroy() {
defaultMQProducer.shutdown();
}
public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer;
}
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
}
com.hong.spring.config.TopicAll
代码语言:javascript复制package com.hong.spring.config;
/**
* @author: csh
* @Date: 2021/3/16 14:15
* @Description:存放所有的topic
*/
public class TopicAll {
//用户topic
public static final String HONG_TOPIC="user_topic";
}
com.hong.spring.controller.ao.UserSaveAO
代码语言:javascript复制package com.hong.spring.config;
/**
* @author: csh
* @Date: 2021/3/16 14:15
* @Description:存放所有的topic
*/
public class TopicAll {
//用户topic
public static final String HONG_TOPIC="user_topic";
}
com.hong.spring.controller.UserController
代码语言:javascript复制package com.hong.spring.controller;
import com.alibaba.fastjson.JSONObject;
import com.hong.spring.config.Producer;
import com.hong.spring.config.TopicAll;
import com.hong.spring.controller.ao.UserSaveAO;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Auther: csh
* @Date: 2020/8/18 16:11
* @Description:
*/
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {
@Autowired
private Producer hongProducer;
@RequestMapping("save")
public DataResponse<Boolean> save(UserSaveAO ao){
log.info("添加用户入参{}",JSONObject.toJSONString(ao));
if(null==ao){
return DataResponse.BuildFailResponse("参数不能为空!");
}
try {
User user =new User();
BeanUtils.copyProperties(ao,user);
//在正式的时候 可以将通tags再正式区分不同的业务 比如:更新 、新增 等
Message message = new Message(TopicAll.HONG_TOPIC,"saveTags", RemotingSerializable.encode(user));
//发送同步消息
log.info("最终发出去的消息{}", JSONObject.toJSONString(message));
SendResult send = hongProducer.getDefaultMQProducer().send(message);
if(null==send || send.getSendStatus()!= SendStatus.SEND_OK){
return DataResponse.BuildFailResponse("添加用户失败!");
}
return DataResponse.BuildFailResponse("添加用户成功!");
}catch (Exception e){
log.error("添加出错{}",e);
return DataResponse.BuildFailResponse("添加出错请重试!");
}
}
}
application.properties
代码语言:javascript复制logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
applicationContext.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<!-- 配置组件扫描 -->
<context:component-scan base-package="com.hong.spring"></context:component-scan>
<!--加载配置文件-->
<context:property-placeholder location="classpath:rocketmq.properties"/>
<!-- 开启注解 -->
<context:annotation-config />
<mvc:default-servlet-handler />
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
id="internalResourceViewResolver">
<!-- 前缀 -->
<property name="prefix" value="/WEB-INF/pages/" />
<!-- 后缀 -->
<property name="suffix" value=".html" />
<property name="contentType" value="text/html"/>
</bean>
<!--开启mvc注解事务-->
<!-- 定义注解驱动 -->
<mvc:annotation-driven>
<mvc:message-converters>
<!-- 设置支持中文 -->
<bean class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes">
<list>
<value>text/plain;charset=UTF-8</value>
<value>text/html;charset=UTF-8</value>
</list>
</property>
</bean>
<bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
</mvc:message-converters>
</mvc:annotation-driven>
</beans>
log4j2.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/app.log"
filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
<PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
<SizeBasedTriggeringPolicy size="5 MB"/>
</RollingFile>
</appenders>
<loggers>
<root level="DEBUG">
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</loggers>
</configuration>
logging.properties
代码语言:javascript复制org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################
org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.
java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
rocketmq.properties
代码语言:javascript复制rocketmq.producerGroup=hong_group
rocketmq.namesrvAddr=127.0.0.1:9876
rocketmq.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<bean id="hongProducer" name="hongProducer" class="com.hong.spring.config.Producer" init-method="init" destroy-method="destroy">
<property name="producerGroup" value="${rocketmq.producerGroup}" />
<property name="namesrvAddr" value="${rocketmq.namesrvAddr}" />
</bean>
</beans>
spring_mq/spring_rocketmq_producer/pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_mq</artifactId>
<groupId>com.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring_rocketmq_producer</artifactId>
<dependencies>
<dependency>
<artifactId>spring_mq_common_api</artifactId>
<version>1.0-SNAPSHOT</version>
<groupId>com.hong</groupId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.0</version>
<type>pom</type>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.logging.log4j</groupId>-->
<!--<artifactId>log4j-core</artifactId>-->
<!--<version>2.9.0</version>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>org.apache.logging.log4j</groupId>-->
<!--<artifactId>log4j-api</artifactId>-->
<!--<version>2.5</version>-->
<!--</dependency>-->
</dependencies>
<!--静态资源导出问题-->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
web.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
<servlet>
<servlet-name>spring_rocketmq_producer</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml,
classpath:rocketmq.xml
</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<servlet-mapping>
<servlet-name>spring_rocketmq_producer</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
</web-app>
代码语言:javascript复制[2021-03-16 03:21:09,364] Artifact spring_rocketmq_producer:war exploded: Artifact is deployed successfully
[2021-03-16 03:21:09,364] Artifact spring_rocketmq_producer:war exploded: Deploy took 10,004 milliseconds
15:22:17.505 [http-nio-8080-exec-5] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1000,"username":"hong"}
15:22:17.518 [http-nio-8080-exec-5] INFO com.hong.spring.controller.UserController - 最终发出去的消息{"body":"eyJhZ2UiOjEwMDAsInVzZXJuYW1lIjoiaG9uZyJ9","delayTimeLevel":0,"flag":0,"properties":{"WAIT":"true","TAGS":"saveTags"},"tags":"saveTags","topic":"user_topic","waitStoreMsgOK":true}
消息状态
消息者
代码语言:javascript复制│ pom.xml
│ spring_rocketmq_consumer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ Consumer.java
│ │ │ │
│ │ │ ├─dao
│ │ │ │ UserMapper.java
│ │ │ │
│ │ │ ├─listener
│ │ │ │ UserListener.java
│ │ │ │
│ │ │ ├─mapper
│ │ │ │ UserMapper.xml
│ │ │ │
│ │ │ └─provider
│ │ │ UserServiceImpl.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ jdbc.properties
│ │ log4j2.xml
│ │ logging.properties
│ │ mybatis.xml
│ │ rocketmq.properties
│ │ rocketmq.xml
│ │
│ └─test
│ └─java
└─web
└─WEB-INF
web.xml
com.hong.spring.config.Consumer
代码语言:javascript复制package com.hong.spring.config;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* @author: csh
* @Date: 2021/3/16 09:43
* @Description:rocketmq生产者
*/
@Log4j2
public class Consumer {
//生产实例
private DefaultMQPushConsumer defaultMQConsumer;
//生产组
private String producerGroup;
//地址
private String namesrvAddr;
//topic
private String topic;
//tag
private String tag="*";
//监听
private MessageListener messageListener;
/**
* 初始化
*/
public void init() throws MQClientException {
// 参数信息
log.info("DefaultMQProducer 初始化");
log.info(producerGroup);
log.info(namesrvAddr);
// 初始化
defaultMQConsumer = new DefaultMQPushConsumer(producerGroup);
defaultMQConsumer.setNamesrvAddr(namesrvAddr);
defaultMQConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));
defaultMQConsumer.subscribe(topic,tag);
// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
// 如果非第一次启动,那么按照上次消费的位置继续消费
defaultMQConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置为集群消费(区别于广播消费) CLUSTERING:集群消费 BROADCASTING:广播消费
defaultMQConsumer.setMessageModel(MessageModel.CLUSTERING);
defaultMQConsumer.setMessageListener(messageListener);
defaultMQConsumer.start();
log.info("rocketmqConsumer start success!");
}
public void destroy() {
defaultMQConsumer.shutdown();
}
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public void setDefaultMQConsumer(DefaultMQPushConsumer defaultMQConsumer) {
this.defaultMQConsumer = defaultMQConsumer;
}
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
}
com.hong.spring.dao.UserMapper
代码语言:javascript复制package com.hong.spring.dao;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 15:04
* @Description:用户dao层
*/
public interface UserMapper {
/**
*
* 功能描述:查询总条数
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/18 15:31
*/
List<User> findAllUserList();
/**
*
* 功能描述:获取总数
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/18 15:30
*/
int findAllTotal();
/**
*
* 功能描述:更新
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/18 15:30
*/
int update(User user);
/**
*
* 功能描述:添加
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/19 18:39
*/
int save(User user);
/**
*
* 功能描述:批量添加
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/21 15:46
*/
int insertBatch(@Param("list") List <User> list);
/**
*
* 功能描述:通过id查询
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/19 18:39
*/
User findById(int id);
/**
*
* 功能描述:通过分页查询
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/21 16:05
*/
List<User> findByPage(UserAO ao);
}
com.hong.spring.listener.UserListener
代码语言:javascript复制package com.hong.spring.listener;
import com.alibaba.fastjson.JSONObject;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
/**
* @author: csh
* @Date: 2021/3/16 11:14
* @Description:用户监听
*/
@Log4j2
public class UserListener implements MessageListenerConcurrently {
@Autowired
private IUserService userService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
User user = RemotingSerializable.decode(msg.getBody(), User.class);
log.info("获取的用户信息{}", JSONObject.toJSONString(user));
DataResponse <Boolean> save = userService.save(user);
if(save==null || save.getData()==null || !save.getData()){
log.info("添加失败,原因{}",JSONObject.toJSONString(save));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
log.error("添加用户异常{}",e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
com/hong/spring/mapper/UserMapper.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hong.spring.dao.UserMapper">
<resultMap type="com.hong.spring.entity.User" id="user">
<id column="id" property="id" />
<result column="user_name" property="username" />
<result column="age" property="age" />
</resultMap>
<select id="findById" resultType="com.hong.spring.entity.User">
SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
</select>
<select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
select * from user where 1=1 limit #{page},#{pageSize}
</select>
<select id="findAllUserList" resultMap="user">
SELECT * FROM user
</select>
<select id="findAllTotal" resultType="int">
SELECT count(*) FROM user
</select>
<insert id="save" >
INSERT INTO user ( user_name, age)
VALUES (#{username,jdbcType=VARCHAR},
#{age,jdbcType=INTEGER})
</insert>
<insert id="insertBatch">
insert into user
( user_name, age)
values
<foreach collection="list" item="user" index="index"
separator=",">
(#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
</foreach>
</insert>
<update id="update" >
update user
<set>
<if test="username !=null">
user_name=#{username,jdbcType=VARCHAR},
</if>
<if test="age !=null">
age =#{age,jdbcType=INTEGER}
</if>
</set>
where id = #{id,jdbcType=INTEGER}
</update>
</mapper>
com.hong.spring.provider.UserServiceImpl
代码语言:javascript复制package com.hong.spring.provider;
import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 15:16
* @Description:用户实现
*/
@Service("userService")
public class UserServiceImpl implements IUserService {
@Autowired
private UserMapper userDao;
@Override
public DataResponse<List<User>> findByAll() {
List <User> allUserList = userDao.findAllUserList();
int allTotal = userDao.findAllTotal();
return DataResponse.BuildSuccessResponse(allUserList,allTotal);
}
@Override
@Transactional
public DataResponse <Boolean> save(User user) {
if(null==user){
return DataResponse.BuildFailResponse("必传参数不能为空!");
}
int save = userDao.save(user);
return DataResponse.BuildSuccessResponse(save>0?true:false);
}
@Override
public DataResponse <Boolean> insertBatch(List <User> list) {
if(null==list){
return DataResponse.BuildFailResponse("参数不能为空!");
}
int batchSave = userDao.insertBatch(list);
return DataResponse.BuildSuccessResponse(batchSave>0?true:false);
}
@Override
@Transactional
public DataResponse <Boolean> update(User user) {
if(null==user || user.getId()==null){
return DataResponse.BuildFailResponse("必传参数不能为空!");
}
int update = userDao.update(user);
return DataResponse.BuildSuccessResponse(update>0?true:false);
}
@Override
public DataResponse <User> findById(int i) {
User byId = userDao.findById(i);
return DataResponse.BuildSuccessResponse(byId);
}
@Override
public DataResponse <List <User>> findByPage(UserAO ao) {
if(ao==null){
ao.setPage(0);
ao.setPageSize(10);
}else{
ao.setPage(ao.getPageSize() * ao.getPage());
}
int allTotal = userDao.findAllTotal();
List <User> byPage = userDao.findByPage(ao);
return DataResponse.BuildSuccessResponse(byPage,allTotal);
}
}
application.properties
代码语言:javascript复制logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
applicationContext.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<!-- 配置组件扫描 -->
<context:component-scan base-package="com.hong.spring"></context:component-scan>
<!--加载配置文件-->
<context:property-placeholder location="classpath:jdbc.properties,classpath:rocketmq.properties"/>
<!-- 开启注解 -->
<context:annotation-config />
<!--开启注解事务-->
<tx:annotation-driven transaction-manager="transactionManager" />
<!--放行静态资源-->
<mvc:default-servlet-handler />
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
id="internalResourceViewResolver">
<!-- 前缀 -->
<property name="prefix" value="/WEB-INF/pages/" />
<!-- 后缀 -->
<property name="suffix" value=".html" />
<property name="contentType" value="text/html"/>
</bean>
<!--开启mvc注解事务-->
<!-- 定义注解驱动 -->
<mvc:annotation-driven>
<mvc:message-converters>
<!-- 设置支持中文 -->
<bean class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes">
<list>
<value>text/plain;charset=UTF-8</value>
<value>text/html;charset=UTF-8</value>
</list>
</property>
</bean>
<bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
</mvc:message-converters>
</mvc:annotation-driven>
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
<!-- 基础配置 -->
<property name="url" value="${jdbc.url}"></property>
<property name="driverClassName" value="${jdbc.driver}"></property>
<property name="username" value="${jdbc.user}"></property>
<property name="password" value="${jdbc.password}"></property>
<!-- 关键配置 -->
<!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
<property name="initialSize" value="3" />
<!-- 最小连接池数量 -->
<property name="minIdle" value="2" />
<!-- 最大连接池数量 -->
<property name="maxActive" value="15" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="10000" />
<!-- 性能配置 -->
<!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="20" />
<!-- 其他配置 -->
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
执行validationQuery检测连接是否有效。-->
<property name="testWhileIdle" value="true" />
<!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
<property name="testOnBorrow" value="true" />
<!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
<property name="testOnReturn" value="false" />
</bean>
<!--事务管理器-->
<!-- sqlSessionFactory -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<!-- 加载 MyBatis 的配置文件 -->
<property name="configLocation" value="classpath:mybatis.xml"/>
<!-- 数据源 -->
<property name="dataSource" ref="dataSource"/>
<!-- 所有配置的mapper文件 -->
<property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
</bean>
<!-- Mapper 扫描器 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<!-- 扫描 包下的组件 -->
<property name="basePackage" value="com.hong.spring.dao" />
<!-- 关联mapper扫描器 与 sqlsession管理器 -->
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
</bean>
<!--事务配置-->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
</beans>
jdbc.properties
代码语言:javascript复制config.properties:
#数据库驱动
jdbc.driver=com.mysql.jdbc.Driver
#数据库连接url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
#数据库用户名
jdbc.user=root
#数据库密码
jdbc.password=123456
log4j2.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/app.log"
filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
<PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
<SizeBasedTriggeringPolicy size="5 MB"/>
</RollingFile>
</appenders>
<loggers>
<root level="DEBUG">
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</loggers>
</configuration>
logging.properties
代码语言:javascript复制org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE
handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################
org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.
java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
rocketmq.properties
代码语言:javascript复制rocketmq.producerGroup=hong_group
rocketmq.namesrvAddr=127.0.0.1:9876
rocketmq.user_tocke=user_topic
mybatis.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<!-- settings -->
<settings>
<!-- 打开延迟加载的开关 -->
<setting name="lazyLoadingEnabled" value="true"/>
<!-- 将积极加载改为消极加载(即按需加载) -->
<setting name="aggressiveLazyLoading" value="false"/>
<!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
<setting name="cacheEnabled" value="true"/>
<!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) -->
<setting name="mapUnderscoreToCamelCase" value="true"/>
<!-- 使用列别名代替列名 默认:true seslect name as title from table -->
<setting name="useColumnLabel" value="true"/>
<!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
<setting name="useGeneratedKeys" value="true"/>
</settings>
<!-- 别名定义 -->
<typeAliases>
<package name="com.hong.spring.entity"/>
</typeAliases>
</configuration>
rocketmq.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<bean id="userConsumer" name="userConsumer" class="com.hong.spring.config.Consumer" init-method="init" destroy-method="destroy">
<property name="producerGroup" value="${rocketmq.producerGroup}" />
<property name="namesrvAddr" value="${rocketmq.namesrvAddr}" />
<property name="topic" value="${rocketmq.user_tocke}"/>
<property name="messageListener" ref="userListener"></property>
</bean>
<bean id="userListener" class="com.hong.spring.listener.UserListener" />
</beans>
WEB-INF/web.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
<servlet>
<servlet-name>spring_rocketmq_consumer</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml,
classpath:rocketmq.xml
</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<servlet-mapping>
<servlet-name>spring_rocketmq_consumer</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
</web-app>
spring_mq/spring_rocketmq_consumer/pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_mq</artifactId>
<groupId>com.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring_rocketmq_consumer</artifactId>
<!--静态资源导出问题-->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>6</source>
<target>6</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<artifactId>spring_mq_common_api</artifactId>
<version>1.0-SNAPSHOT</version>
<groupId>com.hong</groupId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.0</version>
<type>pom</type>
</dependency>
</dependencies>
</project>
结果
代码语言:javascript复制16:33:49.387 [NettyClientPublicExecutor_3] INFO com.hong.spring.listener.UserListener - 获取的用户信息{"age":1000,"username":"hong"}
16:33:49.845 [NettyClientPublicExecutor_3] INFO com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
16:33:49.897 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
16:33:49.913 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.932 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.transaction.SpringManagedTransaction - JDBC Connection [com.mysql.jdbc.JDBC4Connection@6b13936b] will be managed by Spring
16:33:49.937 [NettyClientPublicExecutor_3] DEBUG com.hong.spring.dao.UserMapper.save - ==> Preparing: INSERT INTO user ( user_name, age) VALUES (?, ?)
16:33:49.973 [NettyClientPublicExecutor_3] DEBUG com.hong.spring.dao.UserMapper.save - ==> Parameters: hong(String), 1000(Integer)
16:33:49.980 [NettyClientPublicExecutor_3] DEBUG com.hong.spring.dao.UserMapper.save - <== Updates: 1
16:33:49.983 [NettyClientPublicExecutor_3] DEBUG com.alibaba.druid.pool.PreparedStatementPool - stmt enter cache
16:33:49.984 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.985 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.986 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:49.986 [NettyClientPublicExecutor_3] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7ddd64f2]
16:33:55.558 [ConsumeMessageThread_1] INFO com.hong.spring.listener.UserListener - 获取的用户信息{"age":1000,"username":"hong"}
16:33:55.560 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
16:33:55.560 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.560 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.transaction.SpringManagedTransaction - JDBC Connection [com.mysql.jdbc.JDBC4Connection@6b13936b] will be managed by Spring
16:33:55.560 [ConsumeMessageThread_1] DEBUG com.hong.spring.dao.UserMapper.save - ==> Preparing: INSERT INTO user ( user_name, age) VALUES (?, ?)
16:33:55.561 [ConsumeMessageThread_1] DEBUG com.hong.spring.dao.UserMapper.save - ==> Parameters: hong(String), 1000(Integer)
16:33:55.562 [ConsumeMessageThread_1] DEBUG com.hong.spring.dao.UserMapper.save - <== Updates: 1
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
16:33:55.563 [ConsumeMessageThread_1] DEBUG org.mybatis.spring.SqlSessionUtils - Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@96ca4ad]
springboot整合rocketmq
springboot_rocketmq_api 入口 端口:8186 dubbo端口:20880
springboot_rocketmq_consumer 消费者 端口:8188 dubbo端口:20882
springboot_rocketmq_producer 生产者 端口:8187 dubbo端口:20881
框架技术:springboot2.x dubbo zk rocketmq4.8
关于zk参考另一篇文章:spring整合各种服务注册中心(zk、eureka、nacos、consul)
实现结果:通过 用户请求api,api通过dubbo rpc协议调用rpoducer生产者,若调用是查询则直接查库,如果调用是添加或修改数据,则通过rocketmq发送给生产者consumer,consumer监听到对应的topic后再调用producer的rpc接口最终实现消息异步化。
springboot_rocketmq_api
结构
代码语言:javascript复制│ pom.xml
│
└─src
└─main
├─java
│ └─com
│ └─hong
│ └─springboot
│ │ Application.java
│ │
│ └─controller
│ IndexController.java
│ UserController.java
│
└─resources
application.properties
com.hong.springboot.controller.IndexController
代码语言:javascript复制package com.hong.springboot.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: csh
* @Date: 2021/1/12 10:16
* @Description:首页
*/
@RestController
public class IndexController {
@RequestMapping("/")
public String index(){
return "成功!";
}
}
com.hong.springboot.controller.UserController
代码语言:javascript复制package com.hong.springboot.controller;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.annotation.Reference;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 16:11
* @Description:
*/
@RestController
@Slf4j
public class UserController {
@Reference
private IUserService userService;
@GetMapping("/findByAll")
public DataResponse<List<User>> findByAll(){
try {
return userService.findByAll();
} catch (Exception e){
log.error("查询出错{}",e);
}
return DataResponse.BuildFailResponse("查询出错!");
}
@PostMapping("/save")
public DataResponse<Boolean> save(User ao){
if(null==ao || ao.getAge()==null || StringUtils.isBlank(ao.getUsername())){
return DataResponse.BuildFailResponse("参数不能为空!");
}
DataResponse <Boolean> save = userService.save(ao);
return save;
}
}
com.hong.springboot.Application
代码语言:javascript复制package com.hong.springboot;
import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author: csh
* @Date: 2020/11/21 11:37
* @Description:springboot dubbo消费端
*/
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
application.properties
代码语言:javascript复制#dubbo configuration
#服务名称
dubbo.application.name=springboot_dubbo_consumer
#注册中心协议
dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20880
#协议名称
dubbo.protocol.name=dubbo
#扫包
dubbo.scan.basePackages=com.hong.springboot.controller
#避免端口冲突
server.port=8186
springboot_all/springboot_rocketmq_api/pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_all</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_rocketmq_api</artifactId>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_mq_api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.4-beta</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<!--静态资源导出问题-->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
springboot_rocketmq_producer
代码语言:javascript复制│ pom.xml
│
└─src
└─main
├─java
│ └─com
│ └─hong
│ └─springboot
│ │ Application.java
│ │
│ ├─config
│ │ DruidConfig.java
│ │ ExtRocketMQTemplate.java
│ │ TopicAll.java
│ │
│ ├─dao
│ │ UserMapper.java
│ │
│ └─provider
│ UserServiceImpl.java
│
└─resources
application.properties
rocketmq.properties
com.hong.springboot.config.DruidConfig
代码语言:javascript复制package com.hong.springboot.config;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
/**
* @author: csh
* @Date: 2021/1/8 18:08
* @Description:数据源配置
*/
@Configuration
public class DruidConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource(){
return new DruidDataSource();
}
}
com.hong.springboot.config.ExtRocketMQTemplate
代码语言:javascript复制package com.hong.springboot.config;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
@ExtRocketMQTemplateConfiguration(nameServer = "${hong.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
com.hong.springboot.config.TopicAll
代码语言:javascript复制package com.hong.springboot.config;
/**
* @author: csh
* @Date: 2021/3/16 14:15
* @Description:存放所有的topic
*/
public class TopicAll {
//用户topic
public static final String USER_TOPIC ="springboot_user_topic";
}
com.hong.springboot.dao.UserMapper
代码语言:javascript复制package com.hong.springboot.dao;
import com.hong.springboot.entity.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 15:04
* @Description:用户dao层
*/
public interface UserMapper {
@Select("select id,user_name,age from user")
List<User> findAllUser();
@Insert("insert into user (user_name,age) values(#{userName},#{age})")
int insert(User user);
}
com.hong.springboot.provider.UserServiceImpl
代码语言:javascript复制package com.hong.springboot.provider;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSONObject;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.config.TopicAll;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 15:16
* @Description:用户实现
*/
@Service(interfaceClass = IUserService.class,timeout = 6000)
@Slf4j
public class UserServiceImpl implements IUserService {
@Autowired
private UserMapper userDao;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public DataResponse<List<User>> findByAll() {
List <User> allUserList = userDao.findAllUser();
return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
}
@Override
public DataResponse <Boolean> save(User userAO) {
log.info("需要rocketmq添加的用户信息{}",JSONObject.toJSONString(userAO));
SendResult sendResult = rocketMQTemplate.syncSend(TopicAll.USER_TOPIC, JSONObject.toJSONString(userAO));
if(null==sendResult || sendResult.getSendStatus()!= SendStatus.SEND_OK){
return DataResponse.BuildFailResponse("添加用户失败!");
}
return DataResponse.BuildFailResponse("添加用户成功!");
}
@Override
public DataResponse <Boolean> reallySave(User user) {
log.info("要添加的用户信息{}",JSONObject.toJSONString(user));
if(null==user || user.getAge()==null || StringUtils.isEmpty(user.getUsername())){
return DataResponse.BuildFailResponse("参数不能为空!");
}
int insert = userDao.insert(user);
return insert>0?DataResponse.BuildSuccessResponse(true):DataResponse.BuildFailResponse("失败",false);
}
}
com.hong.springboot.Application
代码语言:javascript复制package com.hong.springboot;
import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author: csh
* @Date: 2020/11/21 11:37
* @Description:
*/
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@MapperScan("com.hong.springboot.dao")
@EnableDubbo
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
application.properties
代码语言:javascript复制rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000
#dubbo configuration
#服务名称
dubbo.application.name=springboot_dubbo_provider
dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20881
#协议名称
dubbo.protocol.name=dubbo
#避免端口冲突
server.port=8187
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
#mybatis配置
mybatis.typeAliasesPackage=com.hong.springboot.entity
rocketmq.properties
代码语言:javascript复制# properties used in the application
rocketmq.topic=string-topic
hong.rocketmq.orderTopic=order-paid-topic
hong.rocketmq.msgExtTopic=message-ext-topic
hong.rocketmq.transTopic=spring-transaction-topic
hong.rocketmq.topic.user=user-topic
hong.rocketmq.bytesRequestTopic=bytesRequestTopic:tagA
hong.rocketmq.stringRequestTopic=stringRequestTopic:tagA
hong.rocketmq.objectRequestTopic=objectRequestTopic:tagA
hong.rocketmq.genericRequestTopic=genericRequestTopic:tagA
hong.rocketmq.extNameServer=127.0.0.1:9876
springboot_all/springboot_rocketmq_producer/pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_all</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_rocketmq_producer</artifactId>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_mq_api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.4-beta</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!--<build>-->
<!--<plugins>-->
<!--<plugin>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--<configuration>-->
<!--<skip>true</skip>-->
<!--</configuration>-->
<!--</plugin>-->
<!--</plugins>-->
<!--</build>-->
<!--静态资源导出问题-->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
结果
springboot_rocketmq_consumer
代码语言:javascript复制│ pom.xml
│
└─src
└─main
├─java
│ └─com
│ └─hong
│ └─springboot
│ │ Application.java
│ │
│ ├─config
│ │ ExtRocketMQTemplate.java
│ │
│ └─listener
│ UserListener.java
│ UserListenerList.java
│ UserTransactionListener.java
│
└─resources
application.properties
rocketmq.properties
rocketmq.properties
代码语言:javascript复制# properties used in the application
rocketmq.topic=string-topic
hong.rocketmq.orderTopic=order-paid-topic
hong.rocketmq.msgExtTopic=message-ext-topic
hong.rocketmq.transTopic=spring-transaction-topic
hong.rocketmq.topic.user=user-topic
hong.rocketmq.bytesRequestTopic=bytesRequestTopic:tagA
hong.rocketmq.stringRequestTopic=stringRequestTopic:tagA
hong.rocketmq.objectRequestTopic=objectRequestTopic:tagA
hong.rocketmq.genericRequestTopic=genericRequestTopic:tagA
hong.rocketmq.extNameServer=127.0.0.1:9876
application.properties
代码语言:javascript复制rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000
#避免端口冲突
server.port=8188
#dubbo configuration
#服务名称
dubbo.application.name=springboot_rocketmq_consumer
dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20882
#协议名称
dubbo.protocol.name=dubbo
#扫包
dubbo.scan.basePackages=com.hong.springboot.listener
user_topic=springboot_user_topic
user_group_consumer=user_group_consumer
com.hong.springboot.Application
代码语言:javascript复制package com.hong.springboot;
import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author: csh
* @Date: 2020/11/21 11:37
* @Description:
*/
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
com.hong.springboot.listener.UserListener
代码语言:javascript复制package com.hong.springboot.listener;
import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @author: csh
* @Date: 2021/3/16 11:14
* @Description:用户监听
*/
@Service
@RocketMQMessageListener(consumerGroup ="${user_group_consumer}" , topic = "${user_topic}")
@Log4j2
public class UserListener implements RocketMQListener<User> {
@Reference
private IUserService userService;
@Override
public void onMessage(User user) {
log.info("springboot获取的用户信息{}", JSONObject.toJSONString(user));
DataResponse<Boolean> save = userService.reallySave(user);
log.info("添加结果{}",JSONObject.toJSONString(save));
if(save==null || !save.getData()){
log.info("添加失败,原因{}",JSONObject.toJSONString(save));
}
}
}
com.hong.springboot.config.ExtRocketMQTemplate
代码语言:javascript复制package com.hong.springboot.config;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
@ExtRocketMQTemplateConfiguration(nameServer = "${hong.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
启动消费者 (开始消费)
以上就基本完成了rocketmq4.x dubbo zk springboot2.x 的整合让耦合业务解耦,在项目中rcp起到跨服务远程调用,而mq起到削峰使消息异步化。
rocketmq幂等性问题
为了防止消息重复消费导致业务处理异常,消息队列RocketMQ版的消费者在接收到消息后,有必要根据业务上的唯一Key对消息做幂等处理。
通过数据发现,rocketmq有一个无法避免的问题,那就是消息重发,这是因为rocketmq在设计之初就这样设计,导致,这个问题无法避免,所以消息重发或重堆就会导致多次消息,可能会造成不一致幂等情况,那解决问题就是去重。
解决方案:
1.针对分布式集群场景可以通过redis缓存来存放多节点的消息id状态,使重复消费的时候判断是否消费过,若是则直接返回。
2.针对单机场景可以通过google guava的cache进行拦截判断,若消息一致则同redis缓存一样,默认过期时间3天,因为rocketmq存放时间为3天建议于配置一样。
由于本次采用的是第2 种采用的解决方案如下:
代码语言:javascript复制package com.hong.springboot.listener;
import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* @author: csh
* @Date: 2021/3/16 11:14
* @Description:用户监听
*/
@Service
@RocketMQMessageListener(consumerGroup ="${user_group_consumer}" , topic = "${user_topic}")
@Log4j2
public class UserListener implements RocketMQListener<MessageExt> {
@Reference
private IUserService userService;
//存放缓存三天 分布式集群场景可以
private static Cache<String,ConsumeConcurrentlyStatus> cache = CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.DAYS).build();
@Override
public void onMessage(MessageExt msg) {
if(null==msg || msg.getBody()==null || StringUtils.isEmpty(msg.getMsgId())){
return;
}
ConsumeConcurrentlyStatus status = cache.getIfPresent(msg.getMsgId());
//这两种消息才进行消费
if(null==status || status.equals(ConsumeConcurrentlyStatus.RECONSUME_LATER)){
String msgId = msg.getMsgId();
log.info("msgId" msgId);
User user = RemotingSerializable.decode(msg.getBody(), User.class);
log.info("springboot获取的用户信息{}", JSONObject.toJSONString(user));
DataResponse<Boolean> save = userService.reallySave(user);
log.info("添加结果{}",JSONObject.toJSONString(save));
if(save==null || !save.getData()){
log.info("添加失败,原因{}",JSONObject.toJSONString(save));
cache.put(msgId, ConsumeConcurrentlyStatus.RECONSUME_LATER);
}
cache.put(msgId, ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
}else{
log.info("该消息{}已消费过{}",JSONObject.toJSONString(msg),JSONObject.toJSONString(status));
}
}
}
然后重新验证如下:新生产一条消息,若添加成功默认进入cache中有效期为3天,然后在管理后台重推送该消息。RESEND MESSAGE
相关学习资料
《RocketMQ技术内幕》
推荐学习资料 官网:https://rocketmq.apache.org/
最后
通过rocketmq dubbo再搭配集群方式,在实际企业中用得是相当多的,而且这种架构模式在很多生产中得到特别多的验证,可以说屡试不爽。可以说在高性能、高可用、高并发,在做后端这块是一块不错的架构选择,并且dubbo和rocketmq都可以动态水平增缩,性能上就更不用说了,都是10万起步的。普通百万级的应用,一个3节点标配的集群基本搞定了。
参考文章:
https://www.cnblogs.com/lifeibai/p/9167701.html
https://www.cnblogs.com/weifeng1463/p/12889300.html
https://cloud.tencent.com/developer/article/1630183
https://www.cnblogs.com/chx9832/p/12325871.html