笔者也是机缘巧合,才会开启自己的写书之路。
在写这本书之前,我先后在两家杭州的“独角兽”公司担任技术负责人,并推进公司核心业务的“中台化”改造。在落地业务中台和技术中台的过程中,督促并指导开发人员统一使用Spring Cloud Alibaba作为中台服务最底层的基础框架。为了快速推进业务服务Spring Cloud Alibaba化的进度,我冲在业务的第一线,收集和整理开发人员在使用Spring Cloud Alibaba过程中反馈的技术问题,并提供有效的技术解决方案,直至项目落地。
我每周都会做技术复盘,通过分析大量的问题总结出一个结论:开发人员反馈的问题大部分都是由于Spring Cloud Alibaba使用不合理所造成的。也就是说,很多开发人员并不了解Spring Cloud Alibaba的原理及如何落地实践。于是,我就产生了把我这几年落地Spring Cloud Alibaba的经验通过图书的方式输出的想法。
回到主题,我们来聊一聊Spring Cloud Alibaba微服务架构实战派上下册书籍。
本书上册核心内容
1.1 Spring Cloud Alibaba基础实战
1.1.1 主要内容
(1)Spring Cloud Alibaba“牛刀小试”,包括:使用Spring Cloud Alibaba作为基础框架实现乐观锁、实现多数据源和实现SQL语句中表名的动态替换;
(2)【实例】用Maven和Spring Cloud Alibaba实现多环境部署,学习完本章内容,读者可以快速的使用配套源码,搭建可扩展的多环境运维部署环境;
(3)【实例】用“MyBatis-Plus Spring Cloud Alibaba”实现多租户架构,学习完本章内容,读者可以快速的使用配套源码,实现微服务架构中的多租户架构。
1.1.2 MyBatis-Plus实现多租户架构的核心原理
熟悉Mybatis原理的开发应该都知道它的拦截器机制,Mybatis会使用注解@Intercepts去标注一个拦截器,并在Mybatis框架启动的过程中,扫描当前Spring IOC容器中被注解@Intercepts标记的拦截器。
第一步:MyBatis-Plus定义一个全局拦截器MybatisPlusInterceptor类,如下所示。
代码语言:javascript复制//通过注解@Intercepts,将MyBatis-Plus和Mybatis绑定在一起
@Intercepts(
{
@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class}),
@Signature(type = StatementHandler.class, method = "getBoundSql", args = {}),
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}),
}
)
public class MybatisPlusInterceptor implements Interceptor {
@Setter
private List<InnerInterceptor> interceptors = new ArrayList<>();
@Override
public Object intercept(Invocation invocation) throws Throwable {
//遍历内部拦截器列表,并执行InnerInterceptor.beforeUpdate()
}
}
第二步:MyBatis-Plus定义一个内部多租户拦截TenantLineInnerInterceptor类,如下所示。
代码语言:javascript复制public class TenantLineInnerInterceptor extends JsqlParserSupport implements InnerInterceptor {
private TenantLineHandler tenantLineHandler;
//使用代理和反射,生成一个租户处理器TenantLineHandler
@Override
public void setProperties(Properties properties) {
PropertyMapper.newInstance(properties)
.whenNotBlack("tenantLineHandler", ClassUtils::newInstance, this::setTenantLineHandler);
}
...
}
1.2 分布式服务治理——基于Nacos
1.2.1 主要内容
(1)认识分布式服务治理;
(2)了解主流的注册中心;
(3)将应用接入Nacos 注册中心;
(4)用“NacosNamingService类 @EnableDiscoveryClient”实现服务的注册/订阅;
(5)用“Ribbon Nacos Client”实现服务发现的负载均衡;
(6)用CP模式和AP模式来保持注册中心的数据一致性;
(7)用缓存和文件来存储Nacos的元数据;
(8)用Nacos Sync来实现应用服务的数据迁移。
1.2.2 Spring Cloud Alibaba服务订阅负载均衡的核心原理
Spring Cloud Alibaba定义了一个加载负载均衡规则的类NacosRule,它继承了ribbon-loadbalancer项目中的AbstractLoadBalancerRule类,具体如下所示:
代码语言:javascript复制public class NacosRule extends AbstractLoadBalancerRule {
@Autowired
private NacosDiscoveryProperties nacosDiscoveryProperties;
@Autowired
private NacosServiceManager nacosServiceManager;
@Override
public Server choose(Object key) {
try {
//获取Nacos的集群名称
String clusterName = this.nacosDiscoveryProperties.getClusterName();
//获取Group的名称
String group = this.nacosDiscoveryProperties.getGroup();
DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer) getLoadBalancer();
String name = loadBalancer.getName();
//实例化一个Nacos Client的服务注册中心的名称服务对象NamingService
NamingService namingService = nacosServiceManager
.getNamingService(nacosDiscoveryProperties.getNacosProperties());
//获取指定服务名称的所有健康的服务实例信息
List<Instance> instances = namingService.selectInstances(name, group, true);
if (CollectionUtils.isEmpty(instances)) {
LOGGER.warn("no instance in service {}", name);
return null;
}
...
//使用负载均衡算法,均衡的选举一个服务实例,并返回一个NacosServer对象,完成负载均衡
Instance instance = ExtendBalancer.getHostByRandomWeight2(instancesToChoose);
return new NacosServer(instance);
}
catch (Exception e) {
LOGGER.warn("NacosRule error", e);
return null;
}
}
}
Spring Cloud Alibaba复用了Nacos提供的服务负载均衡算法,当然开发人员可以自己实现一个负载均衡算法。Nacos的服务负载均衡算法如下所示。
代码语言:javascript复制public class Balancer {
//按照随机权重,进行服务的负载均衡
protected static Instance getHostByRandomWeight(List<Instance> hosts) {
NAMING_LOGGER.debug("entry randomWithWeight");
if (hosts == null || hosts.size() == 0) {
NAMING_LOGGER.debug("hosts == null || hosts.size() == 0");
return null;
}
NAMING_LOGGER.debug("new Chooser");
List<Pair<Instance>> hostsWithWeight = new ArrayList<Pair<Instance>>();
//过滤掉不健康的服务实例
for (Instance host : hosts) {
if (host.isHealthy()) {
hostsWithWeight.add(new Pair<Instance>(host, host.getWeight()));
}
}
NAMING_LOGGER.debug("for (Host host : hosts)");
Chooser<String, Instance> vipChooser = new Chooser<String, Instance>("www.taobao.com");
//刷新服务实例的权重信息,这些权重信息可以通过Nacos的UI控制台,或者Open API动态的修改,并实时的生效
vipChooser.refresh(hostsWithWeight);
NAMING_LOGGER.debug("vipChooser.refresh");
//执行负载均衡算法
return vipChooser.randomWithWeight();
}
...
}
//负载均衡算法选择器
public class Chooser<K, T> {
public T randomWithWeight() {
Ref<T> ref = this.ref;
//产生随机种子
double random = ThreadLocalRandom.current().nextDouble(0, 1);
//采用二分查找,获取下标编号
int index = Arrays.binarySearch(ref.weights, random);
if (index < 0) {
index = -index - 1;
} else {
return ref.items.get(index);
}
if (index >= 0 && index < ref.weights.length) {
if (random < ref.weights[index]) {
return ref.items.get(index);
}
}
return ref.items.get(ref.items.size() - 1);
}
}
关于Spring Cloud Alibaba和Nacos的分布式服务治理的相关原理,可以阅读本书相关的章节。
1.3 分布式配置管理——基于Nacos
1.3.1 主要内容
(1)认识分布式配置管理;
(2)了解主流的配置中心;
(3)将应用接入Nacos配置中心;
(4)用HTTP协议和gRPC框架实现通信渠道;
(5)用“Sofa-Jraft Apache Derby”保证配置中心的数据一致性;
(6)用数据库持久化配置中心的数据;
(7)用“Spring Cloud Alibaba Config Nacos Config”实现配置管理(公共配置、应用配置和扩展配置)。
1.3.2 基于Spring Cloud Alibaba的配置信息动态变更的核心原理
首先,开发者在本地配置文件中,开启动态配置,如下所示。
代码语言:javascript复制###默认为true
spring.cloud.nacos.config.refreshEnabled=true
其次,初始化一个配置信息的上下文刷新类NacosContextRefresher,如下所示。
代码语言:javascript复制@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigAutoConfiguration {
//利用Spring Boot的自动配置原理,初始化NacosContextRefresher对象,并托管到Spring Framework的IOC容器中
@Bean
public NacosContextRefresher nacosContextRefresher(
NacosConfigManager nacosConfigManager,
NacosRefreshHistory nacosRefreshHistory) {
return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
}
...
}
最后,用Nacos Client,向Nacos的配置中心注册一个监听器,如下所示。
代码语言:javascript复制public class NacosContextRefresher
implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {
private AtomicBoolean ready = new AtomicBoolean(false);
//用Spring FrameWork的事件机制,自动触发添加Nacos配置信息监听器的事件
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
//防止应用使用多个Spring Context(多个IOC容器)
if (this.ready.compareAndSet(false, true)) {
this.registerNacosListenersForApplications();
}
}
//注册Nacos监听器
private void registerNacosListenersForApplications() {
if (isRefreshEnabled()) {
for (NacosPropertySource propertySource : NacosPropertySourceRepository
.getAll()) {
if (!propertySource.isRefreshable()) {
continue;
}
String dataId = propertySource.getDataId();
//注意监听器注册的维度是dataId,也就是说,如果应用中存在多个属性文件,就会注册多个对应的监听器
registerNacosListener(propertySource.getGroup(), dataId);
}
}
}
//执行注册监听器
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
//注册Nacos Client的监听器AbstractSharedListener
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {
refreshCountIncrement();
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
// todo feature: support single refresh for listening
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (log.isDebugEnabled()) {
log.debug(String.format(
"Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
group, dataId, configInfo));
}
}
});
try {
//调用Nacos Client的NacosConfigService,向Nacos配置中心注册一个监听器
configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {
log.warn(String.format(
"register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
groupKey), e);
}
}
}
关于Nacos配置中心监听器的原理,可以阅读本书的相关章节。
1.4 分布式系统的高可用流量防护——基于Sentinel
1.4.1 主要内容
(1)认识分布式流量防护;
(2)认识Sentinel;
(3)将应用接入Sentinel;
(4)用HTTP或者Netty实现通信渠道;
(5)用过滤器和拦截器实现组件的适配;
(6)用“流量控制”实现流量防护;
(7)用“熔断降级”实现流量防护;
(8)用“系统自适应保护”实现流量防护;
(9)用Nacos实现规则的动态配置和持久化 。
1.4.2 基于Spring Cloud Alibaba,动态加载和持久化高可用流量防护规则的原理
首先,初始化一个数据源处理器SentinelDataSourceHandler类,如下所示。
代码语言:javascript复制@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public SentinelDataSourceHandler sentinelDataSourceHandler(
DefaultListableBeanFactory beanFactory, SentinelProperties sentinelProperties,
Environment env) {
//实例化一个SentinelDataSourceHandler对象
return new SentinelDataSourceHandler(beanFactory, sentinelProperties, env);
}
...
}
其次,利用Spring FrameWork的SmartInitializingSingleton类,在Bean工厂初始化之前,初始化持久化数据源,具体如下所示。
代码语言:javascript复制public class SentinelDataSourceHandler implements SmartInitializingSingleton {
...
@Override
public void afterSingletonsInstantiated() {
sentinelProperties.getDatasource()
.forEach((dataSourceName, dataSourceProperties) -> {
try {
...
//定义一个数据源属性类AbstractDataSourceProperties
AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties
.getValidDataSourceProperties();
abstractDataSourceProperties.setEnv(env);
abstractDataSourceProperties.preCheck(dataSourceName);
registerBean(abstractDataSourceProperties, dataSourceName
"-sentinel-" validFields.get(0) "-datasource");
}
catch (Exception e) {
log.error("[Sentinel Starter] DataSource " dataSourceName
" build error: " e.getMessage(), e);
}
});
}
private void registerBean(final AbstractDataSourceProperties dataSourceProperties,
String dataSourceName) {
...
this.beanFactory.registerBeanDefinition(dataSourceName,
builder.getBeanDefinition());
//初始化流量防护规则的数据源
AbstractDataSource newDataSource = (AbstractDataSource) this.beanFactory
.getBean(dataSourceName);
//将数据源的属性规则,注册到Sentinel中
dataSourceProperties.postRegister(newDataSource);
}
}
最后,动态的将流量防护规则注册到Sentinel中,具体如下所示。
代码语言:javascript复制public class AbstractDataSourceProperties {
...
public void postRegister(AbstractDataSource dataSource) {
switch (this.getRuleType()) {
//注册流控规则
case FLOW:
FlowRuleManager.register2Property(dataSource.getProperty());
break;
//注册降级规则
case DEGRADE:
DegradeRuleManager.register2Property(dataSource.getProperty());
break;
//注册基于参数的流控规则
case PARAM_FLOW:
ParamFlowRuleManager.register2Property(dataSource.getProperty());
break;
//注册系统自适应规则
case SYSTEM:
SystemRuleManager.register2Property(dataSource.getProperty());
break;
//注册鉴权规则
case AUTHORITY:
AuthorityRuleManager.register2Property(dataSource.getProperty());
break;
//注册网关流控规则
case GW_FLOW:
GatewayRuleManager.register2Property(dataSource.getProperty());
break;
//注册网关API定义规则
case GW_API_GROUP:
GatewayApiDefinitionManager.register2Property(dataSource.getProperty());
break;
default:
break;
}
}
}
关于Spring Cloud Alibaba与Sentinel的相关原理,可以查阅本书相关章节。
1.5 高性能的分布式事务框架——Seata
1.5.1 主要内容
(1)认识分布式事务;
(2)认识Seata;
(3)将应用接入Seata;
(4)用Netty实现客户端与服务器端之间的通信渠道;
(5)用拦截器和过滤器适配主流的RPC框架;
(6)用AT模式实现分布式事务;
(7)用TCC模式实现分布式事务;
(8)用XA模式实现分布式事务;
(9)用Saga模式实现分布式事务。
1.5.2 Seata Server启动原理
首先,Seata使用Shell脚本seata-server.sh,启动io.seata.server.Server,具体如下所示。
代码语言:javascript复制public class Server {
public static void main(String[] args) throws IOException {
...
ParameterParser parameterParser = new ParameterParser(args);
MetricsManager.get().init();
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
//初始化一个RPC对象NettyRemotingServer(基于Netty)
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);
nettyRemotingServer.setListenPort(parameterParser.getPort());
UUIDGenerator.init(parameterParser.getServerNode());
SessionHolder.init(parameterParser.getStoreMode());
//初始化一个处理全局事务的对象DefaultCoordinator,比如开启全局事务、提交全局事务和回滚全局事务等
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());
try {
//初始化Netty服务端完成之后,并启动服务端(启动NettyServerBootstrap),等待客户端连接
nettyRemotingServer.init();
} catch (Throwable e) {
logger.error("nettyServer init error:{}", e.getMessage(), e);
System.exit(-1);
}
System.exit(0);
}
}
其次,注册事件处理器,主要用于处理客户端的通信消息事件,比如开启全局事务、提交全局事务等,具体如下所示。
代码语言:javascript复制public class NettyRemotingServer extends AbstractNettyRemotingServer {
@Override
public void init() {
//注册处理客户端消息事件的处理器,每种类型的事件一个处理器
registerProcessor();
}
private void registerProcessor() {
// 1. 注册请求消息处理器
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. 注册响应消息处理器
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
// 3. 注册rm消息处理器
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. 注册tm消息处理器
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. 注册心跳消息处理器
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
}
关于Spring Cloud Alibaba与Seata相关的原理,可以查阅本书的相关章节。
1.6 高可靠性分布式消息中间件RocketMQ
1.6.1 主要内容
(1)消息中间件概述;
(2)搭建RocketMQ的运行环境;
(3)将应用接入RocketMQ;
(4)用Netty实现RocketMQ的通信渠道;
(5)用“异步”“同步”和“最多发送一次”模式生产消息;
(6)用Push和Pull模式实现消息的消费;
(7)用两阶段提交和定时回查事务状态实现事务消息。
1.6.2 Spring Cloud Alibaba是如何封装RocketMQ的?
首先,使用RocketMQListenerBindingContainer类,初始化一个消费者,具体代码如下所示。
代码语言:javascript复制public class RocketMQListenerBindingContainer
implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
@Override
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
}
//初始化一个消费者DefaultMQPushConsumer
private void initRocketMQPushConsumer() throws MQClientException {
Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required");
String ak = rocketBinderConfigurationProperties.getAccessKey();
String sk = rocketBinderConfigurationProperties.getSecretKey();
//Spring Cloud Alibaba默认支持Push模式
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
new AllocateMessageQueueAveragely(),
rocketBinderConfigurationProperties.isEnableMsgTrace(),
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
topic "|" UtilAll.getPid()));
consumer.setVipChannelEnabled(false);
}
else {
consumer = new DefaultMQPushConsumer(consumerGroup,
rocketBinderConfigurationProperties.isEnableMsgTrace(),
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
}
consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer));
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
//广播和集群模式
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
//过滤模式
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
//消费类型:顺序和并行
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener)
.prepareStart(consumer);
}
}
...
}
其次,在RocketMQInboundChannelAdapter类,开启消费者,开始消费消息,具体如下所示。
代码语言:javascript复制public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
...
@Override
protected void doStart() {
if (consumerProperties == null
|| !consumerProperties.getExtension().getEnabled()) {
return;
}
try {
//开启消费者,开始消费消息
rocketMQListenerContainer.start();
instrumentationManager
.getHealthInstrumentation(rocketMQListenerContainer.getTopic()
rocketMQListenerContainer.getConsumerGroup())
.markStartedSuccessfully();
}
catch (Exception e) {
instrumentationManager
.getHealthInstrumentation(rocketMQListenerContainer.getTopic()
rocketMQListenerContainer.getConsumerGroup())
.markStartFailed(e);
log.error("RocketMQTemplate startup failed, Caused by " e.getMessage());
throw new MessagingException(MessageBuilder.withPayload(
"RocketMQTemplate startup failed, Caused by " e.getMessage())
.build(), e);
}
}
}
public class RocketMQListenerBindingContainer
implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
...
@Override
public void start() {
if (this.isRunning()) {
throw new IllegalStateException(
"container already running. " this.toString());
}
try {
//调用消费者,开始消费消息
consumer.start();
}
catch (MQClientException e) {
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
this.setRunning(true);
}
}
关于Spring Cloud Alibaba与RocketMQ的相关原理,可以查阅本书的相关章节。
1.7 高可靠性分布式消息中间件RocketMQ
1.7.1 主要内容
(1)认识网关;
(2)用Reactor Netty实现 Spring Cloud Gateway的通信渠道;
(3)用“路由规则定位器”(RouteDefinitionLocator)加载网关的路由规则;
(4)用“Redis Lua”进行网关API的限流。
1.7.2 Spring Cloud Gateway如何整合Redis,做网关限流
首先,Spring Cloud Gateway整合了spring-data-redis,并利用Spring Boot的自动配置,初始化Redis客户端,具体如下所示。
代码语言:javascript复制@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(RedisReactiveAutoConfiguration.class)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
@ConditionalOnBean(ReactiveRedisTemplate.class)
@ConditionalOnClass({ RedisTemplate.class, DispatcherHandler.class })
class GatewayRedisAutoConfiguration {
//整合Lua脚本
@Bean
@SuppressWarnings("unchecked")
public RedisScript redisRequestRateLimiterScript() {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(
new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
redisScript.setResultType(List.class);
return redisScript;
}
//构造“基于Redis的分布式限流器”
@Bean
@ConditionalOnMissingBean
public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
@Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
ConfigurationService configurationService) {
return new RedisRateLimiter(redisTemplate, redisScript, configurationService);
}
}
其次,用分布式限流器进行限流,具体如下所示。
代码语言:javascript复制@ConfigurationProperties("spring.cloud.gateway.redis-rate-limiter")
public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Config>
implements ApplicationContextAware {
//结合Redis Lua,使用令牌桶算法完成分布式限流
public Mono<Response> isAllowed(String routeId, String id) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
//加载路由配置信息
Config routeConfig = loadConfiguration(routeId);
int replenishRate = routeConfig.getReplenishRate();
//获取桶的容量
int burstCapacity = routeConfig.getBurstCapacity();
//获取请求Token数
int requestedTokens = routeConfig.getRequestedTokens();
try {
List<String> keys = getKeys(id);
List<String> scriptArgs = Arrays.asList(replenishRate "",
burstCapacity "", Instant.now().getEpochSecond() "",
requestedTokens "");
//用Redis客户端执行Lua限流脚本
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
scriptArgs);
return flux.onErrorResume(throwable -> {
if (log.isDebugEnabled()) {
log.debug("Error calling rate limiter lua", throwable);
}
return Flux.just(Arrays.asList(1L, -1L));
}).reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
Response response = new Response(allowed,getHeaders(routeConfig, tokensLeft));
if (log.isDebugEnabled()) {
log.debug("response: " response);
}
return response;
});
}
catch (Exception e) {
}
return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}
}
如果想了解Spring Cloud Alibaba与Spring Cloud Gateway的详细原理,读者可以查阅本书的相关章节。
总结
本文详细介绍了——Spring Cloud Alibaba微服务架构实战派(上下册)中上册的核心内容及相关原理