[享学Netflix] 二十六、Hystrix指标数据收集器:HystrixMetrics(HystrixDashboard的数据来源)

2020-03-19 10:52:43 浏览数 (1)

看一个人的成功,不是看他赢了多少人,而是看他成就了多少人。

代码下载地址:https://github.com/f641385712/netflix-learning

前言

前面已经花了5篇文章专门介绍了Hystrix基于事件机制的数据收集、Stream流式处理,再回头来理解它的HystrixMetrics指标收集就一清二楚了。

HystrixCommand执行过程(开始执行、结束执行、异常、超时)时会不断发出各类事件,通过收集这些数据,提供给消费者。如断路器、Hystrix Dashboard可以统计分析这些数据,从而完成特定的功能。 Hystrix以command命令模式的方式来控制业务逻辑以及熔断逻辑的调用时机,所以说数据统计对它来说不算难事,但如何高效、精准的在内存中统计数据,还需要一定的技巧。

需要提前说明的是:像什么hystrix.streamHystrixDashboard面板查看等这些,本文均还不会体现。本文只阐述数据的采集,至于数据如何使用(存储or展示)放在后几篇文章。


正文

Hystrix收集数据是必不可少的一步,每个降级点(需要采取降级保护的点)的数据是独立的,所以我们可以给每个降级点配置单独的策略

这些策略一般是建立在我们对这些降级点的了解之上的,初期甚至可以先观察一下采集的数据来指定降级策略。

采集哪些数据?数据如何存储?数据如何上报?这都是Hystrix需要考虑的问题,Hystrix采用的是滑动窗口 分桶的形式来采集数据(原理还蛮复杂的,本文不不做讨论),这样既解决了数据在统计周期间切换而带来的跳变问题(通过时间窗口),也控制了切换了力度(通过桶大小)。

关于Metrics指标收集,就不得不再次请上第一篇文章已贴出的这张执行原理图了:

它从各个地方(包括正常逻辑执行、线程池/信号量资源监察)收集指标信息,然后提供给断路器使用,或者提供给监控大盘们使用(它们均是consumer)


HystrixRollingNumber

该类用来统计一段时间内的计数,也被称作Hystrix里用于qps计数的数据结构,采用滑动窗口 分桶的形式收集。

事件类型HystrixRollingNumberEvent:可以在HystrixRollingNumber中捕获的各种状态/事件。

代码语言:javascript复制
public enum HystrixRollingNumberEvent {
    SUCCESS(1), FAILURE(1), TIMEOUT(1), SHORT_CIRCUITED(1), THREAD_POOL_REJECTED(1), SEMAPHORE_REJECTED(1), BAD_REQUEST(1),
    FALLBACK_SUCCESS(1), FALLBACK_FAILURE(1), FALLBACK_REJECTION(1), FALLBACK_MISSING(1), EXCEPTION_THROWN(1), COMMAND_MAX_ACTIVE(2), EMIT(1), FALLBACK_EMIT(1),
    THREAD_EXECUTION(1), THREAD_MAX_ACTIVE(2), COLLAPSED(1), RESPONSE_FROM_CACHE(1),
    COLLAPSER_REQUEST_BATCHED(1), COLLAPSER_BATCH(1);

    private final int type;
    private HystrixRollingNumberEvent(int type) {
        this.type = type;
    }

	// 可执行HystrixRollingNumber#increment/add/getRollingSum方法
    public boolean isCounter() { return type == 1; }
    // 可执行HystrixRollingNumber#updateRollingMax/getRollingMaxValue方法
    public boolean isMaxUpdater() { return type == 2; }

	// HystrixEventType转为HystrixRollingNumberEvent 
	public static HystrixRollingNumberEvent from(HystrixEventType eventType) {
		...
	}
}

可以看到,每一个HystrixEventType类型都能匹配到一个HystrixRollingNumberEvent从而被收集进来。

HystrixRollingNumber它是一个工具类,位于Util包:com.netflix.hystrix.util滑动窗口 分桶逻辑实现复杂,但它作为一个工具类给提供了非常实用的获取数据的方法:

