技术硬实力,聊聊写Spring Cloud Alibaba实战派这本书的初衷

2022-09-23 18:12:59 浏览数 (1)

笔者也是机缘巧合,才会开启自己的写书之路。

在写这本书之前,我先后在两家杭州的“独角兽”公司担任技术负责人,并推进公司核心业务的“中台化”改造。在落地业务中台和技术中台的过程中,督促并指导开发人员统一使用Spring Cloud Alibaba作为中台服务最底层的基础框架。为了快速推进业务服务Spring Cloud Alibaba化的进度,我冲在业务的第一线,收集和整理开发人员在使用Spring Cloud Alibaba过程中反馈的技术问题,并提供有效的技术解决方案,直至项目落地。

我每周都会做技术复盘,通过分析大量的问题总结出一个结论:开发人员反馈的问题大部分都是由于Spring Cloud Alibaba使用不合理所造成的。也就是说,很多开发人员并不了解Spring Cloud Alibaba的原理及如何落地实践。于是,我就产生了把我这几年落地Spring Cloud Alibaba的经验通过图书的方式输出的想法。

回到主题,我们来聊一聊Spring Cloud Alibaba微服务架构实战派上下册书籍。

本书上册核心内容

1.1 Spring Cloud Alibaba基础实战

1.1.1 主要内容

(1)Spring Cloud Alibaba“牛刀小试”,包括:使用Spring Cloud Alibaba作为基础框架实现乐观锁、实现多数据源和实现SQL语句中表名的动态替换;

(2)【实例】用Maven和Spring Cloud Alibaba实现多环境部署,学习完本章内容,读者可以快速的使用配套源码,搭建可扩展的多环境运维部署环境;

(3)【实例】用“MyBatis-Plus Spring Cloud Alibaba”实现多租户架构,学习完本章内容,读者可以快速的使用配套源码,实现微服务架构中的多租户架构。

1.1.2 MyBatis-Plus实现多租户架构的核心原理

熟悉Mybatis原理的开发应该都知道它的拦截器机制,Mybatis会使用注解@Intercepts去标注一个拦截器,并在Mybatis框架启动的过程中,扫描当前Spring IOC容器中被注解@Intercepts标记的拦截器。

第一步:MyBatis-Plus定义一个全局拦截器MybatisPlusInterceptor类,如下所示。

代码语言:javascript复制
//通过注解@Intercepts,将MyBatis-Plus和Mybatis绑定在一起
@Intercepts(
    {
        @Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class}),
        @Signature(type = StatementHandler.class, method = "getBoundSql", args = {}),
        @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
        @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
        @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}),
    }
)
public class MybatisPlusInterceptor implements Interceptor {
    @Setter
    private List<InnerInterceptor> interceptors = new ArrayList<>();
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
      //遍历内部拦截器列表,并执行InnerInterceptor.beforeUpdate()
    }
}

第二步:MyBatis-Plus定义一个内部多租户拦截TenantLineInnerInterceptor类,如下所示。

代码语言:javascript复制
public class TenantLineInnerInterceptor extends JsqlParserSupport implements InnerInterceptor {
    private TenantLineHandler tenantLineHandler;
    //使用代理和反射,生成一个租户处理器TenantLineHandler
    @Override
    public void setProperties(Properties properties) {
        PropertyMapper.newInstance(properties)
            .whenNotBlack("tenantLineHandler", ClassUtils::newInstance, this::setTenantLineHandler);
    }
    ...
}

1.2 分布式服务治理——基于Nacos

1.2.1 主要内容

(1)认识分布式服务治理;

(2)了解主流的注册中心;

(3)将应用接入Nacos 注册中心;

(4)用“NacosNamingService类 @EnableDiscoveryClient”实现服务的注册/订阅;

(5)用“Ribbon Nacos Client”实现服务发现的负载均衡;

(6)用CP模式和AP模式来保持注册中心的数据一致性;

(7)用缓存和文件来存储Nacos的元数据;

(8)用Nacos Sync来实现应用服务的数据迁移。

1.2.2 Spring Cloud Alibaba服务订阅负载均衡的核心原理

Spring Cloud Alibaba定义了一个加载负载均衡规则的类NacosRule,它继承了ribbon-loadbalancer项目中的AbstractLoadBalancerRule类,具体如下所示:

