文章目录- nacos 配置中心的实现
- 客户端如何获得远程服务的数据
- 服务端长轮询处理机制
- 当我们通过控制台或api的方式修改了配置之后,如何实时通知的呢?
- nacos的集群
- 使用raft算法实现leader选举
- nacos集群是怎么实现的
- 客户端如何获得远程服务的数据
- 服务端长轮询处理机制
- 当我们通过控制台或api的方式修改了配置之后,如何实时通知的呢?
- 使用raft算法实现leader选举
- nacos集群是怎么实现的
nacos 配置中心的实现
客户端如何获得远程服务的数据
客户端的长轮询定时任务是在下面这行代码的时候启动的
代码语言:javascript复制ConfigService configService = NacosFactory.createConfigService(properties);
通过Class.forName来加载NacosConfigService类。
使用反射来完成NacosConfigService类的实例化。
代码语言:javascript复制public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
NacosConfigService构造方法如下:
- 初始化一个HttpAgent 使用了装饰器模式 起到ServerHttpAgent MetricsHttpAgent内部也调用了ServerHttpAgent的方法,增加了监控统计的信息。
- ClientWorker是客户端的一个工作类,agent作为参数传入ClientWorker 里面会用agent做一些与远程相关的事情。
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty("encode");
if (StringUtils.isBlank(encodeTmp)) {
this.encode = "UTF-8";
} else {
this.encode = encodeTmp.trim();
}
this.initNamespace(properties);
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
ClientWorker构造方法如下,主要功能是构造两个定时调度的线程池,并启动一个定时任务。
- 第一个线程池executor只拥有一个核心线程,每隔10ms就会执行一次checkConfigInfo()方法,从方法名上可以知道每10ms检查一次配置信息。
- 第二个线程池executorService只完成了初始化,后续会用到,主要用于实现客户端的定时长轮询功能。
public ClientWorker(final HttpAgent agent, ConfigFilterChainManager configFilterChainManager, Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
this.init(properties);
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." agent.getName());
t.setDaemon(true);
return t;
}
});
this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." agent.getName());
t.setDaemon(true);
return t;
}
});
this.executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
ClientWorker.this.checkConfigInfo();
} catch (Throwable var2) {
ClientWorker.LOGGER.error("[" agent.getName() "] [sub-check] rotate check error", var2);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
在ClientWorker构造方法中,通过executor.scheduleWithFixedDelay启动了一个每隔10s执行一次的定时任务,其中调用的方法是checkConfigInfo。这个方法主要用来检查配置是否发生了变化,用到了executorService这个定时调度的线程池。
代码语言:javascript复制public void checkConfigInfo() {
int listenerSize = ((Map)this.cacheMap.get()).size();
int longingTaskCount = (int)Math.ceil((double)listenerSize / ParamUtil.getPerTaskConfigSize());
if ((double)longingTaskCount > this.currentLongingTaskCount) {
for(int i = (int)this.currentLongingTaskCount; i < longingTaskCount; i) {
this.executorService.execute(new ClientWorker.LongPollingRunnable(i));
}
this.currentLongingTaskCount = (double)longingTaskCount;
}
}
代码逻辑比较有意思,简单解释一下。
cacheMap:AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference(new HashMap());用来存储监听变更的缓存集合。key是根据dataID/group/tenant(租户)拼接的值。Value是对应的存储在Nacos服务器上的配置文件的内容。
长轮询任务拆分:默认情况下,每个长轮询 LongPollingRunnable任务处理3000个监听配置集。如果超过3000个,则需要启动多个LongPollingRunnable去执行。
LongPollingRunnable实际是一个线程,所以我们可以直接找到LongPollingRunnable里面的run方法
- 通过checkLocalConfig方法检查本地配置
- 执行checkUpdateDataIds方法和在服务端建立长轮询机制,从服务端获取发生变更的数据。
- 遍历变更数据集合changedGroupKeys,调用getServerConfig方法,根据DataId,Group,Tenant去服务端读取对应的配置信息并保存到本地文件中。
public void run() {
List<CacheData> cacheDatas = new ArrayList();
ArrayList inInitializingCacheList = new ArrayList();
try {
Iterator var3 = ((Map)ClientWorker.this.cacheMap.get()).values().iterator();
//遍历CacheData 检查本地配置
while(var3.hasNext()) {
CacheData cacheData = (CacheData)var3.next();
if (cacheData.getTaskId() == this.taskId) {
cacheDatas.add(cacheData);
try {
ClientWorker.this.checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception var13) {
ClientWorker.LOGGER.error("get local config info error", var13);
}
}
}
//通过长轮询请求检查服务端对应的配置是否发生了变更
List<String> changedGroupKeys = ClientWorker.this.checkUpdateDataIds(cacheDatas, inInitializingCacheList);
ClientWorker.LOGGER.info("get changedGroupKeys:" changedGroupKeys);
Iterator var16 = changedGroupKeys.iterator();
while(var16.hasNext()) {
String groupKey = (String)var16.next();
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String[] ct = ClientWorker.this.getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
ClientWorker.LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", new Object[]{ClientWorker.this.agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]});
} catch (NacosException var12) {
String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", ClientWorker.this.agent.getName(), dataId, group, tenant);
ClientWorker.LOGGER.error(message, var12);
}
}
var16 = cacheDatas.iterator();
//触发事件通知
while(true) {
CacheData cacheDatax;
do {
if (!var16.hasNext()) {
inInitializingCacheList.clear();
//继续定时执行当前线程
ClientWorker.this.executorService.execute(this);
return;
}
cacheDatax = (CacheData)var16.next();
} while(cacheDatax.isInitializing() && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId, cacheDatax.group, cacheDatax.tenant)));
cacheDatax.checkListenerMd5();
cacheDatax.setInitializing(false);
}
} catch (Throwable var14) {
ClientWorker.LOGGER.error("longPolling error : ", var14);
ClientWorker.this.executorService.schedule(this, (long)ClientWorker.this.taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
上面代码无非就是根据taskId对cacheMap进行数据分割,再比较本地配置文件的数据是否存在变更,如果有变更则直接触发通知。这里要注意的是,在$(user)nacosconfig目录下会缓存一份服务端的配置信息,checkLocalConfig会和本地磁盘中的文件内容进行比较。如果内存中的数据和磁盘中的数据不一致说明数据发生了变化,需要触发事件通知。
接着调用checkUpdateDataIds方法,基于长连接方式来监听服务端配置的变化,最后根据变化数据的key去服务端获取最新数据。checkUpdateDataIds最终会调用checkUpdateConfigStr方法。
代码语言:javascript复制List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
List<String> params = new ArrayList<String>(2);
params.add(Constants.PROBE_MODIFY_REQUEST);
params.add(probeUpdateString);
List<String> headers = new ArrayList<String>(2);
headers.add("Long-Pulling-Timeout");
headers.add("" timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
// In order to prevent the server from handling the delay of the client's long task,
// increase the client's read timeout to avoid this problem.
long readTimeoutMs = timeout (long) Math.round(timeout >> 1);
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH "/listener", headers, params,
agent.getEncode(), readTimeoutMs);
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.content);
} else {
setHealthServer(false);
LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
}
} catch (IOException e) {
setHealthServer(false);
LOGGER.error("[" agent.getName() "] [check-update] get changed dataId exception", e);
throw e;
}
return Collections.emptyList();
}
checkUpdateConfigStr方法实际上通过agent.httpPost调用/listerner接口实现长轮询请求。长轮询请求在实现层面只是设置了一个比较长的超时时间,默认是30秒。如果服务端的数据发生了变更,客户端会收到一个HttpResult,服务端返回的是存在数据变更的DataId,Group,Tenant。获得这些信息之后。在LongPollingRunable的run方法中调用getServerConfig去nacos服务器上读取具体的配置内容。
代码语言:javascript复制public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
String[] ct = new String[2];
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group));
} else {
params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));
}
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
String message = String.format(
"[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
dataId, group, tenant);
LOGGER.error(message, e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
switch (result.code) {
case HttpURLConnection.HTTP_OK:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
ct[0] = result.content;
if (result.headers.containsKey(CONFIG_TYPE)) {
ct[1] = result.headers.get(CONFIG_TYPE).get(0);
} else {
ct[1] = ConfigType.TEXT.getType();
}
return ct;
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return ct;
case HttpURLConnection.HTTP_CONFLICT: {
LOGGER.error(
"[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
"tenant={}", agent.getName(), dataId, group, tenant);
throw new NacosException(NacosException.CONFLICT,
"data being modified, dataId=" dataId ",group=" group ",tenant=" tenant);
}
case HttpURLConnection.HTTP_FORBIDDEN: {
LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId,
group, tenant);
throw new NacosException(result.code, result.content);
}
default: {
LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId,
group, tenant, result.code);
throw new NacosException(result.code,
"http error, code=" result.code ",dataId=" dataId ",group=" group ",tenant=" tenant);
}
}
}
服务端长轮询处理机制
找到nacos源码中的nacos-config模块,在controller中专门提供了ConfigController类来实现配置的基本操作,其中有一个/listener接口,它是客户端发起数据监听的接口。
- 获取客户端需要监听的可能发生变化的配置,并计算MD5值。
- inner.doPollingConfig开始执行长轮询请求。
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
}
catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
doPollingConfig是一个长轮询的处理接口
代码语言:javascript复制/**
* 轮询接口
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize)
throws IOException {
// 长轮询
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK "";
}
// else 兼容短轮询逻辑
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// 兼容短轮询result
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
/**
* 2.0.4版本以前, 返回值放入header中
*/
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
Loggers.AUTH.info("new content:" newResult);
// 禁用缓存
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK "";
}
上述代码,首先会判断当前请求是否为长轮询,如果是,则调用addLongPollingClient
- 获取客户端请求的超时时间,减去500ms后赋值给timeout变量。
- 判断isFixedPolling,如果为true,定时任务将会在30s后开始执行,否则,在29.5s后开始执行
- 和服务端的数据进行MD5对比,如果发生过变化则直接返回。
- scheduler.execute执行ClientLongPolling线程。
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance
*/
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// do nothing but set fix polling timeout
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
clientMd5Map.size(), probeRequestSize, changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// 一定要由HTTP线程调用,否则离开后容器会立即发送响应
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
asyncContext.setTimeout(0L);
scheduler.execute(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
从addLongPollingClient方法中可以看到,它的主要作用是把客户端的长轮询请求封装成ClientPolling交给scheduler执行。
ClientLongPolling也是一个线程,run方法代码如下。
- 通过scheduler.schedule启动一个定时任务,并且延时时间为29.5s;
- 将ClientLongPolling实例本身添加到allSubs队列中,它主要维护一个长轮询的订阅关系
- 定时任务执行后,先把ClientLongPolling实例本身从allSubs队列中移除。
- 通过MD5比较客户端请求的groupKeys是否发生了变更,并将变更的结果通过response返回客户端。
class ClientLongPolling implements Runnable {
@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
/**
* 删除订阅关系
*/
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
List<String> changedGroups = MD5Util.compareMd5(
(HttpServletRequest)asyncContext.getRequest(),
(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.defaultLog.error("long polling error:" t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);
}
从上述代码中 所谓的长轮询就是服务端收到请求之后,不立即放回,而是在延后29.5s才把请求结果返回给客户端,这就使得客户端和服务端之间在30s之内数据没有发生变化的情况下一直处于连接状态。
当我们通过控制台或api的方式修改了配置之后,如何实时通知的呢?
LongPollingService继承了AbstractEventListener AbstractEventListener是一个事件抽象类,它有一个onEvent抽象方法,而LongPollingService实现了这个方法。
代码语言:javascript复制@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// ignore
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
从LongPollingService的onEvent方法中可以看到一个LocalDataChangeEvent事件。不难猜出,这个事件应该是在服务器的配置数据发生变化时发布的一个事件,看一下收到这个事件后的处理行为。
当LongPollingService的onEvent监听到事件后,通过线程池来执行一个DataChangeTask任务
- 遍历allSubs中的客户端长轮询请求。
- 比较每一个客户端长轮询请求携带的groupKey,如果服务端变更的配置和客户端请求关注的配置一致,则直接返回。
class DataChangeTask implements Runnable {
@Override
public void run() {
try {
ConfigCacheService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// 如果beta发布且不在beta列表直接跳过
if (isBeta && !betaIps.contains(clientSub.ip)) {
continue;
}
// 如果tag发布且不在tag列表直接跳过
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // 删除订阅关系
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - changeTime),
"in-advance",
RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
"polling",
clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.defaultLog.error("data change error:" t.getMessage(), t.getCause());
}
}
nacos的集群
使用raft算法实现leader选举
nacos集群是怎么实现的
如果我们要实现一个配置中心 需要满足哪些条件
- 服务器端的配置保存(持久化)
数据库
- 服务器端提供访问api rpc http (openapi)
- 数据变化之后如何通知客户端 zookeeper (session manger) push(服务端主动推送到客户端),pull(客户端主动拉去数据)
- 客户端如何获得远程服务的数据
- 安全性
- 刷盘(本地缓存)