[从源码学设计]蚂蚁金服SOFARegistry之存储结构
0x00 摘要
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
本文为第六篇,介绍SOFARegistry的存储结构,本文与业务联系密切。
0x01 业务范畴
首先,我们从 Data Server 角度出发,看看本身可能涉及的存储结构。
哪些需要存储。
- 本身服务器状态;
- 其他服务器节点状态,比如其他DataServer,SessionServer,MetaServer;
- 注册的服务状态;
所以我们得到如下问题需要思考:
- 问题:DataServer如何知道/保存其他 DataServer?
- 问题:考虑 其他DataServer 需要保存什么:ip,端口,状态,如何hash,里面存储的数据怎么对应hash?
- 问题:几个DataServer都从MetaServer获取数据变化,互相发送同步消息,怎么处理?
- 问题:与其他 Data Server 怎么合作,新加入一个DataServer,数据需要切换?
- 问题:为什么要把 DataServerNodeFactory 和 DataServerCache 分开。因为 Node 的结构不同导致的?
- 问题:DataServerCache内部成员分成几类模块?需要保存哪些信息?为什么这样保存?
可能有些问题脱离了本文研究范畴,但是我们也一起罗列在这里。
1.1 缓存
时间和空间之间的平衡关系可以说是计算机系统中最为本质的关系之一。
时间和空间这一对矛盾关系在推荐系统中的典型表现,主要体现在对缓存的使用上。
缓存通常用来存储一些计算代价较高以及相对静态变化较少的数据,常常在生产者和消费者之间起到缓冲的作用,使得二者可以解耦,各自异步进行。
利用缓存来解耦系统,带来性能上的提升以及开发的便利,是在系统架构设计中需要掌握的一种通用的思路。
1.2 DataServer 分片机制
在大部分的服务注册中心系统中,每台服务器都存储着全量的服务注册数据,服务器之间通过一致性协议(paxos、Raft 等)实现数据的复制,或者采用只保障最终一致性的算法,来实现异步数据复制。这样的设计对于一般业务规模的系统来说没有问题,而当应用于有着海量服务的庞大的业务系统来说,就会遇到性能瓶颈。
为解决这一问题,SOFARegistry 采用了数据分片的方法。全量服务注册数据不再保存在单机里,而是分布于每个节点中,每台服务器保存一定量的服务注册数据,同时进行多副本备份,从理论上实现了服务无限扩容,且实现了高可用,最终达到支撑海量数据的目的。
在各种数据分片算法中,SOFARegistry 采用了业界主流的一致性 Hash 算法做数据分片,当节点动态扩缩容时,数据仍能均匀分布,维持数据的平衡。
在数据同步时,没有采用与 Dynamo、Casandra、Tair、Codis、Redis cluster 等项目中类似的预分片机制,而是在 DataServer 内存里以 dataInfoId 为粒度进行操作日志记录,这种实现方式在某种程度上也实现了“预分片”,从而保障了数据同步的有效性。
1.3 服务模型
为了更好的说明数据类型,我们只能从SOFA博客中大段摘取文字。
1.3.1 服务发布模型(PublisherRegister)
- dataInfoId:服务唯一标识,由``<分组 group>
和
<租户 instanceId>构成,例如在 SOFARPC 的场景下,一个 dataInfoId 通常是
com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名称,00001 是租户 id。group 和 instance 主要是方便对服务数据做逻辑上的切分,使不同 group 和 instance 的服务数据在逻辑上完全独立。模型里有 group 和 instanceId 字段,但这里不额外列出来,读者只要理解 dataInfoId 的含义即可。 - zone:是一种单元化架构下的概念,代表一个机房内的逻辑单元,通常一个物理机房(Datacenter)包含多个逻辑单元(zone)。在服务发现场景下,发布服务时需指定逻辑单元(zone),而订阅服务者可以订阅逻辑单元(zone)维度的服务数据,也可以订阅物理机房(datacenter)维度的服务数据,即订阅该 datacenter 下的所有 zone 的服务数据。
- dataList:服务注册数据,通常包含“协议”、“地址”和“额外的配置参数”,例如 SOFARPC 所发布的数据类似
bolt://192.168.1.100:8080?timeout=2000
”。这里使用 dataList,表示一个 PublisherRegister 可以允许同时发布多个服务数据(但是通常只会发布一个)。
1.3.2 服务订阅模型(SubscriberRegister)
- dataInfoId:服务唯一标识,上面已经解释过了。
- scope: 订阅维度,共有 3 种订阅维度:zone、dataCenter 和 global。zone 和 datacenter 的意义,在上述有关“zone”的介绍里已经解释。global 维度涉及到机房间数据同步的特性,目前暂未开源。
关于“zone”和“scope”的概念理解,这里再举个例子。如下图所示,物理机房内有 ZoneA 和 ZoneB 两个单元,PublisherA 处于 ZoneA 里,所以发布服务时指定了 zone=ZoneA,PublisherB 处于 ZoneB 里,所以发布服务时指定了 zone=ZoneB;此时 Subscriber 订阅时指定了 scope=datacenter 级别,因此它可以获取到 PublisherA 和 PublisherB (如果 Subscriber 订阅时指定了 scope=zone 级别,那么它只能获取到 PublisherA)。
1.3.3 dataInfoId
Data 层是数据服务器集群。Data 层通过分片存储的方式保存着所用应用的服务注册数据。数据按照 dataInfoId(每一份服务数据的唯一标识)进行一致性 Hash 分片,多副本备份,保证数据的高可用。
SOFARegistry 最早选择了一致性哈希分片,所以同样遇到了数据分布不固定带来的数据同步难题。我们如何解决的呢?我们通过在 DataServer 内存中以 dataInfoId 的粒度记录操作日志,并且在 DataServer 之间也是以 dataInfoId 的粒度去做数据同步(一个服务就由一个 dataInfoId 唯标识)。其实这种日志记录的思想和虚拟桶是一致的,只是每个 datainfoId 就相当于一个 slot 了,这是一种因历史原因而采取的妥协方案。在服务注册中心的场景下,datainfoId 往往对应着一个发布的服务,所以总量还是比较有限的,以蚂蚁金服目前的规模,每台 DataServer 中承载的 dataInfoId 数量也仅在数万的级别,勉强实现了 dataInfoId 作为 slot 的数据多副本同步方案。
最终一致性
SOFARegistry 在数据存储层面采用了类似 Eureka 的最终一致性的过程,但是存储内容上和 Eureka 在每个节点存储相同内容特性不同,采用每个节点上的内容按照一致性 Hash 数据分片来达到数据容量无限水平扩展能力。
SOFARegistry 是一个 AP 分布式系统,表明了在已有条件 P 的前提下,选择了 A 可用性。当数据进行同步时,获取到的数据与实际数据不一致。但因为存储的信息为服务的注册节点,尽管会有短暂的不一致产生,但对于客户端来说,大概率还是能从这部分数据中找到可用的节点,不会因为数据暂时的不一致对业务系统带来致命性的伤害。
集群内部数据迁移过程
SOFARegistry 的 DataServer 选择了“一致性 Hash分片”来存储数据。在“一致性 Hash分片”的基础上,为了避免“分片数据不固定”这个问题,SOFARegistry 选择了在 DataServer 内存里以 dataInfoId 的粒度记录操作日志,并且在 DataServer 之间也是以 dataInfoId 的粒度去做数据同步。
图 DataServer 之间进行异步数据同步
数据和副本分别分布在不同的节点上,进行一致性 Hash 分片,当时对主副本进行写操作之后,主副本会把数据异步地更新到其他副本中,实现了集群内部不同副本之间的数据迁移工作。
1.3.4 版本号
为了确定服务发布数据的变更,SOFA对于一个服务不仅定义了服务 ID,还对一个服务 ID 定义了对应的版本信息。
服务发布数据变更主动通知到 Session 时,Session 对服务 ID 版本变更比较,高版本覆盖低版本数据,然后进行推送。
因为有了服务 ID 的版本号,Session 可以定期发起版本号比较,如果Session 存储的的服务 ID 版本号高于dataServer存储的 ,Session再次拉取新版本数据进行推送,这样避免了某次变更通知没有通知到所有订阅方的情况。
0x02 基本概念
首先,我们讲讲一些基本概念。
2.1 物理机房DataCenter
DataCenter代表一个物理机房。一个数据中心包括多个DataNode,这些DataNode就是同机房数据节点。
代码语言:javascript复制nodeList.add(new DataNode(new URL("192.168.0.1", 9632), "DefaultDataCenter"));
nodeList.add(new DataNode(new URL("192.168.0.2", 9632), "DefaultDataCenter"));
nodeList.add(new DataNode(new URL("192.168.0.3", 9632), "DefaultDataCenter"));
2.2 Server节点DataNode
DataNode是Server节点,可以代表任意类型的Server,无论是meta,data,session。
代码语言:javascript复制public class DataNode implements Node, HashNode {
private final URL nodeUrl;
private final String nodeName;
private final String dataCenter;
private String regionId;
private NodeStatus nodeStatus;
private long registrationTimestamp;
}
2.3 数据节点DataServerNode
这是 Data Server 概念。
为什么要有DataNode和DataServerNode两个类似的数据结构类型。
原来这是分属于不同的包,或者模块。
- DataNode 是从 MetaServer 传来的,被 DataServerCache 使用,而 DataServerCache 放在 cache 包。
- DataServerNode 是 DataServer 本身自己依据信息构建的,被DataServerNodeFactory 使用,放在 remoting 包。
具体定义如下:
代码语言:javascript复制public class DataServerNode implements HashNode {
private String ip;
private String dataCenter;
private Connection connection;
}
2.4 服务Publisher
Publisher 是服务概念,具体如下。
代码语言:javascript复制public class Publisher extends BaseInfo {
private List<ServerDataBox> dataList;
private PublishType publishType = PublishType.NORMAL;
}
2.5 服务聚合Datum
Datum类定义如下,可以看到里面有一个ConcurrentHashMap,其中放入了Datum所包括的Publisher。
Publisher 只是代表发布者自己业务服务器。Datum则是从SOFARegistry整体角度做了整理,就是一个Session Server包括的服务聚合。
代码语言:javascript复制public class Datum implements Serializable {
private String dataInfoId;
private String dataCenter;
private String dataId;
private String instanceId;
private String group;
private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
private long version;
private boolean containsUnPub = false;
}
0x03 本机 Data Server
以下是关于本Data Server服务器的数据结构。
3.1 本身Data Server状态
DataNodeStatus代表本身Data Server的状态。
代码语言:javascript复制com.alipay.sofa.registry.server.data.node.DataNodeStatus
public enum LocalServerStatusEnum {
INITIAL,
WORKING
}
public class DataNodeStatus {
private volatile LocalServerStatusEnum status = LocalServerStatusEnum.INITIAL;
public LocalServerStatusEnum getStatus() {
return status;
}
public void setStatus(LocalServerStatusEnum status) {
LocalServerStatusEnum originStatus = this.status;
this.status = status;
}
}
3.1.1 设置状态
DataServerCache . updateDataServerStatus 中有设置DataNodeStatus的状态,比如:dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING)。
代码语言:javascript复制private void updateDataServerStatus() {
if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING);
//after working update current dataCenter list to old DataServerChangeItem
updateItem(
newDataServerChangeItem.getServerMap().get(dataServerConfig.getLocalDataCenter()),
newVersion, dataServerConfig.getLocalDataCenter());
}
}
另外 addNotWorkingServer 有 addStatus 操作。
代码语言:javascript复制public void addNotWorkingServer(long version, String ip) {
synchronized (DataServerCache.class) {
if (version >= curVersion.get()) {
addStatus(version, ip, LocalServerStatusEnum.INITIAL);
if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
updateDataServerStatus();
}
}
}
}
3.2 本DataServer配置
DataServerConfig包括了本DataServer全部配置,这里只摘录了相关信息。
代码语言:javascript复制public class DataServerConfig {
public static final String IP = NetUtil.getLocalAddress().getHostAddress();
private CommonConfig commonConfig;
private int numberOfReplicas = 1000;
public int getNumberOfReplicas() {
return numberOfReplicas;
}
public String getLocalDataCenter() {
return commonConfig.getLocalDataCenter();
}
}
3.3 Beans
对于 DataServerConfig 和 DataNodeStatus,系统做了beans。
代码语言:javascript复制@Configuration
protected static class DataServerBootstrapConfigConfiguration {
...
@Bean
@ConditionalOnMissingBean
public DataServerConfig dataServerConfig(CommonConfig commonConfig) {
return new DataServerConfig(commonConfig);
}
@Bean
public DataNodeStatus dataNodeStatus() {
return new DataNodeStatus();
}
...
}
具体如下:
代码语言:javascript复制 -----------------------------------------------
| |
| [Data Server] |
| |
| |
| --------------- ------------------ |
| | DataNodeStatus| | DataServerConfig | |
| --------------- ------------------ |
| |
-----------------------------------------------
0x04 Meta Server
因为不涉及到 Meta Server 内部架构,所以从 Data Server 角度看,只要存储 Meta Server 对应的网络 Connection 即可。
其中逻辑意义是:Map<dataCenter, Map<ip, Connection>>
,就是哪些dataCenter中包括哪些Meta Server,对应于哪些ip。
public class MetaServerConnectionFactory {
private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>();
}
具体如下:
代码语言:javascript复制 -----------------------------------------
| MetaServerConnectionFactory |
| |
| Map<dataCenter, Map<ip, Connection> > |
| |
-----------------------------------------
0x05 Session Server
因为不涉及到 Session Server 内部架构,所以从 Data Server 角度看,只要存储 Session Server 对应的网络 Connection 即可。
其中逻辑含义从注释即可了解。
代码语言:javascript复制public class SessionServerConnectionFactory {
private static final int DELAY = 30 * 1000;
private static final Map EMPTY_MAP = new HashMap(0);
/**
* key : SessionServer address
* value: SessionServer processId
*/
private final Map<String, String> SESSION_CONN_PROCESS_ID_MAP = new ConcurrentHashMap<>();
/**
* key : SessionServer processId
* value: ip:port of clients
*/
private final Map<String, Set<String>> PROCESS_ID_CONNECT_ID_MAP = new ConcurrentHashMap<>();
/**
* key : SessionServer processId
* value: pair(SessionServer address, SessionServer connection)
*/
private final Map<String, Pair> PROCESS_ID_SESSION_CONN_MAP = new ConcurrentHashMap<>();
@Autowired
private DisconnectEventHandler disconnectEventHandler;
}
具体如下:
代码语言:javascript复制 ------------------------------------------------------------------------------------
|[SessionServerConnectionFactory] |
| |
| |
| |
|EMPTY_MAP |
| |
|Map<SessionServer address, SessionServer processId> |
| |
|Map<SessionServer processId, Set<ip:port of clients> > |
| |
|Map<SessionServer processId, pair(SessionServer address, SessionServer connection)> |
| |
------------------------------------------------------------------------------------
0x06 其他Data Server
因为涉及到与其他 Data Server 的深度交互,所以需要对其他 Data Server 的深度信息作存储。
分为两个部分:DataServerNodeFactory和DataServerCache。
为什么要有DataServerNodeFactory和DataServerCache两个类似的数据结构类型。
从注释来看,是为了把功能分离细化,DataServerNodeFactory专注连接管理,DataServerCache注重dataServer的变化与版本管理。
DataServerNodeFactory:
代码语言:javascript复制the factory to hold other dataservers and connection connected to them
DataServerCache
代码语言:javascript复制cache of dataservers
所以也分别在不同的包,或者模块。
- DataServerCache 放在 cache 包。
- DataServerNodeFactory 放在 remoting 包。
6.1 DataServerNodeFactory
DataServerNodeFactory 存储了其他 Data Server 的 DataServerNode,因为 DataServerNode 本身就包括了 Connection,所以 DataServerNodeFactory 也间接的包含了 Connection,这从其类定义注释可以看出,而且其定义是在remoting.dataserver包之中。
代码语言:javascript复制the factory to hold other dataservers and connection connected to them
DataServerNodeFactory 里面按照两个维度存储同一类东西,就是其他DataServer :
- Map<dataCenter, Map<ip, DataServerNode>> MAP;
- Map<dataCenter, ConsistentHash>这里会计算 DataServerNode 的一致性hash。
具体定义如下:
代码语言:javascript复制public class DataServerNodeFactory {
/**
* row: dataCenter
* column: ip
* value dataServerNode
*/
private static final Map<String, Map<String, DataServerNode>> MAP = new ConcurrentHashMap<>();
/**
* key: dataCenter
* value: consistentHash
*/
private static final Map<String, ConsistentHash<DataServerNode>> CONSISTENT_HASH_MAP = new ConcurrentHashMap<>();
private static AtomicBoolean init = new AtomicBoolean(false);
}
6.1.1 添加
在DataServerChangeEventHandler.doHandle里面会调用connectDataServer,其又会调用 DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig); 来添加,其里面又会生成一致性Hash。
6.1.2 使用
DefaultMetaServiceImpl 会调用 DataServerNodeFactory 来计算 consistentHash 来获取 DataServerNode。
代码语言:javascript复制public class DefaultMetaServiceImpl implements IMetaServerService {
@Override
public DataServerNode getDataServer(String dataCenter, String dataInfoId) {
return DataServerNodeFactory.computeDataServerNode(dataCenter, dataInfoId);
}
}
6.2 DataServerCache
由注释可以知道,这是其他dataservers的缓存。
代码语言:javascript复制cache of dataservers
几个关键变量:
nodeStatusMap 是本 Data Center 中所有 Data Server 的状态。
- dataServerChangeItem 是当前节点列表;
- newDataServerChangeItem 是新加入的节点列表;有两个变量的原因是因为有一个变化过程,所以加入一个New....;
具体如下:
代码语言:javascript复制com.alipay.sofa.registry.server.data.cache.DataServerCache
public class DataServerCache {
@Autowired
private DataNodeStatus dataNodeStatus;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private AfterWorkingProcessHandler afterWorkingProcessHandler;
/** current dataServer list and version */
private volatile DataServerChangeItem dataServerChangeItem = new DataServerChangeItem();
/** new input dataServer list and version */
private volatile DataServerChangeItem newDataServerChangeItem = new DataServerChangeItem();
private final AtomicBoolean HAS_NOTIFY_ALL = new AtomicBoolean(false);
private AtomicLong curVersion = new AtomicLong(-1L);
/** version -> Map(serverIp, serverStatus) */
private Map<Long, Map<String, LocalServerStatusEnum>> nodeStatusMap = new ConcurrentHashMap<>();
}
下面介绍几个DataServerCache的函数。
6.2.1 updateItem
updateItem会被几个不同地方调用,进行更新dataServerChangeItem,就是插入一个new DataServer。
比如:LocalDataServerChangeEvent 和 DataServerChangeEvent 的响应函数就会调用。
代码语言:javascript复制public void updateItem(Map<String, DataNode> localDataNodes, Long version, String dataCenter) {
synchronized (DataServerCache.class) {
Long oldVersion = dataServerChangeItem.getVersionMap().get(dataCenter);
Map<String, DataNode> oldList = dataServerChangeItem.getServerMap().get(dataCenter);
Set<String> oldIps = oldList == null ? new HashSet<>() : oldList.keySet();
Set<String> newIps = localDataNodes == null ? new HashSet<>() : localDataNodes.keySet();
dataServerChangeItem.getServerMap().put(dataCenter, localDataNodes);
dataServerChangeItem.getVersionMap().put(dataCenter, version);
}
}
6.2.2 newDataServerChangeItem
newDataServerChangeItem 用这个来获取所有的datacenters的所有DataServer。
代码语言:javascript复制/**
* get all datacenters
*
* @return
*/
public Set<String> getAllDataCenters() {
return newDataServerChangeItem.getVersionMap().keySet();
}
6.2.3 dataServerChangeItem
dataServerChangeItem 被用来获取某一个特定 data center的所有DataServer。
代码语言:javascript复制public Map<String, DataNode> getDataServers(String dataCenter) {
return getDataServers(dataCenter, dataServerChangeItem);
}
public Map<String, DataNode> getDataServers(String dataCenter,
DataServerChangeItem dataServerChangeItem) {
return doGetDataServers(dataCenter, dataServerChangeItem);
}
private Map<String, DataNode> doGetDataServers(String dataCenter,
DataServerChangeItem dataServerChangeItem) {
synchronized (DataServerCache.class) {
Map<String, Map<String, DataNode>> dataserverMap = dataServerChangeItem.getServerMap();
if (dataserverMap.containsKey(dataCenter)) {
return dataserverMap.get(dataCenter);
} else {
return new HashMap<>();
}
}
}
6.2.4 DataServerChangeItem
此数据结构实际只用来获取local data center的data servers。
代码语言:javascript复制/**
* change info of datacenters
*/
public class DataServerChangeItem {
/** datacenter -> Map<ip, DataNode> */
private Map<String, Map<String, DataNode>> serverMap;
/** datacenter -> version */
private Map<String, Long> versionMap;
}
6.2.5 使用
有些类会间接使用dataServerCache。
比如:DefaultMetaServiceImpl . dataServerCache 会被NotifyOnlineHandler调用。
代码语言:javascript复制public class NotifyOnlineHandler extends AbstractServerHandler<NotifyOnlineRequest> {
@Autowired
private DataServerCache dataServerCache;
@Override
public Object doHandle(Channel channel, NotifyOnlineRequest request) {
long version = request.getVersion();
if (version >= dataServerCache.getCurVersion()) {
dataServerCache.addNotWorkingServer(version, request.getIp());
}
return CommonResponse.buildSuccessResponse();
}
}
NotifyOnlineHandler其配置在
代码语言:javascript复制@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(getDataHandler());
list.add(publishDataProcessor());
list.add(unPublishDataHandler());
list.add(notifyFetchDatumHandler());
list.add(notifyOnlineHandler()); //在这里
list.add(syncDataHandler());
list.add(dataSyncServerConnectionHandler());
return list;
}
属于 dataSyncServer 响应函数的一部分。
代码语言:javascript复制private void openDataSyncServer() {
if (serverForDataSyncStarted.compareAndSet(false, true)) {
dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
.getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
.toArray(new ChannelHandler[serverSyncHandlers.size()]));
}
}
具体如下:
代码语言:javascript复制 ---------------------------------------------------
| |
| [DataServerNodeFactory] |
| |
| |
| Map<dataCenter, Map<ip, dataServerNode> > |
| |
| Map<dataCenter, ConsistentHash<DataServerNode> > |
| |
---------------------------------------------------
---------------------------------------------------
| [DataServerCache] |
| |
| |
| DataServerChangeItem dataServerChangeItem |
| |
| DataServerChangeItem newDataServerChangeItem |
| |
| Map<>ersion, Map<serverIp, serverStatus> > |
| |
---------------------------------------------------
6.2.6 DataServer版本
从之前的MetaServer分析以及DataServerChangeItem,可知DataSever也是有版本的。
代码语言:javascript复制public class DataServerChangeItem {
/** datacenter -> Map<ip, DataNode> */
private Map<String, Map<String, DataNode>> serverMap;
/** datacenter -> version */
private Map<String, Long> versionMap;
}
在DataServer中,DefaultMetaServiceImpl中有设置版本号。
代码语言:javascript复制if (obj instanceof NodeChangeResult) {
NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
Map<String, Long> versionMap = result.getDataCenterListVersions();
versionMap.put(result.getLocalDataCenter(), result.getVersion());
return new DataServerChangeItem(result.getNodes(), versionMap);
}
其来源是MetaServer的DataStoreService。
代码语言:javascript复制dataNodeRepositoryMap.forEach((dataCenter, dataNodeRepository) -> {
if (localDataCenter.equalsIgnoreCase(dataCenter)) {
nodeChangeResult.setVersion(dataNodeRepository.getVersion());
}
versionMap.put(dataCenter, dataNodeRepository.getVersion());
Map<String, RenewDecorate<DataNode>> dataMap = dataNodeRepository.getNodeMap();
Map<String, DataNode> newMap = new ConcurrentHashMap<>();
dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
pushNodes.put(dataCenter, newMap);
});
以及DataStoreService。
代码语言:javascript复制metaRepositoryMap.forEach((dataCenter, metaNodeRepository) -> {
if (localDataCenter.equalsIgnoreCase(dataCenter)) {
nodeChangeResult.setVersion(metaNodeRepository.getVersion());
}
versionMap.put(dataCenter, metaNodeRepository.getVersion());
Map<String, RenewDecorate<MetaNode>> dataMap = metaNodeRepository.getNodeMap();
Map<String, MetaNode> newMap = new ConcurrentHashMap<>();
dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
pushNodes.put(dataCenter, newMap);
});
都是提取dataServer的版本号,发送出去。
0x07 服务信息
服务信息包括 Subscriber 和 Publisher,这些信息需要深度存储,本文仅以 Publisher 为例分析。
前面描述了,Publisher包括在Datum之中,所以我们下面的讲解以Datum为主。
7.1 Datum
Datum类定义如下,可以看到里面有一个ConcurrentHashMap,其中放入了Datum所包括的Publisher。
Publisher 只是代表发布者自己业务服务器。Datum则是从SOFARegistry整体角度做了整理,就是一个Session Server包括的服务聚合。
代码语言:javascript复制public class Datum implements Serializable {
private String dataInfoId;
private String dataCenter;
private String dataId;
private String instanceId;
private String group;
private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
private long version;
private boolean containsUnPub = false;
}
7.2 DatumCache
DatumCache缓存了所有Datum,就是本DataServer对应所有的SessionServer中所有的服务。
代码语言:javascript复制public class DatumCache {
@Autowired
private DatumStorage localDatumStorage;
}
7.3 LocalDatumStorage
LocalDatumStorage负责Datum具体的存储。
代码语言:javascript复制public class LocalDatumStorage implements DatumStorage {
/**
* row: dataCenter
* column: dataInfoId
* value: datum
*/
protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>();
/**
* all datum index
*
* row: ip:port
* column: registerId
* value: publisher
*/
protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();
@Autowired
private DataServerConfig dataServerConfig;
}
7.4 Beans
相关的Bean如下:
代码语言:javascript复制@Configuration
public static class DataServerStorageConfiguration {
@Bean
@ConditionalOnMissingBean
public DatumCache datumCache() {
return new DatumCache();
}
@Bean
@ConditionalOnMissingBean
public LocalDatumStorage localDatumStorage() {
return new LocalDatumStorage();
}
}
0x08 Datum的来龙去脉
8.1 Session Server 内部
首先,我们讲讲Session Server 内部如何获取Datum
在 Session Server 内部,Datum存储在 SessionCacheService 之中。
比如在 DataChangeFetchCloudTask 内部,可以这样获取 Datum。
代码语言:javascript复制private Map<String, Datum> getDatumsCache() {
Map<String, Datum> map = new HashMap<>();
NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
Collection<String> dataCenters = nodeManager.getDataCenters();
if (dataCenters != null) {
Collection<Key> keys = dataCenters.stream().
map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
new DatumKey(fetchDataInfoId, dataCenter))).
collect(Collectors.toList());
Map<Key, Value> values = null;
values = sessionCacheService.getValues(keys);
if (values != null) {
values.forEach((key, value) -> {
if (value != null && value.getPayload() != null) {
map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
}
});
}
}
return map;
}
Session Server 会向 Data Server 发送 PublishDataRequest 请求。
8.2 PublishDataHandler
在DataServer内部,PublishDataHandler 是用来处理 PublishDataRequest。
代码语言:javascript复制public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
@Autowired
private ForwardService forwardService;
@Autowired
private SessionServerConnectionFactory sessionServerConnectionFactory;
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DatumLeaseManager datumLeaseManager;
@Autowired
private ThreadPoolExecutor publishProcessorExecutor;
@Override
public Object doHandle(Channel channel, PublishDataRequest request) {
Publisher publisher = Publisher.internPublisher(request.getPublisher());
if (forwardService.needForward()) {
CommonResponse response = new CommonResponse();
response.setSuccess(false);
response.setMessage("Request refused, Server status is not working");
return response;
}
dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());
if (publisher.getPublishType() != PublishType.TEMPORARY) {
String connectId = WordCache.getInstance().getWordCache(
publisher.getSourceAddress().getAddressString());
sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
connectId);
// record the renew timestamp
datumLeaseManager.renew(connectId);
}
return CommonResponse.buildSuccessResponse();
}
}
8.3 DataChangeEventCenter
在 DataChangeEventCenter 的 onChange 函数中,会进行投放。
代码语言:javascript复制public void onChange(Publisher publisher, String dataCenter) {
int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
if (publisher instanceof UnPublisher) {
datum.setContainsUnPub(true);
}
if (publisher.getPublishType() != PublishType.TEMPORARY) {
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
DataSourceTypeEnum.PUB, datum));
} else {
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
DataSourceTypeEnum.PUB_TEMP, datum));
}
}
8.4 DataChangeEventQueue
在DataChangeEventQueue之中,会调用 handleDatum 来处理。在这里对Datum进行存储。
8.5 DataChangeHandler
在 DataChangeHandler 之中,会提取ChangeData,然后进行Notify。
代码语言:javascript复制public void start() {
DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
int queueCount = queues.length;
Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
Executor notifyExecutor = ExecutorFactory
.newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
for (int idx = 0; idx < queueCount; idx ) {
final DataChangeEventQueue dataChangeEventQueue = queues[idx];
final String name = dataChangeEventQueue.getName();
executor.execute(() -> {
while (true) {
final ChangeData changeData = dataChangeEventQueue.take();
notifyExecutor.execute(new ChangeNotifier(changeData, name));
}
});
}
}
具体如下:
代码语言:javascript复制
Session Server | Data Server
|
|
|
|
-------------------------- PublishDataRequest --------------------
| DataChangeFetchCloudTask --------------- -----> | PublishDataHandler |
----------- -------------- | ------ -------------
^ | |
| getValues | | onChange(Publisher)
| | v
| | -------- --------------
--------- ---------- | | DataChangeEventCenter |
|sessionCacheService | | -------- --------------
-------------------- | |
| | Datum
| |
| v
| -------- -------------
| | DataChangeEventQueue |
| -------- -------------
| |
| |
| | ChangeData
| v
| ------- -----------
| | DataChangeHandler |
-------------------
0x09 总结
至此,本文总结基本存储结构如下:
9.1 基本概念
物理机房DataCenter
DataCenter代表一个物理机房。一个数据中心包括多个DataNode,这些DataNode就是同机房数据节点。
Server节点DataNode
DataNode是Server节点,可以代表任意类型的Server,无论是meta,data,session。
数据节点DataServerNode
这是 Data Server 概念。
服务Publisher
Publisher 是服务概念,
服务聚合Datum
Datum里面有一个ConcurrentHashMap,其中放入了Datum所包括的Publisher。
Publisher 只是代表发布者自己业务服务器。Datum则是从SOFARegistry整体角度做了整理,就是一个Session Server包括的服务聚合。
9.2 本身Data Server状态
DataNodeStatus代表本身Data Server的状态。DataServerConfig包括了本DataServer全部配置。
9.3 Meta Server
因为不涉及到 Meta Server 内部架构,所以从 Data Server 角度看,只要存储 Meta Server 对应的网络 Connection 即可。
9.4 Session Server
因为不涉及到 Session Server 内部架构,所以从 Data Server 角度看,只要存储 Session Server 对应的网络 Connection 即可。
9.5 其他Data Server
因为涉及到与其他 Data Server 的深度交互,所以需要对其他 Data Server 的深度信息作存储。
分为两个部分:DataServerNodeFactory 和 DataServerCache。
为什么要有DataServerNodeFactory和DataServerCache两个类似的数据结构类型。
从注释来看,是为了把功能分离细化,DataServerNodeFactory专注连接管理,DataServerCache注重dataServer的变化与版本管理。
0xFF 参考
蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容
蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路
服务注册中心 Session 存储策略 | SOFARegistry 解析
海量数据下的注册中心 - SOFARegistry 架构介绍
服务注册中心数据分片和同步方案详解 | SOFARegistry 解析
蚂蚁金服开源通信框架SOFABolt解析之连接管理剖析
蚂蚁金服开源通信框架SOFABolt解析之超时控制机制及心跳机制
蚂蚁金服开源通信框架 SOFABolt 协议框架解析
蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析
蚂蚁通信框架实践
sofa-bolt 远程调用
sofa-bolt学习
SOFABolt 设计总结 - 优雅简洁的设计之道
SofaBolt源码分析-服务启动到消息处理
SOFABolt 源码分析
SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计
SOFARegistry 介绍
SOFABolt 源码分析13 - Connection 事件处理机制的设计