[享学Netflix] 二十四、Hystrix在滑动窗口内统计:BucketedRollingCounterStream、HealthCountsStream

2020-03-19 10:10:36 浏览数 (1)

先改变自己的态度,才能改变人生的高度。

代码下载地址:https://github.com/f641385712/netflix-learning

前言

上篇文章介绍了桶计数器BucketedCounterStream,它提出了桶的概念并且提供了事件到桶的归约操作,可以说为滑动窗口实现打好了基础。

当然,基于BucketedCounterStream桶的实现并不要求必须是滑动窗口,比如你也可以是固定窗口,也可以是累计计算等。在Hystrix里,基于桶实现的一共有两种统计方式:

  • 在滑动窗口内统计,子类是BucketedRollingCounterStream体系
  • 持续累计统计:子类是BucketedCumulativeCounterStream体系

本文将聚焦于前者,先来看看在滑动窗口内是如何统计指标信息的~


正文

HystrixCommandsHystrixObservableCommands执行过程中,会产生执行的数据,这些数据对于观察调用的性能表现非常有用。产生数据后,Metrics会根据不同维度进行统计,主要有以下两个维度:滑动窗口内统计、持续累计统计。


滑动窗口内统计 BucketedRollingCounterStream

它改进BucketedCounterStream,以减少桶的数量。按照滑动窗口的大小每个单元窗口产生的桶进行聚合,它是Hystrix中滑动窗口的抽象实现

代码语言:javascript复制
// 泛型上没有确定任何泛型
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {

    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
	
	// 核心还是在构造函数上
	// 它比父构造器多了一个参数:reduceBucket函数,意思是减少桶的数量
    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                           Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           Func2<Output, Bucket, Output> reduceBucket) {
                                           
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
        this.sourceStream = bucketedStream      // 数据流,每个对象代表单元窗口产生的桶
                .window(numBuckets, 1)          // 按照滑动窗口桶的个数进行桶的聚集
                .flatMap(reduceWindowToSummary) // 将一系列的桶聚集成最后的数据对象
                .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
                .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
                .share()                        // 共享。不同的订阅者看到的数据是一致的
                .onBackpressureDrop();          // 被压流量控制,当消费者消费速度过慢时就丢弃数据,不进行积压
    }

	// 实现父类的方法
    @Override
    public Observable<Output> observe() {
        return sourceStream;
    }
    // 非public方法
	boolean isSourceCurrentlySubscribed() {
        return isSourceCurrentlySubscribed.get();
    }

}

既然是减少桶的数量,那必然有reduce操作。构造函数后两个参数分别代表两个函数:

  • appendRawEventToBucket:将事件流聚合成桶(Event -> Bucket)
  • reduceBucket:将桶聚合、合并成输出对象sourceStream

该抽象类实现了父类的observe()方法,返回了一个 Observable 类型的发布者 sourceStream,供订阅者去消费。这个sourceStream就是滑动窗口的终极形态,这里需要关心的是它是如何被“创建的”?

sourceStream的“创建”:其实核心还是windowflatMap这两个操作。

  • window(numBuckets, 1):第二个参数skip=1 的意思就是按照步长为 1 在数据流中滑动,不断聚集对象,这不就是滑动窗口的真正实现么
  • flatMap(reduceWindowToSummary):在window那步每个窗口已经形成,,下一步就是要对窗口进行聚合了
    • 需要注意的是,这里和父类不同,这里聚合没有用reduce,而是用的scan skip(numBuckets) 的组合
代码语言:javascript复制
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);	

此处scan skip(numBuckets)的效果和reduce差不多,但是它有个优点就是当最后面的窗口大小不满 numBuckets数值的时候,使用scan方式也能保证数据不会缺失。

到此,滑动窗口聚合完毕。最终share出去的就是sourceStream,它能被N多订阅者订阅使用滑动窗口里面的数据,若生产速度 > 消费速度会直接抛弃掉,所以消费速度尽量的快哈(订阅者越多消费速度肯定越慢哦,并且符合木桶理论)。

BucketedRollingCounterStream提供了完整的滑动窗口统计的服务,想要使用滑动窗口来统计数据的话继承它即可,下面便来看看它的实现类们。


RollingCommandEventCounterStream

