入一行,先别惦记着挣钱,而是要先让自己值钱
代码下载地址:https://github.com/f641385712/netflix-learning
前言
上篇文章介绍了Hystrix
指标数据收集的数据源HystrixEvent
和数据流HystrixEventStream
,通过示例了解了Hystrix数据收集的基本模型,但实际上并不精确。
我们已经知道Hystrix
它是通过滑动窗口的数据结构/算法来统计调用的指标数据的,但若直接使用HystrixEventStream
作为管道传播数据的话,是点对点的,并无时间区间、时间窗口等概念。因此本文将以以这为目的,深入了解与时间窗口相关的数据传输、收集处理的核心API:BucketedCounterStream
。
正文
Hystrix
从1.5
版本(2016.12)开始,它全面拥抱RxJava
把这块代码重写设计为了基于数据流Stream的形式,通过消费数据流的形式利用滑动窗口,并对数据流进行变换后进行后续的操作,可以让开发者更加灵活地去使用。
滑动窗口本质就是不断变换的数据流,滑动窗口中每个桶的数据都来自于源源不断的事件,因此滑动窗口非常适合用观察者模式和响应式编程思想的 RxJava 实现。
说明数据流Stream的实现强依赖与RxJava思想,推荐若对此还不太熟悉,请翻阅前几篇文章or其它文章先了解RxJava的思想以及使用。
使用 RxJava可以通过它的一系列操作符来实现滑动窗口,从而可以依赖 RxJava 的线程模型来保证数据写入和聚合的线程安全,将这一系列的机制交给 RxJava来得以保证。所有的操作都是在 RxJava 的后台线程上进行的,这也大大降低了对业务线程的延迟性的影响。
Hystrix里的滑动窗口
Hystrix
通过滑动窗口来对数据进行“平滑”统计,默认情况下,一个滑动窗口包含10个桶(Bucket),每个桶时间宽度是1秒,负责1秒的数据统计。滑动窗口包含的总时间以及其中的桶数量都是可以配置的,来张官方的截图认识下滑动窗口:
上图的每个小矩形代表一个桶,可以看到,每个桶都记录着1秒内的四个指标数据:成功量、失败量、超时量和拒绝量,这里的拒绝量指的就是上面流程图中【信号量/线程池资源检查】中被拒绝的流量。10个桶合起来是一个完整的滑动窗口,所以计算一个滑动窗口的总数据需要将10个桶的数据加起来。
BucketedCounterStream
滑动窗口所有的数据流实现均位于com.netflix.hystrix.metric.consumer
这个包下,这里先挑最顶层的类BucketedCounterStream
进行说明。
BucketedCounterStream
它是抽象类,提供了基本的桶计数器(BucketedCounter)实现:按配置的时间间隔将所有事件聚合成桶。
代码语言:javascript复制该抽象类定义最为基本的概念:桶、窗口
// Event:需要汇聚到桶里面的原始事件类型(HystrixEvent是原始的,HystrixRollingNumberEvent是直接的)
// Hystrix 中的调用事件,如命令开始执行、命令执行完成等
// Bucket:每个桶中包含的数据类型
// Output:最终输出类型:发送给流订阅者的数据类型(通常与Bucket相同,但不必相同)
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
protected final int numBuckets;
protected final Observable<Bucket> bucketedStream;
// 订阅信息:允许订阅or取消订阅
protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null);
// 它是一个函数。用于把Observable<Event>转为Observable<Bucket>
private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;
// 它是个Subject:既能发射数据,也能监听数据
// 用于计数
private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue());
// inputEventStream:事件流,input输入。比如command执行开始、结束时都会有输入
// numBuckets:用户不配置的话,默认它是10
// bucketSizeInMs:窗口毫秒值。若不配置回事1秒
// appendRawEventToBucket:它是一个函数 R call(T1 t1, T2 t2) 输入Bucket, Event返回Bucket类型
protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
this.numBuckets = numBuckets;
// getEmptyBucketSummary是否抽象方法:获取空桶
this.reduceBucketToSummary = eventBucket -> eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
final List<Bucket> emptyEventCountsToStart = new ArrayList<>();
for (int i = 0; i < numBuckets; i ) {
emptyEventCountsToStart.add(getEmptyBucketSummary());
}
this.bucketedStream = Observable.defer(() -> {
return inputEventStream
.observe()
// 利用RxJava进行窗口滑动
// bucketSizeInMs默认值是1000,表示1s表示一个窗口
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.flatMap(reduceBucketToSummary)
.startWith(emptyEventCountsToStart);
});
}
}
用户在使用 Hystrix 的时候一般都要配两个值(当然,大多数情况下默认值即可):timeInMilliseconds
和numBuckets
,前者代表滑动窗口的长度(时间间隔),后者代表滑动窗口中桶的个数,那么每个桶对应的窗口长度就是 bucketSizeInMs = timeInMilliseconds / numBuckets
(记为一个单元窗口周期)。BucketedCounterStream
每隔一个单元窗口周期(bucketSizeInMs
)就把这段时间内的所有调用事件聚合到一个桶内(使用的便是reduceBucketToSummary
函数完成)。
共享的事件流HystrixEventStream
BucketedCounterStream 核心代码在构造函数里,里面最核心的逻辑就是如何将一个一个的事件按一段时间(RxJava的window方法)聚合成一个桶(flatMap方法)。我们可以看到 bucketedStream 是经事件源 inputEventStream 变换而成的,事件源的类型为 HystrixEventStream<Event>
,关于此事件流你可参考上篇文章,电梯直达:[享学Netflix] 二十二、Netflix Hystrix事件源与事件流:HystrixEvent和HystrixEventStream
此处说明一点:发送事件/数据的顺序性、
write()
数据时的线程安全性均由RxJava以及Hystrix使用ThreadLocal
提供保证的,使用者放心使用即可
事件聚合 -> 桶(Event -> Bucket)
事件流通过HystrixEventStream
源源不断的传递过来,某一时段甚至某一时刻进来的事件会有N个,但是这个时候需要把它聚合成Bucket
桶,以方便后续的统计(因为桶才是窗口的最小单位),这部分核心逻辑在这:
this.reduceBucketToSummary = eventBucket -> eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
this.bucketedStream = Observable.defer(() -> { // defer 的意思是 lazy 创建
return inputEventStream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) // 按单元窗口长度来将某个时间段内的调用事件聚集起来
.flatMap(reduceBucketToSummary) // 将每个单元窗口内聚集起来的事件集合聚合成桶
.startWith(emptyEventCountsToStart); // 为了保证窗口的完整性,开始的时候先产生一串空的桶
});
这里最为核心是 window 操作符:它可以按单元窗口长度来将某个时间段内的调用事件聚集起来。
此时数据流里每个对象都是一个集合:Observable<Event>
,所以需要将其聚集成桶类型以将其扁平化。Hystrix 通过 RxJava 的 reduce 操作符进行“归纳”操作,将一串事件归纳成一个桶:
this.reduceBucketToSummary = eventBucket -> eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
这个reduce函数的初始值为:getEmptyBucketSummary()
也就是空桶,它是抽象方法由子类实现。appendRawEventToBucket
负责具体的reduce聚合逻辑,这是由构造函数传进来的函数:Bucket Event -> Bucket
,表示:对于每个 Event,都将其聚合到 Bucket 中,并返回聚合后的 Bucket。
说明:不同的实现对归约
appendRawEventToBucket
函数的实现是不同的,比如熔断器依赖的HealthCountsStream
它就是以long[]
作为每个桶的。
Tips:window(timespan, unit)
操作符属于计算型操作符,默认会在 Schedulers.computation() 调度器下执行(CPU 密集型,关于Schedulers前文有过详细解释),其底层本质是线程数为 CPU 核数的线程池。RxJava 会确保其线程安全。
其它方法
代码语言:javascript复制BucketedCounterStream:
// 抽象方法:访问权限是Default哦~~~
abstract Bucket getEmptyBucketSummary(); // 空桶
abstract Output getEmptyOutputValue(); // 空的输出值。作为BehaviorSubject的默认值
// 注意:这个泛型是output,并不是输入哦。返回的是处理后的输出流,所以一般是桶
// 它是public的
public abstract Observable<Output> observe();
// 取消subscription的订阅(它的设值方法见下)
public void unsubscribe() {
Subscription s = subscription.get();
if (s != null) {
s.unsubscribe();
subscription.compareAndSet(s, null);
}
}
// 若subscription还为null(还未开始),那就让counterSubject去监听着
// observe().subscribe(counterSubject);
public void startCachingStreamValuesIfUnstarted() { ... }
// 这是一个同步调用。以检索最后一个计算的桶,而不需要等待任何发射
// 该方法会在很多地方被调用
public Output getLatest() {
startCachingStreamValuesIfUnstarted();
if (counterSubject.hasValue()) {
return counterSubject.getValue();
} else {
return getEmptyOutputValue();
}
}
总结
BucketedCounterStream
提供的能力可描述为:桶计数器,它负责把一段时间窗口内的事件归约到一个桶里,并且对外提供Stream
的访问方式,让外部可以订阅、处理。
如果说HystrixEventStream
是点对点的建立了通道,那么BucketedCounterStream
就是定期的去通道了收集数据,统计装到桶里,以便后续使用。至于桶是什么结构?装了哪些数据,以及具体的归约、计算逻辑均在子类实现,下面文章将继续分享这方面的内容,敬请关注。