代码语言:javascript复制
public class NacosRule extends AbstractLoadBalancerRule {
  @Autowired
  private NacosDiscoveryProperties nacosDiscoveryProperties;
  @Autowired
  private NacosServiceManager nacosServiceManager;
  @Override
  public Server choose(Object key) {
    try {
      //获取Nacos的集群名称
      String clusterName = this.nacosDiscoveryProperties.getClusterName();
      //获取Group的名称
      String group = this.nacosDiscoveryProperties.getGroup();
      DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer) getLoadBalancer();
      String name = loadBalancer.getName();
      //实例化一个Nacos Client的服务注册中心的名称服务对象NamingService
      NamingService namingService = nacosServiceManager
          .getNamingService(nacosDiscoveryProperties.getNacosProperties());
      //获取指定服务名称的所有健康的服务实例信息
      List<Instance> instances = namingService.selectInstances(name, group, true);
      if (CollectionUtils.isEmpty(instances)) {
        LOGGER.warn("no instance in service {}", name);
        return null;
      }
      ...
      //使用负载均衡算法,均衡的选举一个服务实例,并返回一个NacosServer对象,完成负载均衡
      Instance instance = ExtendBalancer.getHostByRandomWeight2(instancesToChoose);
      return new NacosServer(instance);
    }
    catch (Exception e) {
      LOGGER.warn("NacosRule error", e);
      return null;
    }
  }
}

Spring Cloud Alibaba复用了Nacos提供的服务负载均衡算法,当然开发人员可以自己实现一个负载均衡算法。Nacos的服务负载均衡算法如下所示。

代码语言:javascript复制
public class Balancer {
  //按照随机权重,进行服务的负载均衡
  protected static Instance getHostByRandomWeight(List<Instance> hosts) {
        NAMING_LOGGER.debug("entry randomWithWeight");
        if (hosts == null || hosts.size() == 0) {
            NAMING_LOGGER.debug("hosts == null || hosts.size() == 0");
            return null;
        }
        NAMING_LOGGER.debug("new Chooser");
        List<Pair<Instance>> hostsWithWeight = new ArrayList<Pair<Instance>>();
        //过滤掉不健康的服务实例
        for (Instance host : hosts) {
            if (host.isHealthy()) {
                hostsWithWeight.add(new Pair<Instance>(host, host.getWeight()));
            }
        }
        NAMING_LOGGER.debug("for (Host host : hosts)");
        Chooser<String, Instance> vipChooser = new Chooser<String, Instance>("www.taobao.com");
        //刷新服务实例的权重信息,这些权重信息可以通过Nacos的UI控制台,或者Open API动态的修改,并实时的生效
        vipChooser.refresh(hostsWithWeight);
        NAMING_LOGGER.debug("vipChooser.refresh");
        //执行负载均衡算法
        return vipChooser.randomWithWeight();
    }
  ...
}
//负载均衡算法选择器
public class Chooser<K, T> {
  public T randomWithWeight() {
        Ref<T> ref = this.ref;
        //产生随机种子
        double random = ThreadLocalRandom.current().nextDouble(0, 1);
        //采用二分查找,获取下标编号
        int index = Arrays.binarySearch(ref.weights, random);
        if (index < 0) {
            index = -index - 1;
        } else {
            return ref.items.get(index);
        }
        if (index >= 0 && index < ref.weights.length) {
            if (random < ref.weights[index]) {
                return ref.items.get(index);
            }
        }
        return ref.items.get(ref.items.size() - 1);
    }
}

关于Spring Cloud Alibaba和Nacos的分布式服务治理的相关原理,可以阅读本书相关的章节。

1.3 分布式配置管理——基于Nacos

1.3.1 主要内容

(1)认识分布式配置管理;

(2)了解主流的配置中心;

(3)将应用接入Nacos配置中心;

(4)用HTTP协议和gRPC框架实现通信渠道;

(5)用“Sofa-Jraft Apache Derby”保证配置中心的数据一致性;

(6)用数据库持久化配置中心的数据;

(7)用“Spring Cloud Alibaba Config Nacos Config”实现配置管理(公共配置、应用配置和扩展配置)。

1.3.2 基于Spring Cloud Alibaba的配置信息动态变更的核心原理

首先,开发者在本地配置文件中,开启动态配置,如下所示。

代码语言:javascript复制
###默认为true
spring.cloud.nacos.config.refreshEnabled=true

其次,初始化一个配置信息的上下文刷新类NacosContextRefresher,如下所示。