代码语言:javascript复制
HystrixRollingNumber:
	
	final int numberOfBuckets;
	...
	// 环形桶:因为时间窗口需要滑动
	final BucketCircularArray buckets;
	...
    public void increment(HystrixRollingNumberEvent type) {
        getCurrentBucket().getAdder(type).increment();
    }
    ...
    // 获取自JVM启动以来所有桶的累积和
    public long getCumulativeSum(HystrixRollingNumberEvent type) { ... }
    // 获取给定指定event类型的**滚动计数器**中所有桶的总和(常用)
    public long getRollingSum(HystrixRollingNumberEvent type) { ... }
    // 获取滚动过程中,最新的桶的值
    public long getValueOfLatestBucket(HystrixRollingNumberEvent type) { ... }
    // 获取滚动中,所有桶的值们
    public long[] getValues(HystrixRollingNumberEvent type) { ... }
    // 基于getValues的基础上排序,然后取出最大值
    public long getRollingMaxValue(HystrixRollingNumberEvent type) { ... }
	...

HystrixRollingNumber统计一定时间内的统计数值,基本思想就是分段/分桶统计,比如说要统计qps,即1秒内的请求总数。如下图所示,我们可以将1s的时间分成10段,每段100ms。在第一个100ms内,写入第一个段中进行计数,在第二个100ms内,写入第二个段中进行计数,这样如果要统计当前时间的qps,我们总是可以通过统计当前时间前1s(共10段)的计数总和值。

说明:注意它和RollingDistributionStream的区别哦~


Metrics如何统计

Metrics在统计各种状态时,时运用滑动窗口思想进行统计的,在一个滑动窗口时间中又划分了若干个Bucket(滑动窗口时间与Bucket成整数倍关系),滑动窗口的移动是以Bucket为单位进行滑动的。

如:HealthCounts 记录的是一个Buckets的监控状态,Buckets为一个滑动窗口的一小部分,如果一个滑动窗口时间为 t ,Bucket数量为 n,那么每隔t/n秒将新建一个HealthCounts对象。


Metrics收集步骤

根据前面几篇文章的表述,这里简单总结指标信息的收集步骤:

  1. 命令在开始执行前会向开始消息流(HystrixCommandStartStream)发送开始消息(HystrixCommandExecutionStarted)
  2. 如果是线程池执行,执行前会向线程池开始消息流(HystrixThreadPoolStartStream)发送开始消息(HystrixCommandExecutionStarted)
  3. 如果是线程池执行,执行后会向线程池结束消息流(HystrixThreadPoolCompletionStream)发送完成消息(HystrixCommandCompletion)
  4. 命令在结束执行前会向完成消息流(HystrixCommandCompletionStream)发送完成消息(HystrixCommandCompletion)
  5. 不同类型的统计流(比如滑动窗口统计、累计统计、最大并发统计等等),会监听开始消息流或完成消息流,根据接受到的消息内容,进行统计

HystrixMetrics

指标数据采集的基类。当前服务的健康状况, 包括服务调用总次数和服务调用失败次数等. 根据Metrics的计数, 熔断器从而能计算出当前服务的调用失败率, 用来和设定的阈值比较从而决定熔断器的状态切换逻辑. 因此Metrics的实现非常重要

代码语言:javascript复制
public abstract class HystrixMetrics {
	protected final HystrixRollingNumber counter;
    protected HystrixMetrics(HystrixRollingNumber counter) {
        this.counter = counter;
    }
	
	// 获取累计总数
    public long getCumulativeCount(HystrixRollingNumberEvent event) {
        return counter.getCumulativeSum(event);
    }
    // 获取当前滑动窗口内的总数
    public long getRollingCount(HystrixRollingNumberEvent event) {
        return counter.getRollingSum(event);
    }
}

Hystrix的Metrics功能模块中存储了与Hystrix运行相关的度量信息,主要有三类类型:


HystrixCommandMetrics

保存hystrix命令执行的度量信息。

代码语言:javascript复制
public class HystrixCommandMetrics extends HystrixMetrics {

	private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
	// 静态Map,key为HystrixCommandKey,缓存
	private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<>();
	
	// 这两个public的函数,在介绍前面介绍HealthCountsStream已讲过,略
	public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = ...
	public static final Func2<long[], long[], long[]> bucketAggregator = ...
}

此处解释一下,为何有metrics这个static变量:由于每次请求都需要新创建command对象,而每创建一次command对象都有好多属性需要初始化(具体参见讲解AbstractCommand文章),那么是不是非常的耗时呢???

