NameServer 名字服务
实际作就是就一个注册中心
NameServer 作用
在系统中肯定是做命名服务,服务治理方面的工作,功能应该是和zookeeper差不多 早期的版本中,使用的是 Zookeeper 做为配置中心,改名 RocketMQ 后使用了自己开发的 NameServer。 是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步
两个主要做用
- NameServer维护Broker NameServer 维护了一份 Broker 的地址列表和 Broker 在启动的时候会去 NameServer 进行注册,会维护 Broker 的存活状态。
- NameServer 维护Topic NameServer 维护了一份 Topic 和 Topic 对应队列的地址列表,Broker 每次发送的心跳过来的时候会把 Topic 信息带上。
producer、consumer 发送消息会去 NameServer 去拉取路由信息
NameServer 维护 Broker
1.维护 Broker 信息 broker 启动后,会连接到 NameServer,定期上报自身信息,NameServer 收到消息后会每 30秒 扫描一次所有已上报的 Broker 信息的心跳。 NameServer与每台Broker保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中删除。 注意: 但是路由变化不会马上通知消息生产者Producer。 这样设计的目的是为了降低 NameServer 实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。
2.判断 broker 失效 以 NameServer自身 broker列表中的broker的更新时间,当前时间与最后更新时间差值超过2分钟,就判定为失效,移除失效 broker。这个后面带上源码分析。
3.无状态性 NameServer本身的高可用是通过部署多台NameServer来实现,但彼此之间不通讯,也就是NameServer服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这也是NameServer设计的一个亮点
特点:
- 互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用,这点类似于dubbo的zookeeper。
- nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。
Broker 启动的时候会将自己注册到 NameServer 中,注册的同时还会将 Broker 的 IP 地址、端口相关的数据,以及保存在 Broker 中的 RocketMQ 集群路由的数据一并跟随心跳发送到 NameServer。这里的路由信息是指 Topic 下的 MessageQueue 分别都在哪台 Broker 上。
从代码是了解这一过程
registerBroker 是broker
注册、维护的主要逻辑,主要的几个集合:
topicQueueTable
topic和broker对应关系brokerAddrTable
broker信息clusterAddrTable
集群信息
// 处理 broker 相关事务
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//broker 超时时间设置 120 秒,就是这个指定的,没有发现有给api修改
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// topic 路由表
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// broker 信息
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//省略部分代码
//注册 broker
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
//集群名称
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
//如果没有拿到 broker名,broker就用 clusterName
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//brokerData 数据,第一次注册,并没有数据
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
//key 是 0-n
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
//slave 切换到 master:删除1,再将slave改为0,add到brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
//brokerAddr 申请注册的 broker
//去重,找到 IP:PORT 只允许一条存在,如果 IP:PORT 存在,ID不同,删除这一条
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
//上面删完,这里add进去,可以理解成更新操作
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);//返回旧值
//处理 topic 的配置修改,如果是master,开发中topic经常会调整
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 省略部分代码
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
topic 队列和 broker 的对应关系
一个topic默认会有16个队列(queue)
,队列(queue)会分布在不同的broker
上
//创建、更新队列 brokerName 和 queue 的对应关系
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
//默认16
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
//默认16
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
//绑定队列对应的 brokerName
//比如 broker1 TopicA---queue1
// queue2
// queue3
// queue4
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove();
}
}
}
if (addNewOne) {
queueDataList.add(queueData);
}
}
}
总结
NameServer 其实就是抄的 kafka的注册中心,又搞的不像,两个节点之前状态不一致表面上说是开发简单,那实际用起来呢,再观望一下,不行我提个PR帮他们搞搞。