先改变自己的态度,才能改变人生的高度。
代码下载地址:https://github.com/f641385712/netflix-learning
前言
上篇文章介绍了桶计数器BucketedCounterStream
,它提出了桶的概念并且提供了事件到桶的归约操作,可以说为滑动窗口实现打好了基础。
当然,基于BucketedCounterStream
桶的实现并不要求必须是滑动窗口,比如你也可以是固定窗口,也可以是累计计算等。在Hystrix里,基于桶实现的一共有两种统计方式:
- 在滑动窗口内统计,子类是
BucketedRollingCounterStream
体系 - 持续累计统计:子类是
BucketedCumulativeCounterStream
体系
本文将聚焦于前者,先来看看在滑动窗口内是如何统计指标信息的~
正文
HystrixCommands
和HystrixObservableCommands
执行过程中,会产生执行的数据,这些数据对于观察调用的性能表现非常有用。产生数据后,Metrics
会根据不同维度进行统计,主要有以下两个维度:滑动窗口内统计、持续累计统计。
滑动窗口内统计 BucketedRollingCounterStream
它改进BucketedCounterStream
,以减少桶的数量。按照滑动窗口的大小对每个单元窗口产生的桶进行聚合,它是Hystrix中滑动窗口的抽象实现。
// 泛型上没有确定任何泛型
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
// 核心还是在构造函数上
// 它比父构造器多了一个参数:reduceBucket函数,意思是减少桶的数量
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> appendRawEventToBucket,
Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
this.sourceStream = bucketedStream // 数据流,每个对象代表单元窗口产生的桶
.window(numBuckets, 1) // 按照滑动窗口桶的个数进行桶的聚集
.flatMap(reduceWindowToSummary) // 将一系列的桶聚集成最后的数据对象
.doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
.doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
.share() // 共享。不同的订阅者看到的数据是一致的
.onBackpressureDrop(); // 被压流量控制,当消费者消费速度过慢时就丢弃数据,不进行积压
}
// 实现父类的方法
@Override
public Observable<Output> observe() {
return sourceStream;
}
// 非public方法
boolean isSourceCurrentlySubscribed() {
return isSourceCurrentlySubscribed.get();
}
}
既然是减少桶的数量,那必然有reduce操作。构造函数后两个参数分别代表两个函数:
appendRawEventToBucket
:将事件流聚合成桶(Event -> Bucket)reduceBucket
:将桶聚合、合并成输出对象sourceStream
。
该抽象类实现了父类的observe()
方法,返回了一个 Observable 类型的发布者 sourceStream,供订阅者去消费。这个sourceStream
就是滑动窗口的终极形态,这里需要关心的是它是如何被“创建的”?
sourceStream
的“创建”:其实核心还是window
和flatMap
这两个操作。
window(numBuckets, 1)
:第二个参数skip=1 的意思就是按照步长为 1 在数据流中滑动,不断聚集对象,这不就是滑动窗口的真正实现么flatMap(reduceWindowToSummary)
:在window那步每个窗口已经形成,,下一步就是要对窗口进行聚合了- 需要注意的是,这里和父类不同,这里聚合没有用reduce,而是用的
scan skip(numBuckets)
的组合
- 需要注意的是,这里和父类不同,这里聚合没有用reduce,而是用的
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
此处scan skip(numBuckets)
的效果和reduce差不多,但是它有个优点就是当最后面的窗口大小不满 numBuckets数值的时候,使用scan方式也能保证数据不会缺失。
到此,滑动窗口聚合完毕。最终share出去的就是sourceStream
,它能被N多订阅者订阅使用滑动窗口里面的数据,若生产速度 > 消费速度会直接抛弃掉,所以消费速度尽量的快哈(订阅者越多消费速度肯定越慢哦,并且符合木桶理论)。
BucketedRollingCounterStream
提供了完整的滑动窗口统计的服务,想要使用滑动窗口来统计数据的话继承它即可,下面便来看看它的实现类们。
RollingCommandEventCounterStream
用于滑动统计服务调用时各个事件的次数。这里面它使用HystrixCommandProperties
的配置策略:
- 固定窗口的大小(毫秒值):
properties.metricsRollingStatisticalWindowInMilliseconds()
,默认值是10000,也就是10s - 窗口内的桶数:
properties.metricsRollingStatisticalWindowBuckets()
,默认是10个 - 一个桶的大小(毫秒值):counterMetricWindow / numCounterBuckets;(默认是100000 / 10 = 1000,也就是一个桶是1s)
// 监听的事件源是HystrixCommandCompletion,所以事件流是HystrixCommandCompletionStream.getInstance(commandKey)
// 一个桶是一个long[]:也就是说一个桶内装有多个指标数据(已经完成对应的时间有N多)
// 输出的也是这些指标数
public class RollingCommandEventCounterStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], long[]> {
// 每个HystrixCommandKey有且仅有一个RollingCommandEventCounterStream 实例
private static final ConcurrentMap<String, RollingCommandEventCounterStream> streams = new ConcurrentHashMap<String, RollingCommandEventCounterStream>();
// 它决定了一个空桶应该有多深
private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;
// 静态方法,获取实例
public static RollingCommandEventCounterStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) { ... }
public static RollingCommandEventCounterStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
... // 缓存里有直接拿,缓存里木有就new一个
}
...
public static void reset() {
streams.clear();
}
@Override
long[] getEmptyBucketSummary() {
return new long[NUM_EVENT_TYPES];
}
...
}
它监听HystrixCommandCompletionStream
数据流,用long[]
来表示一个桶,桶内装有HystrixEventType
所有类型的时间的统计数,对应关系通过数组下标和事件枚举的ordinal()值来对应。
另外,还有两个重要的函数reduceCommandCompletion
和reduceBucket
均位于HystrixCommandMetrics
里:
reduceCommandCompletion函数:
// 最重要的是eventCounts.getCount()方法
public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = (long[] initialCountArray, HystrixCommandCompletion execution) -> {
// 结果统计:发射数、fallabck数等
ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
// 遍历所有的事件类型,分别做处理
for (HystrixEventType eventType: ALL_EVENT_TYPES) {
switch (eventType) {
case EXCEPTION_THROWN: break;
default: // 其它事件全部添加到桶里
initialCountArray[eventType.ordinal()] = eventCounts.getCount(eventType);
break;
}
}
return initialCountArray;
}
该函数用于把事件合并到桶内(该桶可能已经有值了哦),最重要的是eventCounts.getCount(eventType)
方法决定了如何添加:
EMIT
:返回EventCounts.numEmissions(每次command命令内可能多次发射)FALLBACK_EMIT
:返回numFallbackEmissionsEXCEPTION_THROWN
:- 其它事件:包含了就 1,否则忽略
reduceBucket():
public static final Func2<long[], long[], long[]> bucketAggregator = (long[] cumulativeEvents, long[] bucketEventCounts) -> {
for (HystrixEventType eventType: ALL_EVENT_TYPES) {
switch (eventType) {
case EXCEPTION_THROWN:
for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
cumulativeEvents[eventType.ordinal()] = bucketEventCounts[exceptionEventType.ordinal()];
}
break;
default:
cumulativeEvents[eventType.ordinal()] = bucketEventCounts[eventType.ordinal()];
break;
}
}
return cumulativeEvents;
}
这个函数的逻辑:就是把多个桶里面的数据合并到一个桶里。
现实意义是:一个时间窗口内有多个桶,但是希望对外呈现的是一个桶,表示这一个窗口的整体数据情况
合并的规则很清晰:
HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES
所有类型(包括:BAD_REQUEST、FALLBACK_FAILURE、FALLBACK_MISSING、FALLBACK_REJECTION
)全部归为EXCEPTION_THROWN
,也就是所谓的异常类型- 其它类型该是什么类型就合并到啥类型里去
这么一来,对外呈现的永远是是一个桶。其实外部也只关心这个阈值而已,时间窗口是你的实现细节,上层并不关心。
RollingThreadPoolEventCounterStream
它监听的同样是RollingThreadPoolEventCounterStream
这个数据流,实现逻辑完全同上,不一样的是它关心的事件不一样,具体参见两大函数的实现,均位HystrixThreadPoolMetrics
里。
线程关心的事件枚举是:HystrixEventType.ThreadPool
,只有两个值EXECUTED
和REJECTED
,只有事件类型是HystrixEventType.THREAD_POOL_REJECTED
才算REJECTED
~
HealthCountsStream(重要)
它提供实时的健康检查数据(HystrixCommandMetrics.HealthCounts
,统计调用成功和失败的计数,以及比率)。它继承自BucketedRollingCounterStream
,三个泛型参数是:
Event -> HystrixCommandCompletion
:代表命令执行完成。可以从中获取执行结果,并从中提取所有产生的事件(HystrixEventType)Bucket -> long[]
:里面统计了各种事件的个数。其中 index 为事件类型枚举对应的索引(ordinal),值为对应事件的个数Output -> HystrixCommandMetrics.HealthCounts
:里面统计了总的执行次数、失败次数以及失败百分比,供熔断器使用
注意:这里的
Bucket
和Output
不一样了哦,并不是“原样输出”了~
先简要了解下HystrixCommandMetrics.HealthCounts
吧
HealthCounts
这个类记录着滑动窗口期间的请求数,包括:总数、失败数、失败百分比。
代码语言:javascript复制public static class HealthCounts {
private final long totalCount;
private final long errorCount;
private final int errorPercentage;
...
// 增加次数。返回一个新的HealthCounts实例哦~所以它是线程安全的
public HealthCounts plus(long[] eventTypeCounts) { ... }
// 空的 属性均是0
public static HealthCounts empty() { return EMPTY }
...
}
参与加法的事件包括:
HystrixEventType.SUCCESS
对应HystrixRollingNumberEvent.SUCCESS
- HystrixEventType.FAILURE 对应 …
- HystrixEventType.TIMEOUT 对应 …
- HystrixEventType.THREAD_POOL_REJECTED 对应 …
- HystrixEventType.SEMAPHORE_REJECTED 对应 …
这5个源生事件均对应着
HystrixRollingNumberEvent
的同名事件类型
除了1
表示成功外,其它均属于失败事件,会计入失败总数和失败百分比里。
同样的,HealthCountsStream
的源码逻辑完全同上,所以也仅需看看它的归约函数的实现即可:
reduceCommandCompletion:使用的HystrixCommandMetrics.appendEventToBucket,上面已经介绍
reduceBucket函数:
(healthCounts, bucketEventCounts) -> healthCounts.plus(bucketEventCounts);
该实现也非常简单:相加即可。当然哪些事件算成功,哪些算失败这在healthCounts内,上面已经做了表述。
需要注意的是:它会每500ms去获取一下当先的统计数据,而这个参数是由properties.metricsHealthSnapshotIntervalInMilliseconds()
来决定的,默认值是500ms。
Hystrix熔断器(HystrixCircuitBreakerImpl
)里会实时地去消费每个窗口产生的健康统计数据,并根据指标来决定熔断器的状态。
RollingCollapserEventCounterStream
实现逻辑完全同上,不同的是它关心的事件流是HystrixCollapserEventStream
,关心的事件是HystrixEventType.Collapser
。
public enum Collapser {
BATCH_EXECUTED, ADDED_TO_BATCH, RESPONSE_FROM_CACHE;
}
总结
关于Netflix Hystrix在滑动窗口内统计就介绍到这了,到此我相信你已经完全能够理解Hystrix
是如何使用滑动窗口统计数据了吧,甚至你还可以对应Hystrix的监控大盘,是不是结构上和这一模一样呢,这就是原理。掌握了原理在应用,脚跟更稳了。
Hystrix
自1.5起使用 RxJava 1.x
(注意目前最新是2.x) 来实现滑动窗口,将滑动窗口抽象成响应式数据流的形式,既适合 Hystrix 事件驱动的特点,又易于实现和使用。滑动窗口的实现的要点就是每个桶的聚合以及滑动窗口的形成,Hystrix 巧妙地运用了 RxJava 中的 window 操作符来将单位窗口时间内的事件,以及将一个窗口大小内的桶聚集到一起,并通过 reduce 等归约折叠操作将事件集合聚集为桶,将滑动窗口内的桶聚集成指标数据(可能还是个桶,也可能是计算过后的如HealthCounts),非常巧妙。