其实构造函数中的很多初始化工作只会集中在创建第一个Command时来做,后续创建的Command对象主要是从静态Map中取对应的实例来赋值,比如监控器、断路器和线程池的初始化,因为相同的Command的command key和线程池key都是一致的,在HystrixCommandMetricsHystrixCircuitBreaker.FactoryHystrixThreadPool中均有类似于metrics这样的static Map缓存用于提高效率的。

HystrixMetrics负责收集指标数据,它会借用多个Stream来进行多维护收集,所以它有众多成员属性:

代码语言:javascript复制
HystrixCommandMetrics:

    private final HystrixCommandProperties properties;
    private final HystrixCommandKey key;
    private final HystrixCommandGroupKey group;
    private final HystrixThreadPoolKey threadPoolKey;
    // 记录当前正在执行的总命令数
    // 命令start执行的时候 1,执行结束-1
    private final AtomicInteger concurrentExecutionCount = new AtomicInteger();

 	// 使用各种纬度的Stream,进行监听信息流
    private HealthCountsStream healthCountsStream;
    private final RollingCommandEventCounterStream rollingCommandEventCounterStream;
    private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
    private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
    private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
    private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;

	... // 私有化构造器:给所有的属性赋值
	... // 省略get方法


	...

	// 当前**正在执行**的总数:HystrixCommand#run()
    public int getCurrentConcurrentExecutionCount() {
        return concurrentExecutionCount.get();
    }
    ...

	// 检索总请求、错误计数和错误百分比的快照。
    public HealthCounts getHealthCounts() {
        return healthCountsStream.getLatest();
    }
	...

	// 因为收集指标信息都是异步收集的,这个方法可以解除所有的订阅
    private void unsubscribeAll() {
        healthCountsStream.unsubscribe();
        rollingCommandEventCounterStream.unsubscribe();
        cumulativeCommandEventCounterStream.unsubscribe();
        rollingCommandLatencyDistributionStream.unsubscribe();
        rollingCommandUserLatencyDistributionStream.unsubscribe();
        rollingCommandMaxConcurrencyStream.unsubscribe();
    }

它对外提供非常多的方法/能力,其中绝大多数都是委托给各种xxxStream来完成,下面对主要方法做如下描述:

  • markCommandStart:当命令开始执行,调用该方法
  • markCommandDone:命令执行完成,调用该方法
    • 说明:以上2方法均不是public方法,由AbstractCommand调用。其中有个API:HystrixThreadEventStream后续会有详细介绍
  • getRollingCount:获取某一事件类型窗口期内的统计数值(委托rollingCommandEventCounterStream
  • getCumulativeCount:获取某一事件类型持续的统计数值
  • getExecutionTimePercentile:获取某一百分比的请求执行时间(委托rollingCommandLatencyDistributionStream
  • getExecutionTimeMean:获取平均请求执行时间(委托rollingCommandLatencyDistributionStream
  • getTotalTimePercentile:获取某一百分比的请求执行总时间(委托rollingCommandUserLatencyDistributionStream
  • getTotalTimeMean:获取平均请求执行总时间
  • getRollingMaxConcurrentExecutions:获取上一个窗口期内最大的并发数
  • getHealthCountsStream:获取窗口期内的失败次数,总次数,失败比率

另外,构建一个HystrixCommandMetrics的实例,依旧以static静态方法对外提供,加缓存来提高效率:

代码语言:javascript复制
HystrixCommandMetrics:

	// 根据参数,得到一个HystrixCommandMetrics 实例
	// 如果缓存里已经有了,就直接返回
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties) {
        return getInstance(key, commandGroup, null, properties);
    }
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) {
    	
    	// 双重校验锁
        HystrixCommandMetrics commandMetrics = metrics.get(key.name());
        if (commandMetrics != null) 
            return commandMetrics; 
		// 线程安全
		synchronized (HystrixCommandMetrics.class) {
			HystrixCommandMetrics existingMetrics = metrics.get(key.name());
                if (existingMetrics != null)
                    return existingMetrics;
			
				// 创建一个新的实例
				// =======线程的key默认情况下就是groupKey======
                HystrixThreadPoolKey nonNullThreadPoolKey;
                if (threadPoolKey == null) {
                    nonNullThreadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name());
                } else {
                    nonNullThreadPoolKey = threadPoolKey;
                }

				// 使用构造器初始化一个实例,并且放进Map里缓存起来
                HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier());
                metrics.putIfAbsent(key.name(), newCommandMetrics);
                return newCommandMetrics;

		}
    }

	// 这个getInstance只查找缓存,若缓存中木有,就返回null
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key) {
        return metrics.get(key.name());
    }

