看一个人的成功,不是看他赢了多少人,而是看他成就了多少人。
代码下载地址:https://github.com/f641385712/netflix-learning
前言
前面已经花了5篇文章专门介绍了Hystrix基于事件机制的数据收集、Stream流式处理,再回头来理解它的HystrixMetrics
指标收集就一清二楚了。
在HystrixCommand
执行过程(开始执行、结束执行、异常、超时)时会不断发出各类事件,通过收集这些数据,提供给消费者。如断路器、Hystrix Dashboard
可以统计分析这些数据,从而完成特定的功能。
Hystrix
以command命令模式的方式来控制业务逻辑以及熔断逻辑的调用时机,所以说数据统计对它来说不算难事,但如何高效、精准的在内存中统计数据,还需要一定的技巧。
需要提前说明的是:像什么hystrix.stream
、HystrixDashboard
面板查看等这些,本文均还不会体现。本文只阐述数据的采集,至于数据如何使用(存储or展示)放在后几篇文章。
正文
Hystrix
收集数据是必不可少的一步,每个降级点(需要采取降级保护的点)的数据是独立的,所以我们可以给每个降级点配置单独的策略。
这些策略一般是建立在我们对这些降级点的了解之上的,初期甚至可以先观察一下采集的数据来指定降级策略。
采集哪些数据?数据如何存储?数据如何上报?这都是Hystrix需要考虑的问题,Hystrix
采用的是滑动窗口 分桶的形式来采集数据(原理还蛮复杂的,本文不不做讨论),这样既解决了数据在统计周期间切换而带来的跳变问题(通过时间窗口),也控制了切换了力度(通过桶大小)。
关于Metrics指标收集,就不得不再次请上第一篇文章已贴出的这张执行原理图了:
它从各个地方(包括正常逻辑执行、线程池/信号量资源监察)收集指标信息,然后提供给断路器使用,或者提供给监控大盘们使用(它们均是consumer)
HystrixRollingNumber
该类用来统计一段时间内的计数,也被称作Hystrix里用于qps计数的数据结构,采用滑动窗口 分桶的形式收集。
事件类型HystrixRollingNumberEvent
:可以在HystrixRollingNumber
中捕获的各种状态/事件。
public enum HystrixRollingNumberEvent {
SUCCESS(1), FAILURE(1), TIMEOUT(1), SHORT_CIRCUITED(1), THREAD_POOL_REJECTED(1), SEMAPHORE_REJECTED(1), BAD_REQUEST(1),
FALLBACK_SUCCESS(1), FALLBACK_FAILURE(1), FALLBACK_REJECTION(1), FALLBACK_MISSING(1), EXCEPTION_THROWN(1), COMMAND_MAX_ACTIVE(2), EMIT(1), FALLBACK_EMIT(1),
THREAD_EXECUTION(1), THREAD_MAX_ACTIVE(2), COLLAPSED(1), RESPONSE_FROM_CACHE(1),
COLLAPSER_REQUEST_BATCHED(1), COLLAPSER_BATCH(1);
private final int type;
private HystrixRollingNumberEvent(int type) {
this.type = type;
}
// 可执行HystrixRollingNumber#increment/add/getRollingSum方法
public boolean isCounter() { return type == 1; }
// 可执行HystrixRollingNumber#updateRollingMax/getRollingMaxValue方法
public boolean isMaxUpdater() { return type == 2; }
// HystrixEventType转为HystrixRollingNumberEvent
public static HystrixRollingNumberEvent from(HystrixEventType eventType) {
...
}
}
可以看到,每一个HystrixEventType
类型都能匹配到一个HystrixRollingNumberEvent
从而被收集进来。
HystrixRollingNumber
它是一个工具类,位于Util包:com.netflix.hystrix.util
。滑动窗口 分桶逻辑实现复杂,但它作为一个工具类给提供了非常实用的获取数据的方法:
HystrixRollingNumber:
final int numberOfBuckets;
...
// 环形桶:因为时间窗口需要滑动
final BucketCircularArray buckets;
...
public void increment(HystrixRollingNumberEvent type) {
getCurrentBucket().getAdder(type).increment();
}
...
// 获取自JVM启动以来所有桶的累积和
public long getCumulativeSum(HystrixRollingNumberEvent type) { ... }
// 获取给定指定event类型的**滚动计数器**中所有桶的总和(常用)
public long getRollingSum(HystrixRollingNumberEvent type) { ... }
// 获取滚动过程中,最新的桶的值
public long getValueOfLatestBucket(HystrixRollingNumberEvent type) { ... }
// 获取滚动中,所有桶的值们
public long[] getValues(HystrixRollingNumberEvent type) { ... }
// 基于getValues的基础上排序,然后取出最大值
public long getRollingMaxValue(HystrixRollingNumberEvent type) { ... }
...
HystrixRollingNumber
统计一定时间内的统计数值,基本思想就是分段/分桶统计,比如说要统计qps,即1秒内的请求总数。如下图所示,我们可以将1s的时间分成10段,每段100ms。在第一个100ms内,写入第一个段中进行计数,在第二个100ms内,写入第二个段中进行计数,这样如果要统计当前时间的qps,我们总是可以通过统计当前时间前1s(共10段)的计数总和值。
说明:注意它和RollingDistributionStream的区别哦~
Metrics如何统计
Metrics在统计各种状态时,时运用滑动窗口思想进行统计的,在一个滑动窗口时间中又划分了若干个Bucket(滑动窗口时间与Bucket成整数倍关系),滑动窗口的移动是以Bucket为单位进行滑动的。
如:HealthCounts 记录的是一个Buckets的监控状态,Buckets为一个滑动窗口的一小部分,如果一个滑动窗口时间为 t ,Bucket数量为 n,那么每隔t/n秒将新建一个HealthCounts对象。
Metrics收集步骤
根据前面几篇文章的表述,这里简单总结指标信息的收集步骤:
- 命令在开始执行前会向开始消息流(HystrixCommandStartStream)发送开始消息(HystrixCommandExecutionStarted)
- 如果是线程池执行,执行前会向线程池开始消息流(HystrixThreadPoolStartStream)发送开始消息(HystrixCommandExecutionStarted)
- 如果是线程池执行,执行后会向线程池结束消息流(HystrixThreadPoolCompletionStream)发送完成消息(HystrixCommandCompletion)
- 命令在结束执行前会向完成消息流(HystrixCommandCompletionStream)发送完成消息(HystrixCommandCompletion)
- 不同类型的统计流(比如滑动窗口统计、累计统计、最大并发统计等等),会监听开始消息流或完成消息流,根据接受到的消息内容,进行统计
HystrixMetrics
指标数据采集的基类。当前服务的健康状况, 包括服务调用总次数和服务调用失败次数等. 根据Metrics的计数, 熔断器从而能计算出当前服务的调用失败率, 用来和设定的阈值比较从而决定熔断器的状态切换逻辑. 因此Metrics的实现非常重要。
代码语言:javascript复制public abstract class HystrixMetrics {
protected final HystrixRollingNumber counter;
protected HystrixMetrics(HystrixRollingNumber counter) {
this.counter = counter;
}
// 获取累计总数
public long getCumulativeCount(HystrixRollingNumberEvent event) {
return counter.getCumulativeSum(event);
}
// 获取当前滑动窗口内的总数
public long getRollingCount(HystrixRollingNumberEvent event) {
return counter.getRollingSum(event);
}
}
Hystrix的Metrics功能模块中存储了与Hystrix运行相关的度量信息,主要有三类类型:
HystrixCommandMetrics
保存hystrix命令执行的度量信息。
代码语言:javascript复制public class HystrixCommandMetrics extends HystrixMetrics {
private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
// 静态Map,key为HystrixCommandKey,缓存
private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<>();
// 这两个public的函数,在介绍前面介绍HealthCountsStream已讲过,略
public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = ...
public static final Func2<long[], long[], long[]> bucketAggregator = ...
}
此处解释一下,为何有metrics
这个static变量:由于每次请求都需要新创建command对象,而每创建一次command对象都有好多属性需要初始化(具体参见讲解AbstractCommand文章),那么是不是非常的耗时呢???
其实构造函数中的很多初始化工作只会集中在创建第一个Command时来做,后续创建的Command对象主要是从静态Map中取对应的实例来赋值,比如监控器、断路器和线程池的初始化,因为相同的Command的command key和线程池key都是一致的,在HystrixCommandMetrics
、HystrixCircuitBreaker.Factory
、HystrixThreadPool
中均有类似于metrics
这样的static Map缓存用于提高效率的。
HystrixMetrics负责收集指标数据,它会借用多个Stream
来进行多维护收集,所以它有众多成员属性:
HystrixCommandMetrics:
private final HystrixCommandProperties properties;
private final HystrixCommandKey key;
private final HystrixCommandGroupKey group;
private final HystrixThreadPoolKey threadPoolKey;
// 记录当前正在执行的总命令数
// 命令start执行的时候 1,执行结束-1
private final AtomicInteger concurrentExecutionCount = new AtomicInteger();
// 使用各种纬度的Stream,进行监听信息流
private HealthCountsStream healthCountsStream;
private final RollingCommandEventCounterStream rollingCommandEventCounterStream;
private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;
... // 私有化构造器:给所有的属性赋值
... // 省略get方法
...
// 当前**正在执行**的总数:HystrixCommand#run()
public int getCurrentConcurrentExecutionCount() {
return concurrentExecutionCount.get();
}
...
// 检索总请求、错误计数和错误百分比的快照。
public HealthCounts getHealthCounts() {
return healthCountsStream.getLatest();
}
...
// 因为收集指标信息都是异步收集的,这个方法可以解除所有的订阅
private void unsubscribeAll() {
healthCountsStream.unsubscribe();
rollingCommandEventCounterStream.unsubscribe();
cumulativeCommandEventCounterStream.unsubscribe();
rollingCommandLatencyDistributionStream.unsubscribe();
rollingCommandUserLatencyDistributionStream.unsubscribe();
rollingCommandMaxConcurrencyStream.unsubscribe();
}
它对外提供非常多的方法/能力,其中绝大多数都是委托给各种xxxStream来完成,下面对主要方法做如下描述:
markCommandStart
:当命令开始执行,调用该方法markCommandDone
:命令执行完成,调用该方法- 说明:以上2方法均不是public方法,由
AbstractCommand
调用。其中有个API:HystrixThreadEventStream
后续会有详细介绍
- 说明:以上2方法均不是public方法,由
getRollingCount
:获取某一事件类型窗口期内的统计数值(委托rollingCommandEventCounterStream
)getCumulativeCount
:获取某一事件类型持续的统计数值getExecutionTimePercentile
:获取某一百分比的请求执行时间(委托rollingCommandLatencyDistributionStream
)getExecutionTimeMean
:获取平均请求执行时间(委托rollingCommandLatencyDistributionStream
)getTotalTimePercentile
:获取某一百分比的请求执行总时间(委托rollingCommandUserLatencyDistributionStream
)getTotalTimeMean
:获取平均请求执行总时间getRollingMaxConcurrentExecutions
:获取上一个窗口期内最大的并发数getHealthCountsStream
:获取窗口期内的失败次数,总次数,失败比率
另外,构建一个HystrixCommandMetrics
的实例,依旧以static静态方法对外提供,加缓存来提高效率:
HystrixCommandMetrics:
// 根据参数,得到一个HystrixCommandMetrics 实例
// 如果缓存里已经有了,就直接返回
public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties) {
return getInstance(key, commandGroup, null, properties);
}
public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) {
// 双重校验锁
HystrixCommandMetrics commandMetrics = metrics.get(key.name());
if (commandMetrics != null)
return commandMetrics;
// 线程安全
synchronized (HystrixCommandMetrics.class) {
HystrixCommandMetrics existingMetrics = metrics.get(key.name());
if (existingMetrics != null)
return existingMetrics;
// 创建一个新的实例
// =======线程的key默认情况下就是groupKey======
HystrixThreadPoolKey nonNullThreadPoolKey;
if (threadPoolKey == null) {
nonNullThreadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name());
} else {
nonNullThreadPoolKey = threadPoolKey;
}
// 使用构造器初始化一个实例,并且放进Map里缓存起来
HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier());
metrics.putIfAbsent(key.name(), newCommandMetrics);
return newCommandMetrics;
}
}
// 这个getInstance只查找缓存,若缓存中木有,就返回null
public static HystrixCommandMetrics getInstance(HystrixCommandKey key) {
return metrics.get(key.name());
}
HystrixThreadPoolMetrics
原理同上,只是管理的Stream流不一样而已:
代码语言:javascript复制HystrixThreadPoolMetrics:
private final HystrixThreadPoolKey threadPoolKey;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolProperties properties;
private final AtomicInteger concurrentExecutionCount = new AtomicInteger();
private final RollingThreadPoolEventCounterStream rollingCounterStream;
private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;
主要方法:
markThreadExecution
:当线程执行时,调用此方法,次数 1markThreadCompletion
:执行完,次数-1markThreadRejection
:command任务被线程池拒绝时,次数-1getRollingCount
:和上面不一样,这里委托的是RollingThreadPoolEventCounterStream
getCumulativeCount
:
获取实例的方法同上。
HystrixCollapserMetrics
代码语言:javascript复制HystrixCollapserMetrics:
private final HystrixCollapserKey collapserKey;
private final HystrixCollapserProperties properties;
private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;
略。
使用示例
在例子之前,需要提醒的是:以上方法虽然最终是委托给Stream去执行的,但是它们并不会有延迟,是立即的(因为Stream流一般都会有窗口,比如1s一次,500ms一次等等),但是,但是,但是它是getLatest哦,也就是拿最新的数据,官方解释为:
代码语言:javascript复制// 同步调用以检索上次计算的bucket而不用等待
// Synchronous call to retrieve the last calculated bucket without waiting for any emissions
public Output getLatest() { ... }
示例代码:
代码语言:javascript复制@Test
public void fun1() throws InterruptedException {
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("CommandHelloWorld");
HystrixCommandGroupKey commandGroupKey = HystrixCommandGroupKey.Factory.asKey("MyAppGroup");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("MyAppGroup");
HystrixPropertiesCommandDefault properties = new HystrixPropertiesCommandDefault(commandKey, HystrixCommandProperties.Setter());
// command指标信息
HystrixCommandMetrics commandMetrics = HystrixCommandMetrics.getInstance(commandKey, commandGroupKey, threadPoolKey, properties);
// 发送事件(发送多次)
CommandHelloWorld helloWorld = new CommandHelloWorld("YoutBatman");
helloWorld.execute();
helloWorld = new CommandHelloWorld("YoutBatman");
helloWorld.queue();
// 走fallabck
helloWorld = new CommandHelloWorld(null);
helloWorld.queue();
// 打印指标信息
TimeUnit.SECONDS.sleep(1); // 需要留给指标收集的时间
System.out.println("===========commandMetrics信息===========");
System.out.println(commandMetrics.getRollingCount(HystrixEventType.SUCCESS));
System.out.println(commandMetrics.getRollingCount(HystrixEventType.FAILURE));
System.out.println(commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS));
System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.SUCCESS));
System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FAILURE));
System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FALLBACK_SUCCESS));
System.out.println(commandMetrics.getHealthCounts());
System.out.println(commandMetrics.getExecutionTimeMean());
}
运行程序控制台打印:
代码语言:javascript复制===========commandMetrics信息===========
0
0
0
0
0
0
HealthCounts[1 / 3 : 33%]
0
说明:至于为何很多是0,这个和getLatest以及本地测试不好控有关,暂可忽略。
总结
关于Netflix Hystrix指标数据收集器:HystrixMetrics
就介绍到这了,你可能会觉得它和前面讲的xxxStream有非常多的功能相似之处。从源码能看出,HystrixMetrics
似乎是对xxxStream
的一些包装,内部事件最终都是委托给xxxStream
去完成了的。
只不过最大的区别是:HystrixMetrics
所有的获取指标信息的方法,获取的都是瞬时的(最新的)值,而并不需要等待,这是和流式统计计算最大的区别。