在Hystrix熔断和限流(一)中,我们介绍了Hystrix API的底层使用方式, 今天就深入源码,看下是如何实现熔断和限流的.
一. 响应式编程
与命令式编程不同, Hystrix中采用的是响应式编程.
响应式编程是一种通过异步和数据流来构建事务关系的编程模型, 它的思想是构建关系, 而不是具体去执行.
而这种编程方式是使用责任链模式和观察者模式配合实现的.
个人认为响应式编程在设计上有些反人类, 排查问题和理解代码都很麻烦.
1.1
责任链模式
责任链模式是一个请求从链式的首端发出时, 会沿着链的路径依次传递给每一个节点对象, 直至有对象处理这个请求为止.
用到最多的就是Servlet中的Filter了.
1.2
观察者模式
当一个对象修改时, 会自动通知依赖它的对象.
JDK中工具类:java.util.Observable
二. rxjava.jar中的Observable
Hystrix是基于rxjava.jar中的Observable类实现的, 这节先一起熟悉下Observable.
如果您对rxjava已经很熟了, 可以跳过本节.
2.1
创建Observable
创建Observable的创建过程是通过静态方法create()创建实例对象;
参数是OnSubscribe的匿名类, 其中call()用来处理业务逻辑;
同时又可以利用call()的方法参数Subscriber回调订阅逻辑.
代码语言:javascript复制final List<String> list = Arrays.asList(new String[]{"one", "two", "three"});
Observable observable = Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (String str : list) {
subscriber.onNext(str);
}
subscriber.onCompleted();
}
});
2.2
订阅者执行回调任务
subscribe()不仅可以添加回调逻辑, 还会触发Observable中call()的逻辑处理.
如果不传入自定义Observer(或Subscriber, 两者都支持, 底层处理也是类似的), 系统会有一个默认处理类.
代码语言:javascript复制observable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable throwable) { }
@Override
public void onNext(String s) {
System.out.println(s);
}
});
执行结果:
代码语言:javascript复制one
two
three
onCompleted
2.3
Subscriber数据转换
lift()是将当前的Subscriber数据转换为另外一种格式, 并以责任链的方式继续回调其他Subscriber
示例中将原本是Integer型数据, 通过lift()方法转换成String类型数据, 最后输出结果.
代码语言:javascript复制Observable.range(10, 3).lift(new Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(Subscriber<? super String> subscriber) {
return new Subscriber<Integer>() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(Integer integer) {
String s = "next:" integer;
subscriber.onNext(s);
}
};
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable arg0) { }
@Override
public void onNext(String string) {
System.out.println(string);
}
});
}
执行结果:
代码语言:javascript复制next:10
next:11
next:12
onCompleted
三. Hystrix熔断限流分析
下面一起看下, Hystrix是如何使用Observable实现熔断和限流的.
3.1
主处理逻辑Observable
在之前的Hystrix熔断和限流(一)中,我们已经了解到Hystrix的处理逻辑都会调用到queue()方法, 进而调用到toObservable()方法, 生成主要处理逻辑Observable.
toObservable()是一个长方法, 不要被吓到, 方法长是因为使用了内部类, 抽出主要逻辑来看.
一共四步:
1. 判断缓存是否启用, 从缓存中取数据, 记录数据统计情况
2. 使用Observable模式, 定义数据处理逻辑, 但不立即执行
3. Observable数据转换以及结束和异常时回调
4. 判断缓存是否启用, 将结果写入缓存
代码语言:javascript复制public Observable<R> toObservable() {
final boolean requestCacheEnabled = isRequestCachingEnabled();
// 1. 判断缓存是否启用, 从缓存中取数据, 记录数据统计情况
if (requestCacheEnabled) {
Observable<R> fromCache = requestCache.get(getCacheKey());
// ...
}
// 2. 使用Observable模式, 定义数据处理逻辑, 但不立即执行
Observable<R> o = Observable.create(new OnSubscribe<R>() {
// ...
});
// 3. Observable数据转换以及结束和异常时回调
o = o.lift(new CommandHookApplication(this));
o = o.onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
// ...
});
o = o.doOnTerminate(new Action0() {
// ...
});
// 4. 判断缓存是否启用, 将结果写入缓存
if (requestCacheEnabled) {
Observable<R> fromCache = requestCache.putIfAbsent(getCacheKey(), o);
// ...
return o;
} else {
return new ObservableCommand<R>(o, this);
}
}
3.2
数据处理逻辑Observable
该逻辑是数据处理的核心逻辑, 同样里面也使用了Observable, 抽丝剥茧看看它是如何处理的.
主要有四步:
如果还不清楚run()和fallback()方法的,可以先读下Hystrix熔断和限流(一).
1. 熔断器判断, 是否能执行逻辑
2. 获取限流信号量成功, 执行正常业务处理逻辑, 最终会执行run()方法, 同时还会进行数据统计;
正常处理Observable: getRunObservableDecoratedForMetricsAndErrorHandling()
3. 获取限流信号量失败, 执行失败处理逻辑, 会执行fallback(), 并进行数据统计;
失败处理Observable: getFallbackOrThrowException()
4. 熔断器判断失败, 执行失败处理流程, 会执行fallback(), 并进行数据统计;
失败处理Observable: getFallbackOrThrowException()
代码语言:javascript复制Observable<R> o = Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> observer) {
// 1. 熔断器判断, 是否能执行逻辑
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// 2. 获取信号量成功, 执行正常业务处理逻辑, 并最终执行run()方法
if (executionSemaphore.tryAcquire()) {
// ...
getRunObservableDecoratedForMetricsAndErrorHandling()
.doOnTerminate(new Action0() {
// ...
}).unsafeSubscribe(observer);
} else {
// 3. 获取信号量失败, 执行失败处理逻辑, 并最终执行fallback()
getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
"could not acquire a semaphore for execution", semaphoreRejectionException)
.lift(new DeprecatedOnCompleteWithValueHookApplication(_this))
.unsafeSubscribe(observer);
}
} else {
// 4. 熔断器判断失败, 执行失败处理流程, 并最终执行fallback()
try {
getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
"short-circuited", shortCircuitException)
.lift(new DeprecatedOnCompleteWithValueHookApplication(_this))
.unsafeSubscribe(observer);
} catch (Exception e) {
observer.onError(e);
}
}
}
});
3.3
Observable处理流程触发
前文已经介绍了Observable流程的定义, 再说下触发逻辑.
在queue()方法中, 是通过toFuture()内调用的subscribe()方法, 触发Observable逻辑的.
代码语言:javascript复制final Future<R> f = o.toBlocking().toFuture();
总结
本文主要介绍了Hystrix利用rxjava.jar中的Observable处理熔断和限流的流程.
下节我们会详细介绍熔断和限流的实现原理.