HystrixThreadPoolMetrics

原理同上,只是管理的Stream流不一样而已:

代码语言:javascript复制
HystrixThreadPoolMetrics:

    private final HystrixThreadPoolKey threadPoolKey;
    private final ThreadPoolExecutor threadPool;
    private final HystrixThreadPoolProperties properties;


    private final AtomicInteger concurrentExecutionCount = new AtomicInteger();
    private final RollingThreadPoolEventCounterStream rollingCounterStream;
    private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
    private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;

主要方法:

  • markThreadExecution:当线程执行时,调用此方法,次数 1
  • markThreadCompletion:执行完,次数-1
  • markThreadRejection:command任务被线程池拒绝时,次数-1
  • getRollingCount:和上面不一样,这里委托的是RollingThreadPoolEventCounterStream
  • getCumulativeCount

获取实例的方法同上。


HystrixCollapserMetrics
代码语言:javascript复制
HystrixCollapserMetrics:

    private final HystrixCollapserKey collapserKey;
    private final HystrixCollapserProperties properties;

    private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
    private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
    private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;

略。


使用示例

在例子之前,需要提醒的是:以上方法虽然最终是委托给Stream去执行的,但是它们并不会有延迟,是立即的(因为Stream流一般都会有窗口,比如1s一次,500ms一次等等),但是,但是,但是它是getLatest哦,也就是拿最新的数据,官方解释为:

代码语言:javascript复制
// 同步调用以检索上次计算的bucket而不用等待
// Synchronous call to retrieve the last calculated bucket without waiting for any emissions
public Output getLatest() { ... }

示例代码:

代码语言:javascript复制
@Test
public void fun1() throws InterruptedException {
    HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("CommandHelloWorld");
    HystrixCommandGroupKey commandGroupKey = HystrixCommandGroupKey.Factory.asKey("MyAppGroup");
    HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("MyAppGroup");
    HystrixPropertiesCommandDefault properties = new HystrixPropertiesCommandDefault(commandKey, HystrixCommandProperties.Setter());

    // command指标信息
    HystrixCommandMetrics commandMetrics = HystrixCommandMetrics.getInstance(commandKey, commandGroupKey, threadPoolKey, properties);

    // 发送事件(发送多次)
    CommandHelloWorld helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.execute();
    helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.queue();
    // 走fallabck
    helloWorld = new CommandHelloWorld(null);
    helloWorld.queue();


    // 打印指标信息
    TimeUnit.SECONDS.sleep(1); // 需要留给指标收集的时间
    System.out.println("===========commandMetrics信息===========");
    System.out.println(commandMetrics.getRollingCount(HystrixEventType.SUCCESS));
    System.out.println(commandMetrics.getRollingCount(HystrixEventType.FAILURE));
    System.out.println(commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS));

    System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.SUCCESS));
    System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FAILURE));
    System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FALLBACK_SUCCESS));


    System.out.println(commandMetrics.getHealthCounts());
    System.out.println(commandMetrics.getExecutionTimeMean());
}

运行程序控制台打印:

代码语言:javascript复制
===========commandMetrics信息===========
0
0
0
0
0
0
HealthCounts[1 / 3 : 33%]
0

说明:至于为何很多是0,这个和getLatest以及本地测试不好控有关,暂可忽略。


总结

关于Netflix Hystrix指标数据收集器:HystrixMetrics就介绍到这了,你可能会觉得它和前面讲的xxxStream有非常多的功能相似之处。从源码能看出,HystrixMetrics似乎是对xxxStream的一些包装,内部事件最终都是委托给xxxStream去完成了的。

只不过最大的区别是:HystrixMetrics所有的获取指标信息的方法,获取的都是瞬时的(最新的)值,而并不需要等待,这是和流式统计计算最大的区别。

0 人点赞