用于滑动统计服务调用时各个事件的次数。这里面它使用HystrixCommandProperties的配置策略:

  • 固定窗口的大小(毫秒值):properties.metricsRollingStatisticalWindowInMilliseconds(),默认值是10000,也就是10s
  • 窗口内的桶数:properties.metricsRollingStatisticalWindowBuckets(),默认是10个
  • 一个桶的大小(毫秒值):counterMetricWindow / numCounterBuckets;(默认是100000 / 10 = 1000,也就是一个桶是1s)
代码语言:javascript复制
// 监听的事件源是HystrixCommandCompletion,所以事件流是HystrixCommandCompletionStream.getInstance(commandKey)
// 一个桶是一个long[]:也就是说一个桶内装有多个指标数据(已经完成对应的时间有N多)
// 输出的也是这些指标数
public class RollingCommandEventCounterStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], long[]> {

	// 每个HystrixCommandKey有且仅有一个RollingCommandEventCounterStream 实例
    private static final ConcurrentMap<String, RollingCommandEventCounterStream> streams = new ConcurrentHashMap<String, RollingCommandEventCounterStream>();

	// 它决定了一个空桶应该有多深
	private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;

	// 静态方法,获取实例
	public static RollingCommandEventCounterStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) { ... }
	public static RollingCommandEventCounterStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
		... // 缓存里有直接拿,缓存里木有就new一个
	}
	...
    public static void reset() {
        streams.clear();
    }

    @Override
    long[] getEmptyBucketSummary() {
        return new long[NUM_EVENT_TYPES];
    }
    ...
}

它监听HystrixCommandCompletionStream数据流,用long[]来表示一个桶,桶内装有HystrixEventType所有类型的时间的统计数,对应关系通过数组下标和事件枚举的ordinal()值来对应。 另外,还有两个重要的函数reduceCommandCompletionreduceBucket均位于HystrixCommandMetrics里:

代码语言:javascript复制
reduceCommandCompletion函数:

		// 最重要的是eventCounts.getCount()方法
		public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = (long[] initialCountArray, HystrixCommandCompletion execution) -> {
			// 结果统计:发射数、fallabck数等
			ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
		
			// 遍历所有的事件类型,分别做处理
            for (HystrixEventType eventType: ALL_EVENT_TYPES) {
                switch (eventType) {
                    case EXCEPTION_THROWN: break;
                    default: // 其它事件全部添加到桶里
                        initialCountArray[eventType.ordinal()]  = eventCounts.getCount(eventType);
                        break;
                }
            }
            return initialCountArray;
		}

该函数用于把事件合并到桶内(该桶可能已经有值了哦),最重要的是eventCounts.getCount(eventType)方法决定了如何添加:

  • EMIT:返回EventCounts.numEmissions(每次command命令内可能多次发射)
  • FALLBACK_EMIT:返回numFallbackEmissions
  • EXCEPTION_THROWN
  • 其它事件:包含了就 1,否则忽略
代码语言:javascript复制
reduceBucket():

	public static final Func2<long[], long[], long[]> bucketAggregator = (long[] cumulativeEvents, long[] bucketEventCounts) -> {
            for (HystrixEventType eventType: ALL_EVENT_TYPES) {
                switch (eventType) {
                    case EXCEPTION_THROWN:
                        for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
                            cumulativeEvents[eventType.ordinal()]  = bucketEventCounts[exceptionEventType.ordinal()];
                        }
                        break;
                    default:
                        cumulativeEvents[eventType.ordinal()]  = bucketEventCounts[eventType.ordinal()];
                        break;
                }
            }
            return cumulativeEvents;
	}

这个函数的逻辑:就是把多个桶里面的数据合并到一个桶里。

现实意义是:一个时间窗口内有多个桶,但是希望对外呈现的是一个桶,表示这一个窗口的整体数据情况

合并的规则很清晰:

  • HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES所有类型(包括:BAD_REQUEST、FALLBACK_FAILURE、FALLBACK_MISSING、FALLBACK_REJECTION)全部归为EXCEPTION_THROWN,也就是所谓的异常类型
  • 其它类型该是什么类型就合并到啥类型里去

这么一来,对外呈现的永远是是一个桶。其实外部也只关心这个阈值而已,时间窗口是你的实现细节,上层并不关心。


RollingThreadPoolEventCounterStream

它监听的同样是RollingThreadPoolEventCounterStream这个数据流,实现逻辑完全同上,不一样的是它关心的事件不一样,具体参见两大函数的实现,均位HystrixThreadPoolMetrics里。