代码语言:javascript复制
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigAutoConfiguration {
  //利用Spring Boot的自动配置原理,初始化NacosContextRefresher对象,并托管到Spring Framework的IOC容器中
  @Bean
  public NacosContextRefresher nacosContextRefresher(
      NacosConfigManager nacosConfigManager,
      NacosRefreshHistory nacosRefreshHistory) {
    return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
  }
  ...
}

最后,用Nacos Client,向Nacos的配置中心注册一个监听器,如下所示。

代码语言:javascript复制
public class NacosContextRefresher
    implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {
  private AtomicBoolean ready = new AtomicBoolean(false);
  //用Spring FrameWork的事件机制,自动触发添加Nacos配置信息监听器的事件
  @Override
  public void onApplicationEvent(ApplicationReadyEvent event) {
    //防止应用使用多个Spring Context(多个IOC容器)
    if (this.ready.compareAndSet(false, true)) {
      this.registerNacosListenersForApplications();
    }
  }
  //注册Nacos监听器
  private void registerNacosListenersForApplications() {
    if (isRefreshEnabled()) {
      for (NacosPropertySource propertySource : NacosPropertySourceRepository
          .getAll()) {
        if (!propertySource.isRefreshable()) {
          continue;
        }
        String dataId = propertySource.getDataId();
        //注意监听器注册的维度是dataId,也就是说,如果应用中存在多个属性文件,就会注册多个对应的监听器
        registerNacosListener(propertySource.getGroup(), dataId);
      }
    }
  }
  //执行注册监听器
  private void registerNacosListener(final String groupKey, final String dataKey) {
    String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    //注册Nacos Client的监听器AbstractSharedListener
    Listener listener = listenerMap.computeIfAbsent(key,
        lst -> new AbstractSharedListener() {
          @Override
          public void innerReceive(String dataId, String group,
              String configInfo) {
            refreshCountIncrement();
            nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
            // todo feature: support single refresh for listening
            applicationContext.publishEvent(
                new RefreshEvent(this, null, "Refresh Nacos config"));
            if (log.isDebugEnabled()) {
              log.debug(String.format(
                  "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                  group, dataId, configInfo));
            }
          }
        });
    try {
      //调用Nacos Client的NacosConfigService,向Nacos配置中心注册一个监听器
      configService.addListener(dataKey, groupKey, listener);
    }
    catch (NacosException e) {
      log.warn(String.format(
          "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
          groupKey), e);
    }
  }
}

关于Nacos配置中心监听器的原理,可以阅读本书的相关章节。

1.4 分布式系统的高可用流量防护——基于Sentinel

1.4.1 主要内容

(1)认识分布式流量防护;

(2)认识Sentinel;

(3)将应用接入Sentinel;

(4)用HTTP或者Netty实现通信渠道;

(5)用过滤器和拦截器实现组件的适配;

(6)用“流量控制”实现流量防护;

(7)用“熔断降级”实现流量防护;

(8)用“系统自适应保护”实现流量防护;

(9)用Nacos实现规则的动态配置和持久化 。

1.4.2 基于Spring Cloud Alibaba,动态加载和持久化高可用流量防护规则的原理

首先,初始化一个数据源处理器SentinelDataSourceHandler类,如下所示。

代码语言:javascript复制
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelAutoConfiguration {
  @Bean
  @ConditionalOnMissingBean
  public SentinelDataSourceHandler sentinelDataSourceHandler(
      DefaultListableBeanFactory beanFactory, SentinelProperties sentinelProperties,
      Environment env) {
    //实例化一个SentinelDataSourceHandler对象
    return new SentinelDataSourceHandler(beanFactory, sentinelProperties, env);
  }
  ...
}

其次,利用Spring FrameWork的SmartInitializingSingleton类,在Bean工厂初始化之前,初始化持久化数据源,具体如下所示。

代码语言:javascript复制
public class SentinelDataSourceHandler implements SmartInitializingSingleton {
  ...
  @Override
  public void afterSingletonsInstantiated() {
    sentinelProperties.getDatasource()
        .forEach((dataSourceName, dataSourceProperties) -> {
          try {
            ...
            //定义一个数据源属性类AbstractDataSourceProperties
            AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties
                .getValidDataSourceProperties();
            abstractDataSourceProperties.setEnv(env);
            abstractDataSourceProperties.preCheck(dataSourceName);
            registerBean(abstractDataSourceProperties, dataSourceName
                  "-sentinel-"   validFields.get(0)   "-datasource");
          }
          catch (Exception e) {
            log.error("[Sentinel Starter] DataSource "   dataSourceName
                  " build error: "   e.getMessage(), e);
          }
        });
  }
  
  private void registerBean(final AbstractDataSourceProperties dataSourceProperties,
      String dataSourceName) {
      ...
      this.beanFactory.registerBeanDefinition(dataSourceName,
        builder.getBeanDefinition());
      //初始化流量防护规则的数据源
      AbstractDataSource newDataSource = (AbstractDataSource) this.beanFactory
        .getBean(dataSourceName);
      //将数据源的属性规则,注册到Sentinel中
      dataSourceProperties.postRegister(newDataSource);
  }
}

最后,动态的将流量防护规则注册到Sentinel中,具体如下所示。

代码语言:javascript复制
public class AbstractDataSourceProperties {
  ...
  public void postRegister(AbstractDataSource dataSource) {
    switch (this.getRuleType()) {
    //注册流控规则
    case FLOW:
      FlowRuleManager.register2Property(dataSource.getProperty());
      break;
    //注册降级规则
    case DEGRADE:
      DegradeRuleManager.register2Property(dataSource.getProperty());
      break;
    //注册基于参数的流控规则
    case PARAM_FLOW:
      ParamFlowRuleManager.register2Property(dataSource.getProperty());
      break;
    //注册系统自适应规则
    case SYSTEM:
      SystemRuleManager.register2Property(dataSource.getProperty());
      break;
    //注册鉴权规则
    case AUTHORITY:
      AuthorityRuleManager.register2Property(dataSource.getProperty());
      break;
     //注册网关流控规则
    case GW_FLOW:
      GatewayRuleManager.register2Property(dataSource.getProperty());
      break;
    //注册网关API定义规则
    case GW_API_GROUP:
      GatewayApiDefinitionManager.register2Property(dataSource.getProperty());
      break;
    default:
      break;
    }
  }
}

关于Spring Cloud Alibaba与Sentinel的相关原理,可以查阅本书相关章节。

1.5 高性能的分布式事务框架——Seata

1.5.1 主要内容

(1)认识分布式事务;

(2)认识Seata;

(3)将应用接入Seata;

(4)用Netty实现客户端与服务器端之间的通信渠道;

(5)用拦截器和过滤器适配主流的RPC框架;

(6)用AT模式实现分布式事务;

(7)用TCC模式实现分布式事务;

(8)用XA模式实现分布式事务;

(9)用Saga模式实现分布式事务。

1.5.2 Seata Server启动原理

首先,Seata使用Shell脚本seata-server.sh,启动io.seata.server.Server,具体如下所示。

代码语言:javascript复制
public class Server {
  public static void main(String[] args) throws IOException {
        ...
        ParameterParser parameterParser = new ParameterParser(args);
        MetricsManager.get().init();
        System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
        //初始化一个RPC对象NettyRemotingServer(基于Netty)
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);
        nettyRemotingServer.setListenPort(parameterParser.getPort());
        UUIDGenerator.init(parameterParser.getServerNode());
        SessionHolder.init(parameterParser.getStoreMode());
        //初始化一个处理全局事务的对象DefaultCoordinator,比如开启全局事务、提交全局事务和回滚全局事务等
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        coordinator.init();
        nettyRemotingServer.setHandler(coordinator);
        ShutdownHook.getInstance().addDisposable(coordinator);
        ShutdownHook.getInstance().addDisposable(nettyRemotingServer);
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            XID.setIpAddress(NetUtil.getLocalIp());
        }
        XID.setPort(nettyRemotingServer.getListenPort());
        try {
            //初始化Netty服务端完成之后,并启动服务端(启动NettyServerBootstrap),等待客户端连接
            nettyRemotingServer.init();
        } catch (Throwable e) {
            logger.error("nettyServer init error:{}", e.getMessage(), e);
            System.exit(-1);
        }
        System.exit(0);
    }
}    

