RocketMQ-NameServer原理

2023-10-20 10:24:17 浏览数 (2)

NameServer 名字服务

实际作就是就一个注册中心

NameServer 作用

在系统中肯定是做命名服务,服务治理方面的工作,功能应该是和zookeeper差不多 早期的版本中,使用的是 Zookeeper 做为配置中心,改名 RocketMQ 后使用了自己开发的 NameServer。 是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步

两个主要做用

  1. NameServer维护Broker NameServer 维护了一份 Broker 的地址列表和 Broker 在启动的时候会去 NameServer 进行注册,会维护 Broker 的存活状态。
  2. NameServer 维护Topic NameServer 维护了一份 Topic 和 Topic 对应队列的地址列表,Broker 每次发送的心跳过来的时候会把 Topic 信息带上。

producer、consumer 发送消息会去 NameServer 去拉取路由信息

NameServer 维护 Broker

1.维护 Broker 信息 broker 启动后,会连接到 NameServer,定期上报自身信息,NameServer 收到消息后会每 30秒 扫描一次所有已上报的 Broker 信息的心跳。 NameServer与每台Broker保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中删除。 注意: 但是路由变化不会马上通知消息生产者Producer。 这样设计的目的是为了降低 NameServer 实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。

2.判断 broker 失效 以 NameServer自身 broker列表中的broker的更新时间,当前时间与最后更新时间差值超过2分钟,就判定为失效,移除失效 broker。这个后面带上源码分析。

3.无状态性 NameServer本身的高可用是通过部署多台NameServer来实现,但彼此之间不通讯,也就是NameServer服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这也是NameServer设计的一个亮点

特点:

  1. 互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用,这点类似于dubbo的zookeeper。
  2. nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。

Broker 启动的时候会将自己注册到 NameServer 中,注册的同时还会将 Broker 的 IP 地址、端口相关的数据,以及保存在 Broker 中的 RocketMQ 集群路由的数据一并跟随心跳发送到 NameServer。这里的路由信息是指 Topic 下的 MessageQueue 分别都在哪台 Broker 上。

从代码是了解这一过程

registerBrokerbroker注册、维护的主要逻辑,主要的几个集合:

  • topicQueueTable topic和broker对应关系
  • brokerAddrTable broker信息
  • clusterAddrTable 集群信息
代码语言:javascript复制
// 处理 broker 相关事务
public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //broker 超时时间设置 120 秒,就是这个指定的,没有发现有给api修改
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // topic 路由表
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // broker 信息
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

    //省略部分代码

    //注册 broker
    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                //集群名称
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    //如果没有拿到 broker名,broker就用 clusterName
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;

                //brokerData 数据,第一次注册,并没有数据
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                //key 是 0-n
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                //slave 切换到 master:删除1,再将slave改为0,add到brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    //brokerAddr 申请注册的 broker
                    //去重,找到 IP:PORT 只允许一条存在,如果 IP:PORT 存在,ID不同,删除这一条
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }
                //上面删完,这里add进去,可以理解成更新操作
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);//返回旧值

                //处理 topic 的配置修改,如果是master,开发中topic经常会调整
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
            // 省略部分代码
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }

topic 队列和 broker 的对应关系

一个topic默认会有16个队列(queue),队列(queue)会分布在不同的broker

代码语言:javascript复制
//创建、更新队列 brokerName 和 queue 的对应关系
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    //默认16
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    //默认16
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataList) {
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        boolean addNewOne = true;

        Iterator<QueueData> it = queueDataList.iterator();
        while (it.hasNext()) {
            QueueData qd = it.next();
            //绑定队列对应的 brokerName
            //比如 broker1  TopicA---queue1
            //                      queue2
            //                      queue3
            //                      queue4
            if (qd.getBrokerName().equals(brokerName)) {
                if (qd.equals(queueData)) {
                    addNewOne = false;
                } else {
                    log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                        queueData);
                    it.remove();
                }
            }
        }

        if (addNewOne) {
            queueDataList.add(queueData);
        }
    }
}

总结

NameServer 其实就是抄的 kafka的注册中心,又搞的不像,两个节点之前状态不一致表面上说是开发简单,那实际用起来呢,再观望一下,不行我提个PR帮他们搞搞。

0 人点赞