信息收集
dubbo对服务运行的监控,是通过从provider和consumer方收集调用信息存盘后,再由监控中心对数据分析绘表的方式完成的。 具体实现是provider和consumer向监控中心推数据。 今天以服务消费方为例,通过源码分析下消费方向监控中心上报数据的过程。 配置监控中心的两种方式:
<!-- Option 1: discover the monitor center address through the registry. -->
<dubbo:monitor protocol="registry"/>
<!-- Option 2: connect directly to a fixed monitor center address. -->
<!-- Use only ONE of the two options: two default <dubbo:monitor> configs make
     ReferenceBean.afterPropertiesSet fail with "Duplicate monitor configs". -->
<dubbo:monitor address="10.47.17.170"/>
<!-- Enable the "monitor" filter on the reference: Dubbo reports call data through this filter. -->
<dubbo:reference id="demoService" interface="demo.dubbo.api.DemoService" timeout="6000" filter="monitor"/>
以上Spring配置中的<dubbo:monitor>标签会被解析成MonitorConfig对象,ReferenceBean在afterPropertiesSet方法中从容器里取出该对象并关联到当前引用配置上,逻辑如下:
代码语言:javascript复制 public void afterPropertiesSet() throws Exception {
//....其他代码略
if (getMonitor() == null
&& (getConsumer() == null || getConsumer().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
//解析MonitorConfig类,从容器中获取monitorConfig对象
Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
MonitorConfig monitorConfig = null;
for (MonitorConfig config : monitorConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (monitorConfig != null) {
throw new IllegalStateException("Duplicate monitor configs: " monitorConfig " and " config);
}
monitorConfig = config;
}
}
//把解析后对象赋值给monitor属性,后面构造代理会用到
if (monitorConfig != null) {
setMonitor(monitorConfig);
}
}
}
}
构造代理的逻辑在ReferenceConfig类的createProxy方法中,因为我们这里走注册中心发现监控中心,所以看下面一段逻辑:
代码语言:javascript复制 //构造注册中心url
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
//通过注册中心的url构造monitor Url(***跟踪下loadMonitor***)
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
//放置监控url到map key为“monitor”(***重点在这里***)
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " interfaceName " on the consumer " NetUtils.getLocalHost() " use dubbo version " Version.getVersion() ", please config <dubbo:registry address="..." /> to your spring config.");
}
跟到AbstractInterfaceConfig类的loadMonitor方法:
代码语言:javascript复制/***
* 构造监控中心URL
* @param registryURL
* @return
*/
protected URL loadMonitor(URL registryURL) {
if (monitor == null) {
//没有配置监控中心,从dubbo.monitor.address属性中获取
String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address");
//获取监控中心服务发现协议,比如通过注册中心
String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol");
if (monitorAddress != null && monitorAddress.length() > 0
|| monitorProtocol != null && monitorProtocol.length() > 0) {
//都没有配置,new一个对象
monitor = new MonitorConfig();
} else {
//没有注册中心
return null;
}
}
//把属性文件中的的值,填充到monitor对象里
appendProperties(monitor);
Map<String, String> map = new HashMap<String, String>();
//
//这里接口固定是MonitorService.class.getName(),就是固定通过这个接口提供服务上报服务
//这里的MonitorService服务是由监控中心实现并注册的到注册中心。
map.put(Constants.INTERFACE_KEY, MonitorService.class.getName());
map.put("dubbo", Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
//把monitor对象里的属性,放到map里去,key是对象属性名
appendParameters(map, monitor);
String address = monitor.getAddress();
String sysaddress = System.getProperty("dubbo.monitor.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
//设置监控protocal
if (ConfigUtils.isNotEmpty(address)) {
if (!map.containsKey(Constants.PROTOCOL_KEY)) {
if (ExtensionLoader.getExtensionLoader(MonitorFactory.class).hasExtension("logstat")) {
map.put(Constants.PROTOCOL_KEY, "logstat");
} else {//没有logstat spi扩展,就用dubbo协议
map.put(Constants.PROTOCOL_KEY, "dubbo");
}
}
//构造通过address和map,构造url
return UrlUtils.parseURL(address, map);
} else if (Constants.REGISTRY_PROTOCOL.equals(monitor.getProtocol()) && registryURL != null) {
//如果monitor配置是通过注册中心发现,监控服务,设置protocol是dubbo, 添加参数 protocol=registry,refer=StringUtils.toQueryString(map)
return registryURL.setProtocol("dubbo").addParameter(Constants.PROTOCOL_KEY, "registry").addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map));
}
return null;
}
以上逻辑构造了monitorUrl,并以monitor为key放入url的参数中。由于dubbo是通过过滤器上报监控数据的(dubbo的过滤器机制需要从它的aop实现入手分析),下面分析具体的过滤器如何使用monitorUrl。可以看到,文章开头我们配置的过滤器正是“monitor”。
过滤器
所以这里看下Filter的monitor spi实现,MonitorFilter类,具体在invoke方法里:
代码语言:javascript复制 //调用过程拦截
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
RpcContext context = RpcContext.getContext(); // 提供方必须在invoke()之前获取context信息
String remoteHost = context.getRemoteHost();
long start = System.currentTimeMillis(); // 记录起始时间戮
getConcurrent(invoker, invocation).incrementAndGet(); // 并发计数
try {
Result result = invoker.invoke(invocation); // 让调用链往下执行
//上报调用统计信息(***看这里**)
collect(invoker, invocation, result, remoteHost, start, false);
return result;
} catch (RpcException e) {
collect(invoker, invocation, null, remoteHost, start, true);
throw e;
} finally {
getConcurrent(invoker, invocation).decrementAndGet(); // 并发计数
}
} else {
return invoker.invoke(invocation);
}
}
//具体
private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
try {
// ---- 服务信息获取 ----
long elapsed = System.currentTimeMillis() - start; // 计算调用耗时
int concurrent = getConcurrent(invoker, invocation).get(); // 当前并发数
String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
String service = invoker.getInterface().getName(); // 获取服务名称
String method = RpcUtils.getMethodName(invocation); // 获取方法名
URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY);
//通过 monitor key 获取监控url (***看这里**),这里monitorFactory是spi机制生成的MonitorFactory$Adaptive
//这里实际是走的DubboMonitorFactroy类的getMonitor方法
Monitor monitor = monitorFactory.getMonitor(url);
int localPort;
String remoteKey;
String remoteValue;
if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) {
// ---- 服务消费方监控 ----
localPort = 0;
remoteKey = MonitorService.PROVIDER;
remoteValue = invoker.getUrl().getAddress();
} else {
// ---- 服务提供方监控 ----
localPort = invoker.getUrl().getPort();
remoteKey = MonitorService.CONSUMER;
remoteValue = remoteHost;
}
String input = "", output = "";
if (invocation.getAttachment(Constants.INPUT_KEY) != null) {
input = invocation.getAttachment(Constants.INPUT_KEY);
}
if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) {
output = result.getAttachment(Constants.OUTPUT_KEY);
}
//通过上面构造的监控上报工具,上报数据(***看这里**)
monitor.collect(new URL(Constants.COUNT_PROTOCOL,
NetUtils.getLocalHost(), localPort,
service "/" method,
MonitorService.APPLICATION, application,
MonitorService.INTERFACE, service,
MonitorService.METHOD, method,
remoteKey, remoteValue,
error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
MonitorService.ELAPSED, String.valueOf(elapsed),
MonitorService.CONCURRENT, String.valueOf(concurrent),
Constants.INPUT_KEY, input,
Constants.OUTPUT_KEY, output));
} catch (Throwable t) {
logger.error("Failed to monitor count service " invoker.getUrl() ", cause: " t.getMessage(), t);
}
}
信息上报
看下DubboMonitorFactory类的getMonitor方法,它实现在父类AbstractMonitorFactory中:
代码语言:javascript复制public Monitor getMonitor(URL url) {
//这里设置上报服务接口MonitorService
url = url.setPath(MonitorService.class.getName()).addParameter(Constants.INTERFACE_KEY, MonitorService.class.getName());
String key = url.toServiceStringWithoutResolving();
LOCK.lock();
try {
//从缓存中获取
Monitor monitor = MONITORS.get(key);
if (monitor != null) {
return monitor;
}
//通过url创建monitor,在子类DubboMonitorFactroy中实现
monitor = createMonitor(url);
if (monitor == null) {
throw new IllegalStateException("Can not create monitor " url);
}
MONITORS.put(key, monitor);
return monitor;
} finally {
// 释放锁
LOCK.unlock();
}
}
DubboMonitorFactory中实现的createMonitor方法:
代码语言:javascript复制protected Monitor createMonitor(URL url) {
//这里会通过url的protocol参数获取协议值,如果是通过注册中心发现监控中心服务的方式,这里
//protocol的值是registry,否则就是dubbo
url = url.setProtocol(url.getParameter(Constants.PROTOCOL_KEY, "dubbo"));
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(MonitorService.class.getName());
}
String filter = url.getParameter(Constants.REFERENCE_FILTER_KEY);
if (filter == null || filter.length() == 0) {
filter = "";
} else {
filter = filter ",";
}
//监控中心服务配置多个的场景,这里默认使用failsafe容错机制
url = url.addParameters(Constants.CLUSTER_KEY, "failsafe", Constants.CHECK_KEY, String.valueOf(false),
Constants.REFERENCE_FILTER_KEY, filter "-monitor");
//这里protocol也是Protocol$Adpative的,如果协议是registry 要走通过注册中心发现服务那一套逻辑。
Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, url);
//创建服务代理代理
MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
//最后构造BubboMonitor对象
return new DubboMonitor(monitorInvoker, monitorService);
}
下面看DubboMonitor类。从它的继承关系可以看到,它实现了MonitorService接口(因此自身也有collect方法)。其构造函数及核心方法如下:
/**
 * Creates a monitor that buffers statistics locally and pushes them to the remote MonitorService
 * every {@code interval} milliseconds (URL parameter, default 60000) on a background scheduler.
 * Reporting is therefore asynchronous to the business call.
 */
