[享学Netflix] 二十三、Hystrix桶计数器:BucketedCounterStream

2020-03-19 10:44:11 浏览数 (1)

入一行,先别惦记着挣钱,而是要先让自己值钱

代码下载地址:https://github.com/f641385712/netflix-learning

前言

上篇文章介绍了Hystrix指标数据收集的数据源HystrixEvent和数据流HystrixEventStream,通过示例了解了Hystrix数据收集的基本模型,但实际上并不精确。

我们已经知道Hystrix它是通过滑动窗口的数据结构/算法来统计调用的指标数据的,但若直接使用HystrixEventStream作为管道传播数据的话,是点对点的,并无时间区间、时间窗口等概念。因此本文将以以这为目的,深入了解与时间窗口相关的数据传输、收集处理的核心API:BucketedCounterStream


正文

Hystrix1.5版本(2016.12)开始,它全面拥抱RxJava把这块代码重写设计为了基于数据流Stream的形式,通过消费数据流的形式利用滑动窗口,并对数据流进行变换后进行后续的操作,可以让开发者更加灵活地去使用。

滑动窗口本质就是不断变换的数据流,滑动窗口中每个桶的数据都来自于源源不断的事件,因此滑动窗口非常适合用观察者模式和响应式编程思想的 RxJava 实现。

说明数据流Stream的实现强依赖与RxJava思想,推荐若对此还不太熟悉,请翻阅前几篇文章or其它文章先了解RxJava的思想以及使用。

使用 RxJava可以通过它的一系列操作符来实现滑动窗口,从而可以依赖 RxJava 的线程模型来保证数据写入和聚合的线程安全,将这一系列的机制交给 RxJava来得以保证。所有的操作都是在 RxJava 的后台线程上进行的,这也大大降低了对业务线程的延迟性的影响。


Hystrix里的滑动窗口

Hystrix通过滑动窗口来对数据进行“平滑”统计,默认情况下,一个滑动窗口包含10个桶(Bucket),每个桶时间宽度是1秒,负责1秒的数据统计。滑动窗口包含的总时间以及其中的桶数量都是可以配置的,来张官方的截图认识下滑动窗口:

上图的每个小矩形代表一个桶,可以看到,每个桶都记录着1秒内的四个指标数据:成功量、失败量、超时量和拒绝量,这里的拒绝量指的就是上面流程图中【信号量/线程池资源检查】中被拒绝的流量。10个桶合起来是一个完整的滑动窗口,所以计算一个滑动窗口的总数据需要将10个桶的数据加起来。


BucketedCounterStream

滑动窗口所有的数据流实现均位于com.netflix.hystrix.metric.consumer这个包下,这里先挑最顶层的类BucketedCounterStream进行说明。

BucketedCounterStream它是抽象类,提供了基本的桶计数器(BucketedCounter)实现:按配置的时间间隔将所有事件聚合成桶

该抽象类定义最为基本的概念:桶、窗口

代码语言:javascript复制
// Event:需要汇聚到桶里面的原始事件类型(HystrixEvent是原始的,HystrixRollingNumberEvent是直接的)
	// Hystrix 中的调用事件,如命令开始执行、命令执行完成等
// Bucket:每个桶中包含的数据类型
// Output:最终输出类型:发送给流订阅者的数据类型(通常与Bucket相同,但不必相同)
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {

    protected final int numBuckets;
    protected final Observable<Bucket> bucketedStream;
    // 订阅信息:允许订阅or取消订阅
    protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null);

	// 它是一个函数。用于把Observable<Event>转为Observable<Bucket>
	private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;
	// 它是个Subject:既能发射数据,也能监听数据
	// 用于计数
	private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue());


	// inputEventStream:事件流,input输入。比如command执行开始、结束时都会有输入
	// numBuckets:用户不配置的话,默认它是10
	// bucketSizeInMs:窗口毫秒值。若不配置回事1秒
	// appendRawEventToBucket:它是一个函数 R call(T1 t1, T2 t2) 输入Bucket, Event返回Bucket类型
    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {

	    this.numBuckets = numBuckets;

		// getEmptyBucketSummary是否抽象方法:获取空桶
	    this.reduceBucketToSummary = eventBucket -> eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
	    final List<Bucket> emptyEventCountsToStart = new ArrayList<>();
	    for (int i = 0; i < numBuckets; i  ) {
	        emptyEventCountsToStart.add(getEmptyBucketSummary());
	    }
	    this.bucketedStream = Observable.defer(() -> {
	        return inputEventStream
	                .observe()
	                // 利用RxJava进行窗口滑动
	                // bucketSizeInMs默认值是1000,表示1s表示一个窗口
	                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) 
	                .flatMap(reduceBucketToSummary) 
	                .startWith(emptyEventCountsToStart);
	    });
	
	}

}