其次,注册事件处理器,主要用于处理客户端的通信消息事件,比如开启全局事务、提交全局事务等,具体如下所示。

代码语言:javascript复制
public class NettyRemotingServer extends AbstractNettyRemotingServer {
  @Override
  public void init() {
    //注册处理客户端消息事件的处理器,每种类型的事件一个处理器
    registerProcessor();
  }
  private void registerProcessor() {
        // 1. 注册请求消息处理器
        ServerOnRequestProcessor onRequestProcessor =
            new ServerOnRequestProcessor(this, getHandler());
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
        // 2. 注册响应消息处理器
        ServerOnResponseProcessor onResponseProcessor =
            new ServerOnResponseProcessor(getHandler(), getFutures());
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
        // 3. 注册rm消息处理器
        RegRmProcessor regRmProcessor = new RegRmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
        // 4. 注册tm消息处理器
        RegTmProcessor regTmProcessor = new RegTmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
        // 5. 注册心跳消息处理器
        ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
    }
}

关于Spring Cloud Alibaba与Seata相关的原理,可以查阅本书的相关章节。

1.6 高可靠性分布式消息中间件RocketMQ

1.6.1 主要内容

(1)消息中间件概述;

(2)搭建RocketMQ的运行环境;

(3)将应用接入RocketMQ;