public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
    this.monitorInvoker = monitorInvoker;
    this.monitorService = monitorService;
    this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
    // Start the periodic reporting task; send() is the step that matters.
    sendFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                send();
            } catch (Throwable t) {
                // Defensive: an exception escaping run() would cancel all later executions.
                logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
            }
        }
    }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
}
//从本地静态变量中获取统计信息,通过远程服务monitorService接口方法上报。
public void send() {
if (logger.isInfoEnabled()) {
logger.info("Send statistics to monitor " getUrl());
}
String timestamp = String.valueOf(System.currentTimeMillis());
for (Map.Entry<Statistics, AtomicReference<long[]>> entry : statisticsMap.entrySet()) {
// 获取已统计数据
Statistics statistics = entry.getKey();
AtomicReference<long[]> reference = entry.getValue();
long[] numbers = reference.get();
long success = numbers[0];
long failure = numbers[1];
long input = numbers[2];
long output = numbers[3];
long elapsed = numbers[4];
long concurrent = numbers[5];
long maxInput = numbers[6];
long maxOutput = numbers[7];
long maxElapsed = numbers[8];
long maxConcurrent = numbers[9];
// 发送汇总信息
URL url = statistics.getUrl()
.addParameters(MonitorService.TIMESTAMP, timestamp,
MonitorService.SUCCESS, String.valueOf(success),
MonitorService.FAILURE, String.valueOf(failure),
MonitorService.INPUT, String.valueOf(input),
MonitorService.OUTPUT, String.valueOf(output),
MonitorService.ELAPSED, String.valueOf(elapsed),
MonitorService.CONCURRENT, String.valueOf(concurrent),
MonitorService.MAX_INPUT, String.valueOf(maxInput),
MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),
MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),
MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent)
);
//调用监控中心发布的MonitorService服务,上报调用统计信息
monitorService.collect(url);
// 减掉已统计数据
long[] current;
long[] update = new long[LENGTH];
do {
current = reference.get();
if (current == null) {
update[0] = 0;
update[1] = 0;
update[2] = 0;
update[3] = 0;
update[4] = 0;
update[5] = 0;
} else {
update[0] = current[0] - success;
update[1] = current[1] - failure;
update[2] = current[2] - input;
update[3] = current[3] - output;
update[4] = current[4] - elapsed;
update[5] = current[5] - concurrent;
}
} while (!reference.compareAndSet(current, update));
}
}
//而DubboMonitor本身的collect方法,供信息上报处,过滤器中调用
//每次的调用信息,放入本地静态变量statisticsMap中,
public void collect(URL url) {
// 读写统计变量
int success = url.getParameter(MonitorService.SUCCESS, 0);
int failure = url.getParameter(MonitorService.FAILURE, 0);
int input = url.getParameter(MonitorService.INPUT, 0);
int output = url.getParameter(MonitorService.OUTPUT, 0);
int elapsed = url.getParameter(MonitorService.ELAPSED, 0);
int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);
// 初始化原子引用
Statistics statistics = new Statistics(url);
AtomicReference<long[]> reference = statisticsMap.get(statistics);
if (reference == null) {
statisticsMap.putIfAbsent(statistics, new AtomicReference<long[]>());
reference = statisticsMap.get(statistics);
}
// CompareAndSet并发加入统计数据
long[] current;
long[] update = new long[LENGTH];
do {
current = reference.get();
if (current == null) {
update[0] = success;
update[1] = failure;
update[2] = input;
update[3] = output;
update[4] = elapsed;
update[5] = concurrent;
update[6] = input;
update[7] = output;
update[8] = elapsed;
update[9] = concurrent;
} else {
update[0] = current[0] success;
update[1] = current[1] failure;
update[2] = current[2] input;
update[3] = current[3] output;
update[4] = current[4] elapsed;
update[5] = (current[5] concurrent) / 2;
update[6] = current[6] > input ? current[6] : input;
update[7] = current[7] > output ? current[7] : output;
update[8] = current[8] > elapsed ? current[8] : elapsed;
update[9] = current[9] > concurrent ? current[9] : concurrent;
}
} while (!reference.compareAndSet(current, update));
}
以上梳理了服务消费方配置监控中心并上报调用数据的流程。服务提供方向监控中心上报的流程与此相同,同样由这个过滤器完成。下次再梳理监控中心本身的处理逻辑。