线程关心的事件枚举是:HystrixEventType.ThreadPool,只有两个值EXECUTEDREJECTED,只有事件类型是HystrixEventType.THREAD_POOL_REJECTED才算REJECTED~


HealthCountsStream(重要)

它提供实时的健康检查数据(HystrixCommandMetrics.HealthCounts,统计调用成功和失败的计数,以及比率)。它继承自BucketedRollingCounterStream,三个泛型参数是:

  • Event -> HystrixCommandCompletion:代表命令执行完成。可以从中获取执行结果,并从中提取所有产生的事件(HystrixEventType)
  • Bucket -> long[]:里面统计了各种事件的个数。其中 index 为事件类型枚举对应的索引(ordinal),值为对应事件的个数
  • Output -> HystrixCommandMetrics.HealthCounts:里面统计了总的执行次数、失败次数以及失败百分比,供熔断器使用

注意:这里的BucketOutput不一样了哦,并不是“原样输出”了~

先简要了解下HystrixCommandMetrics.HealthCounts


HealthCounts

这个类记录着滑动窗口期间的请求数,包括:总数、失败数、失败百分比

代码语言:javascript复制
public static class HealthCounts {
    private final long totalCount;
    private final long errorCount;
    private final int errorPercentage;
    ...
    // 增加次数。返回一个新的HealthCounts实例哦~所以它是线程安全的
	public HealthCounts plus(long[] eventTypeCounts) { ... }
	// 空的  属性均是0
	public static HealthCounts empty() { return EMPTY } 
	...
}

参与加法的事件包括:

  1. HystrixEventType.SUCCESS 对应 HystrixRollingNumberEvent.SUCCESS
  2. HystrixEventType.FAILURE 对应 …
  3. HystrixEventType.TIMEOUT 对应 …
  4. HystrixEventType.THREAD_POOL_REJECTED 对应 …
  5. HystrixEventType.SEMAPHORE_REJECTED 对应 …

这5个源生事件均对应着HystrixRollingNumberEvent同名事件类型

除了1表示成功外,其它均属于失败事件,会计入失败总数和失败百分比里


同样的,HealthCountsStream的源码逻辑完全同上,所以也仅需看看它的归约函数的实现即可:

代码语言:javascript复制
reduceCommandCompletion:使用的HystrixCommandMetrics.appendEventToBucket,上面已经介绍
	
reduceBucket函数:
	
	(healthCounts, bucketEventCounts) ->  healthCounts.plus(bucketEventCounts);

该实现也非常简单:相加即可。当然哪些事件算成功,哪些算失败这在healthCounts内,上面已经做了表述。 需要注意的是:它会每500ms去获取一下当先的统计数据,而这个参数是由properties.metricsHealthSnapshotIntervalInMilliseconds()来决定的,默认值是500ms。

Hystrix熔断器(HystrixCircuitBreakerImpl)里会实时地去消费每个窗口产生的健康统计数据,并根据指标来决定熔断器的状态。


RollingCollapserEventCounterStream

实现逻辑完全同上,不同的是它关心的事件流是HystrixCollapserEventStream,关心的事件是HystrixEventType.Collapser

代码语言:javascript复制
    public enum Collapser {
        BATCH_EXECUTED, ADDED_TO_BATCH, RESPONSE_FROM_CACHE;
    }

总结

关于Netflix Hystrix在滑动窗口内统计就介绍到这了,到此我相信你已经完全能够理解Hystrix是如何使用滑动窗口统计数据了吧,甚至你还可以对应Hystrix的监控大盘,是不是结构上和这一模一样呢,这就是原理。掌握了原理在应用,脚跟更稳了。

Hystrix自1.5起使用 RxJava 1.x(注意目前最新是2.x) 来实现滑动窗口,将滑动窗口抽象成响应式数据流的形式,既适合 Hystrix 事件驱动的特点,又易于实现和使用。滑动窗口的实现的要点就是每个桶的聚合以及滑动窗口的形成,Hystrix 巧妙地运用了 RxJava 中的 window 操作符来将单位窗口时间内的事件,以及将一个窗口大小内的桶聚集到一起,并通过 reduce 等归约折叠操作将事件集合聚集为桶将滑动窗口内的桶聚集成指标数据(可能还是个桶,也可能是计算过后的如HealthCounts),非常巧妙。

0 人点赞