(4)用Netty实现RocketMQ的通信渠道;

(5)用“异步”“同步”和“最多发送一次”模式生产消息;

(6)用Push和Pull模式实现消息的消费;

(7)用两阶段提交和定时回查事务状态实现事务消息。

1.6.2 Spring Cloud Alibaba是如何封装RocketMQ的?

首先,使用RocketMQListenerBindingContainer类,初始化一个消费者,具体代码如下所示。

代码语言:javascript复制
public class RocketMQListenerBindingContainer
    implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
  @Override
  public void afterPropertiesSet() throws Exception {
    initRocketMQPushConsumer();
  } 
  //初始化一个消费者DefaultMQPushConsumer
  private void initRocketMQPushConsumer() throws MQClientException {
    Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
    Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
    Assert.notNull(nameServer, "Property 'nameServer' is required");
    Assert.notNull(topic, "Property 'topic' is required");
    String ak = rocketBinderConfigurationProperties.getAccessKey();
    String sk = rocketBinderConfigurationProperties.getSecretKey();
    //Spring Cloud Alibaba默认支持Push模式
    if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
      RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
      consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
          new AllocateMessageQueueAveragely(),
          rocketBinderConfigurationProperties.isEnableMsgTrace(),
          rocketBinderConfigurationProperties.getCustomizedTraceTopic());
      consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
          topic   "|"   UtilAll.getPid()));
      consumer.setVipChannelEnabled(false);
    }
    else {
      consumer = new DefaultMQPushConsumer(consumerGroup,
          rocketBinderConfigurationProperties.isEnableMsgTrace(),
          rocketBinderConfigurationProperties.getCustomizedTraceTopic());
    }
    consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer));
    consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
    consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
    //广播和集群模式
    switch (messageModel) {
    case BROADCASTING:
      consumer.setMessageModel(
          org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
      break;
    case CLUSTERING:
      consumer.setMessageModel(
          org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
      break;
    default:
      throw new IllegalArgumentException("Property 'messageModel' was wrong.");
    }
    //过滤模式
    switch (selectorType) {
    case TAG:
      consumer.subscribe(topic, selectorExpression);
      break;
    case SQL92:
      consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
      break;
    default:
      throw new IllegalArgumentException("Property 'selectorType' was wrong.");
    }
    //消费类型:顺序和并行
    switch (consumeMode) {
    case ORDERLY:
      consumer.setMessageListener(new DefaultMessageListenerOrderly());
      break;
    case CONCURRENTLY:
      consumer.setMessageListener(new DefaultMessageListenerConcurrently());
      break;
    default:
      throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
    }
    if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
      ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
          .prepareStart(consumer);
    }
  }
  ...
}

其次,在RocketMQInboundChannelAdapter类,开启消费者,开始消费消息,具体如下所示。

