作为一个程序员,郁闷的事情是,面对一个代码块,却不敢去修改。更糟糕的是,这个代码块还是自己写的。
代码下载地址:https://github.com/f641385712/netflix-learning
前言
因为Ribbon
做负载均衡需要统计各维度的Server指标数据,使用的是自家的netflix-statistics
模块完成的,该模块旨在简化指标数据收集、计算的逻辑,小巧精悍,因此本文就用不长的文字学习下它,并且最后附上:基于netflix-statistics
手把手写个超简版监控系统。
正文
statistics
中文释义:statistic
的复数形式。统计学、统计数据的意思,常简称为stat
。
<dependency>
<groupId>com.netflix.netflix-commons</groupId>
<artifactId>netflix-statistics</artifactId>
<version>0.1.1</version>
</dependency>
说明:本文使用0.1.1版本是为了和Ribbon的2.3.0版本想匹配
该库属于netflix-commons
公共工程的一部分,负责数据收集和统计。短小精悍,基本诠释了什么叫指标收集、计算、分位数等监控核心概念。一共也就几个类而已,如下截图:
DataCollector
数据收集,以增量方式收集新值。
代码语言:javascript复制public interface DataCollector {
// 向收集的数据添加一个值
void noteValue(double val);
}
该接口非常简单,仅一个增量收集数据的方法。它是数据的唯一来源,作为“原料”,其它一切都围绕该接口进行展开,它的继承图谱如下:
Distribution
分布式系统的累加器,以增量方式产生的观测值。
代码语言:javascript复制// 实现了接口DistributionMBean,提供一些指标方法方法,如:
// getNumValues():获得收集到的值的个数
// getMean():获得平均值
// getVariance():得到方差
// getStdDev():标准差
// getMinimum():最小值
// getMaximum():最大值
public class Distribution implements DistributionMBean, DataCollector {
// 每noteValue()新增记录一个数据,它就 1
private long numValues;
// 所有值的总和
private double sumValues;
// sumValues的平方
private double sumSquareValues;
// 最大值,最小值
private double minValue;
private double maxValue;
... // 省略构造器
@Override
public void noteValue(double val) {
// 每次都加1
numValues ;
// 总和
sumValues = val;
sumSquareValues = val * val;
...
}
// 它是DistributionMBean的接口方法
@Override
public void clear() { 所有值归为默认值 }
@Override
public long getNumValues() {
return numValues;
}
...
}
使用多个成员变量数值维护收集到的数据,需要注意的是:本类是线程不安全的,若有多个线程请求同一个实例,那么值有可能不准确(但打点统计数据可以不要求那么精确)。
该类能够记录最终值:比如总次数、总和、平均值、最大最小值等等,但是它没法记录过程值,比如这段时间内的最大最小值、平均值等等,在监控体系中这都是有意义的数据,子类DataBuffer
将提供此种能力。
DataBuffer
在父类Distribution
的基础上,增加了固定大小的数据收集缓冲区double[] buf
,用于保存滑动窗口最近增加的值。因为可以缓冲数据了,所以有startCollection
和endCollection
动作,代表着一个收集周期。
public class DataBuffer extends Distribution {
...
// 这就是这个缓冲区,它里面的值代表着一个窗口期内的数据记录
private final double[] buf;
private long startMillis;
private long endMillis;
...
// 一个样本时间窗口。比如5000ms
public long getSampleIntervalMillis() {
return (endMillis - startMillis);
}
// 样本大小(说明:并不一定是buf的长度哦)
public int getSampleSize() {
return size;
}
@Override
public void clear() {
super.clear();
startMillis = 0;
size = 0;
insertPos = 0;
}
// **一切归零**,开始收集
public void startCollection() {
clear();
startMillis = System.currentTimeMillis();
}
// 对0到Size的数据排序
public void endCollection() {
endMillis = System.currentTimeMillis();
Arrays.sort(buf, 0, size);
}
// 收集数据:向buf缓冲区里也记录一份
@Override
public void noteValue(double val) {
super.noteValue(val);
buf[insertPos ] = val;
// 如果缓冲区满了,就会覆盖最老的缓冲区输入新值。
// 注意这个算法:覆盖最老的缓冲区的值,并不是简单粗暴的从头开始覆盖
if (insertPos >= buf.length) {
insertPos = 0;
size = buf.length;
} else if (insertPos > size) {
size = insertPos;
}
}
}
它扩展了父类的功能:提供数据缓冲区,从而缓冲到一段时间内的每个数据,从而基于这段时间内的数据便可计算出其最大最小值、平均值、分位数等等,并且它提供了便捷的计算分位数的方法:
代码语言:javascript复制DataBuffer:
// 计算,并获取请求百分比的统计信息
public double[] getPercentiles(double[] percents, double[] percentiles) {
for (int i = 0; i < percents.length; i ) {
// 计算百分比统计信息。比如若percents[i] = 50的话
// 就是计算buf缓冲区里中位数的值
// 90的话:计算90分位数的值(也就是该值比90%的数值都大)
// computePercentile是私有方法:根据当前窗口内收集到的数据进行计算分位数
percentiles[i] = computePercentile(percents[i]);
}
return percentiles;
}
通过这个方法,若我们传入percents
为:[50, 80, 90, 95, 99]的话,使用该方法就可得到对应分位数的值。
Histogram
它的作用是把数据分桶bucket
,然后提供找出median value
中位数的能力。功能和DataBuffer
类似,但统计得更详尽,使用得较少,略~
histogram
是直方图的意思,其中histogram.jar
这个库非常出名,hystrix内部就有使用到,此处取了同名,含义也是类似的
DataAccumulator数据累加器
Accumulator
:累加器。该类为抽象类,它是一个双缓冲(内部使用两个DataBuffer
来维护)的数据累加器:
- 一个是“current”缓冲区,并向其添加新数据。
- 另一个是“previous”缓冲区,存储“上个周期”内缓存区收集到的数据。
public abstract class DataAccumulator implements DataCollector {
private DataBuffer current;
private DataBuffer previous;
// 唯一构造器:必须要指定缓冲区的大小
// 因为缓冲区的大小决定了它最大能承载多大的量。比如缓冲区是10
// 这期间及时你100个请求打进来也只会给你统计最近的10个
public DataAccumulator(int bufferSize) {
this.current = new DataBuffer(bufferSize);
this.previous = new DataBuffer(bufferSize);
}
// 数据记录在current里。但是注意:加锁了,swapLock交换锁
// 也就是说在swap的时候不让新增数据,新增数据的时候不让swap
@Override
public void noteValue(double val) {
synchronized (swapLock) {
Lock l = current.getLock();
l.lock();
try {
current.noteValue(val);
} finally {
l.unlock();
}
}
}
}
它使用两个缓冲区来达到数据发布/交换的目的,从而可以持续的统计了,下面是它提供的发布/数据交换方法:
代码语言:javascript复制DataAccumulator:
//交换数据收集缓冲区,并计算统计数据关于目前收集的数据。
public void publish() {
...
synchronized (swapLock) {
// 开启收集
current.startCollection();
}
...
// 把previous老的结束收集,并且把数据发布出去参与计算
tmp.endCollection();
publish(tmp);
}
// 数据如何发布/如何计算交给子类实现
protected abstract void publish(DataBuffer buf);
publish
方法完成了数据的swap交换,会把previous
收集到的数据拿去计算(子类实现计算逻辑)。
DataDistribution
数据发布,它实现了具体的publish的能力。
代码语言:javascript复制// DataDistributionMBean提供如下方法:
// String getTimestamp():数据发布时间,也就是publish方法调用时间戳
// long getTimestampMillis():同上,只不过这是long的表达方式
// long getSampleIntervalMillis():一个样本的时间。如5s
// int getSampleSize():样本数据的大小(共多少个数据)
// double[] getPercents():获取百分比的数组[50,90,99.99]
// double[] getPercentiles():百分比数组对应的值(分位数)
public class DataDistribution extends DataAccumulator implements DataDistributionMBean {
// 数据总个数...
private long numValues = 0L;
// 平均数
private double mean = 0.0;
...
private int size = 0;
private final double[] percents;
private final double[] percentiles;
// 唯一构造器:必须指定缓冲区大小,以及percent百分比
// 百分比必须是0.0 - 100.0之间哦
public DataDistribution(int bufferSize, double[] percents) { ... }
// 实现父类的抽象方法:发布数据/计算数据
@Override
protected void publish(DataBuffer buf) {
ts = System.currentTimeMillis();
numValues = buf.getNumValues();
mean = buf.getMean();
...
// 计算出百分比对应的值,存储在percentiles里
buf.getPercentiles(percents, percentiles);
}
@Override
public void clear() { ... }
...
public double[] getPercents() {
return percents;
}
// 计算的结果
public double[] getPercentiles() {
return percentiles;
}
}
publish
一次产生一批新值,这个时候你若把它持久化下来以后就能参考喽。然后进入到下一轮的数据收集,所以说publish的调用节奏决定了它的数据收集的时间窗口。
DataPublisher
为了便于我们做周期性的数据统计,该库非常暖心的提供了一个现成的类帮我们来做。它是一个周期性更新DataAccumulator
的对象,在缓冲区之间进行交换(其实就是publish动作)。
public class DataPublisher {
// 静态常量
private static final String THREAD_NAME = "DataPublisher";
private static final boolean DAEMON_THREADS = true;
// ==============任务调度线程池==============
private static ScheduledExecutorService sharedExecutor = null;
// 数据累加器
private final DataAccumulator accumulator;
// 多久执行一次,单位ms。无默认值,必须指定项
private final long delayMillis;
// 任务调度结果,无实际意义。仅用于判断任务是否正在执行而已
private Future<?> future = null;
// 唯一构造器:必须指定周期时长
public DataPublisher(DataAccumulator accumulator, long delayMillis) {
this.accumulator = accumulator;
this.delayMillis = delayMillis;
}
}
你应该能猜到它会做什么了:使用ScheduledExecutorService
周期性的调用DataAccumulator#publish()
方法:
DataPublisher:
// 这几个任务调度方法都是synchronized同步的
public synchronized boolean isRunning() {
return (future != null);
}
// 开启任务:周期性的调用publish()方法
public synchronized void start() {
if (future == null) {
future = getExecutor().scheduleWithFixedDelay(() -> {
accumulator.publish();
}, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
}
}
public synchronized void stop() {
if (future != null) {
future.cancel(false);
future = null;
}
}
// 默认只给一个线程 默认只给一个线程 默认只给一个线程
protected synchronized ScheduledExecutorService getExecutor() {
if (sharedExecutor == null) {
sharedExecutor = Executors.newScheduledThreadPool(1, new PublishThreadFactory());
}
return sharedExecutor;
}
protected void handleException(Exception e) {
// Do nothing, for now
}
该类明显偏应用层,可以简单理解它就是在DataAccumulator
的基础上包了一层定时任务进行调度,从而对使用者更加友好,下面我们的示例也是基于它来完成的。
代码示例
本示例就是手把手写一个简版的监控系统,它虽然捡漏,但涵盖了很多监控层面的核心概念,该是不错的。
代码语言:javascript复制// 新线程:监控(模拟页面监控)
private void monitor(DataDistribution accumulator) {
new Thread(() -> {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(() -> {
System.out.println("=======时间:" accumulator.getTimestamp() ",统计值如下=======");
System.out.println("统计周期:" (accumulator.getSampleIntervalMillis() / 1000) "s");
System.out.println("样本数据个数:" accumulator.getSampleSize());
// 周期:startCollection到endCollection的时间差
System.out.println("最大值:" accumulator.getMaximum());
System.out.println("最小值:" accumulator.getMinimum());
System.out.println("算术平均值:" accumulator.getMean());
System.out.println("各分位数对应值:" Arrays.toString(accumulator.getPercentiles()));
}, 8, 8, TimeUnit.SECONDS);
}).start();
}
// 发布数据 5s发布一次数据
private void publishData(DataDistribution accumulator) {
new Thread(() -> {
new DataPublisher(accumulator, 5 * 1000).start();
}).start();
}
// 新开一个线程生产数据
private void produceValue(DataDistribution accumulator) {
new Thread(() -> {
while (true) {
accumulator.noteValue(randomValue(10, 2000));
try {
TimeUnit.MILLISECONDS.sleep(randomValue(10, 200));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// 本地使用随机数模拟数据收集
private int randomValue(int min, int max) {
return min (int) (Math.random() * ((max - min) 1));
}
书写测试代码:
代码语言:javascript复制@Test
public void fun1() throws InterruptedException {
int bufferSize = 50; //最大样本容量,注意是最大
double[] percents = {50, 80, 90, 95, 99};
DataDistribution accumulator = new DataDistribution(bufferSize, percents);
// 生产数据
produceValue(accumulator);
// 发布数据
publishData(accumulator);
// 监控(模拟监控页面:数据打印到控制台)
monitor(accumulator);
// hold主线程
TimeUnit.SECONDS.sleep(10000);
}
运行程序,控制台输出:
代码语言:javascript复制=======时间:Mon Mar 16 11:29:47 CST 2020,统计值如下=======
统计周期:5s
样本数据个数:50
最大值:1990.0
最小值:139.0
算术平均值:1022.42
各分位数对应值:[969.0, 1624.0, 1853.0, 1975.0, 1990.0]
=======时间:Mon Mar 16 11:29:57 CST 2020,统计值如下=======
统计周期:5s
样本数据个数:47
最大值:1961.0
最小值:13.0
算术平均值:1043.5531914893618
各分位数对应值:[1061.0, 1746.8000000000002, 1862.6000000000001, 1938.75, 1961.0]
=======时间:Mon Mar 16 11:30:02 CST 2020,统计值如下=======
统计周期:5s
样本数据个数:45
最大值:1983.0
最小值:16.0
算术平均值:954.8888888888889
各分位数对应值:[929.0, 1764.0, 1865.5, 1941.5, 1983.0]
=======时间:Mon Mar 16 11:30:12 CST 2020,统计值如下=======
统计周期:5s
样本数据个数:50
最大值:1977.0
最小值:101.0
算术平均值:991.26
各分位数对应值:[1103.0, 1600.0, 1706.0, 1887.0, 1977.0]
=======时间:Mon Mar 16 11:30:22 CST 2020,统计值如下=======
统计周期:5s
样本数据个数:42
最大值:1957.0
最小值:11.0
算术平均值:980.3571428571429
各分位数对应值:[930.0, 1595.4000000000003, 1807.2, 1906.8, 1957.0]
=======时间:Mon Mar 16 11:30:27 CST 2020,统计值如下=======
统计周期:5s
样本数据个数:49
最大值:1955.0
最小值:28.0
算术平均值:1044.3877551020407
各分位数对应值:[1039.5, 1676.6000000000001, 1836.0000000000002, 1936.65, 1955.0]
...
小细节:
- 完全随机的情况下,算术平均值和中位数(50%分位数值)是差不多的
- 最大容量是很有作用的(比如本例为50,所以最大样本数是50个)
得到这些统计数据,再结合一些工具,用图示的方式展示出来,不就是一个监控系统麽,当然这很简陋。。。
总结
关于netflix-statistics
统计库就介绍到这了,可以把它当做一个工具库,自己若要手写,或者去理解监控的原理的时候,它是一个很好的抓手,便于理解。