用户在使用 Hystrix 的时候一般都要配两个值(当然,大多数情况下默认值即可):timeInMillisecondsnumBuckets,前者代表滑动窗口的长度(时间间隔),后者代表滑动窗口中桶的个数,那么每个桶对应的窗口长度就是 bucketSizeInMs = timeInMilliseconds / numBuckets(记为一个单元窗口周期)。BucketedCounterStream每隔一个单元窗口周期(bucketSizeInMs)就把这段时间内的所有调用事件聚合到一个桶内(使用的便是reduceBucketToSummary函数完成)。


共享的事件流HystrixEventStream

BucketedCounterStream 核心代码在构造函数里,里面最核心的逻辑就是如何将一个一个的事件按一段时间(RxJava的window方法)聚合成一个桶(flatMap方法)。我们可以看到 bucketedStream 是经事件源 inputEventStream 变换而成的,事件源的类型为 HystrixEventStream<Event>,关于此事件流你可参考上篇文章,电梯直达:[享学Netflix] 二十二、Netflix Hystrix事件源与事件流:HystrixEvent和HystrixEventStream

此处说明一点:发送事件/数据的顺序性、write()数据时的线程安全性均由RxJava以及Hystrix使用ThreadLocal提供保证的,使用者放心使用即可


事件聚合 -> 桶(Event -> Bucket)

事件流通过HystrixEventStream源源不断的传递过来,某一时段甚至某一时刻进来的事件会有N个,但是这个时候需要把它聚合成Bucket桶,以方便后续的统计(因为桶才是窗口的最小单位),这部分核心逻辑在这:

代码语言:javascript复制
this.reduceBucketToSummary = eventBucket -> eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);

this.bucketedStream = Observable.defer(() -> { // defer 的意思是 lazy 创建
    return inputEventStream
            .observe()
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // 按单元窗口长度来将某个时间段内的调用事件聚集起来
            .flatMap(reduceBucketToSummary)                // 将每个单元窗口内聚集起来的事件集合聚合成桶
            .startWith(emptyEventCountsToStart);           // 为了保证窗口的完整性,开始的时候先产生一串空的桶
});

这里最为核心是 window 操作符:它可以按单元窗口长度来将某个时间段内的调用事件聚集起来。 此时数据流里每个对象都是一个集合:Observable<Event>,所以需要将其聚集成桶类型以将其扁平化。Hystrix 通过 RxJava 的 reduce 操作符进行“归纳”操作,将一串事件归纳成一个桶:

代码语言:javascript复制
this.reduceBucketToSummary = eventBucket -> eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);

这个reduce函数的初始值为:getEmptyBucketSummary()也就是空桶,它是抽象方法由子类实现。appendRawEventToBucket负责具体的reduce聚合逻辑,这是由构造函数传进来的函数:Bucket Event -> Bucket,表示:对于每个 Event,都将其聚合到 Bucket 中,并返回聚合后的 Bucket。

说明:不同的实现对归约appendRawEventToBucket函数的实现是不同的,比如熔断器依赖的HealthCountsStream它就是以long[]作为每个桶的。

Tips:window(timespan, unit)操作符属于计算型操作符,默认会在 Schedulers.computation() 调度器下执行(CPU 密集型,关于Schedulers前文有过详细解释),其底层本质是线程数为 CPU 核数的线程池。RxJava 会确保其线程安全。


其它方法
代码语言:javascript复制
BucketedCounterStream:

	// 抽象方法:访问权限是Default哦~~~
    abstract Bucket getEmptyBucketSummary(); // 空桶
    abstract Output getEmptyOutputValue(); // 空的输出值。作为BehaviorSubject的默认值

	// 注意:这个泛型是output,并不是输入哦。返回的是处理后的输出流,所以一般是桶
	// 它是public的
    public abstract Observable<Output> observe();

	// 取消subscription的订阅(它的设值方法见下)
    public void unsubscribe() {
        Subscription s = subscription.get();
        if (s != null) {
            s.unsubscribe();
            subscription.compareAndSet(s, null);
        }
    }
    // 若subscription还为null(还未开始),那就让counterSubject去监听着
    // observe().subscribe(counterSubject);
    public void startCachingStreamValuesIfUnstarted() { ... }

	// 这是一个同步调用。以检索最后一个计算的桶,而不需要等待任何发射
	// 该方法会在很多地方被调用
    public Output getLatest() {
        startCachingStreamValuesIfUnstarted();
        if (counterSubject.hasValue()) {
            return counterSubject.getValue();
        } else {
            return getEmptyOutputValue();
        }
    }

总结

BucketedCounterStream提供的能力可描述为:桶计数器,它负责把一段时间窗口内的事件归约到一个桶里,并且对外提供Stream的访问方式,让外部可以订阅、处理。

如果说HystrixEventStream是点对点的建立了通道,那么BucketedCounterStream就是定期的去通道了收集数据,统计装到桶里,以便后续使用。至于桶是什么结构?装了哪些数据,以及具体的归约、计算逻辑均在子类实现,下面文章将继续分享这方面的内容,敬请关注。

0 人点赞