代码语言:javascript复制
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
  ...
  @Override
  protected void doStart() {
    if (consumerProperties == null
        || !consumerProperties.getExtension().getEnabled()) {
      return;
    }
    try {
      //开启消费者,开始消费消息
      rocketMQListenerContainer.start();
      instrumentationManager
          .getHealthInstrumentation(rocketMQListenerContainer.getTopic()
                rocketMQListenerContainer.getConsumerGroup())
          .markStartedSuccessfully();
    }
    catch (Exception e) {
      instrumentationManager
          .getHealthInstrumentation(rocketMQListenerContainer.getTopic()
                rocketMQListenerContainer.getConsumerGroup())
          .markStartFailed(e);
      log.error("RocketMQTemplate startup failed, Caused by "   e.getMessage());
      throw new MessagingException(MessageBuilder.withPayload(
          "RocketMQTemplate startup failed, Caused by "   e.getMessage())
          .build(), e);
    }
  }
}
public class RocketMQListenerBindingContainer
    implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
  ...
  @Override
  public void start() {
    if (this.isRunning()) {
      throw new IllegalStateException(
          "container already running. "   this.toString());
    }
    try {
      //调用消费者,开始消费消息
      consumer.start();
    }
    catch (MQClientException e) {
      throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
    }
    this.setRunning(true);
  }
}

关于Spring Cloud Alibaba与RocketMQ的相关原理,可以查阅本书的相关章节。

1.7 高可靠性分布式消息中间件RocketMQ

1.7.1 主要内容

(1)认识网关;

(2)用Reactor Netty实现 Spring Cloud Gateway的通信渠道;

(3)用“路由规则定位器”(RouteDefinitionLocator)加载网关的路由规则;

(4)用“Redis Lua”进行网关API的限流。

1.7.2 Spring Cloud Gateway如何整合Redis,做网关限流

首先,Spring Cloud Gateway整合了spring-data-redis,并利用Spring Boot的自动配置,初始化Redis客户端,具体如下所示。

代码语言:javascript复制
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(RedisReactiveAutoConfiguration.class)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
@ConditionalOnBean(ReactiveRedisTemplate.class)
@ConditionalOnClass({ RedisTemplate.class, DispatcherHandler.class })
class GatewayRedisAutoConfiguration {
  //整合Lua脚本
  @Bean
  @SuppressWarnings("unchecked")
  public RedisScript redisRequestRateLimiterScript() {
    DefaultRedisScript redisScript = new DefaultRedisScript<>();
    redisScript.setScriptSource(new ResourceScriptSource(
        new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
    redisScript.setResultType(List.class);
    return redisScript;
  }
  //构造“基于Redis的分布式限流器”
  @Bean
  @ConditionalOnMissingBean
  public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
      @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
      ConfigurationService configurationService) {
    return new RedisRateLimiter(redisTemplate, redisScript, configurationService);
  }

}

其次,用分布式限流器进行限流,具体如下所示。

代码语言:javascript复制
@ConfigurationProperties("spring.cloud.gateway.redis-rate-limiter")
public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Config>
    implements ApplicationContextAware {
    //结合Redis Lua,使用令牌桶算法完成分布式限流
    public Mono<Response> isAllowed(String routeId, String id) {
    if (!this.initialized.get()) {
      throw new IllegalStateException("RedisRateLimiter is not initialized");
    }
    //加载路由配置信息
    Config routeConfig = loadConfiguration(routeId);
    int replenishRate = routeConfig.getReplenishRate();
    //获取桶的容量
    int burstCapacity = routeConfig.getBurstCapacity();
    //获取请求Token数
    int requestedTokens = routeConfig.getRequestedTokens();
    try {
      List<String> keys = getKeys(id);
      List<String> scriptArgs = Arrays.asList(replenishRate   "",
          burstCapacity   "", Instant.now().getEpochSecond()   "",
          requestedTokens   "");
      //用Redis客户端执行Lua限流脚本
      Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
          scriptArgs);
      return flux.onErrorResume(throwable -> {
        if (log.isDebugEnabled()) {
          log.debug("Error calling rate limiter lua", throwable);
        }
        return Flux.just(Arrays.asList(1L, -1L));
      }).reduce(new ArrayList<Long>(), (longs, l) -> {
        longs.addAll(l);
        return longs;
      }).map(results -> {
        boolean allowed = results.get(0) == 1L;
        Long tokensLeft = results.get(1);
        Response response = new Response(allowed,getHeaders(routeConfig, tokensLeft));
        if (log.isDebugEnabled()) {
          log.debug("response: "   response);
        }
        return response;
      });
    }
    catch (Exception e) {
    }
    return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
  }
}

如果想了解Spring Cloud Alibaba与Spring Cloud Gateway的详细原理,读者可以查阅本书的相关章节。

总结

本文详细介绍了——Spring Cloud Alibaba微服务架构实战派(上下册)中上册的核心内容及相关原理

0 人点赞