Hystrix熔断和限流源码分析(二)

2022-06-20 20:12:25 浏览数 (1)

Hystrix熔断和限流(一)中,我们介绍了Hystrix API的底层使用方式, 今天就深入源码,看下是如何实现熔断和限流的.

一. 响应式编程

与命令式编程不同, Hystrix中采用的是响应式编程.

响应式编程是一种通过异步和数据流来构建事务关系的编程模型, 它的思想是构建关系, 而不是具体去执行.

而这种编程方式是使用责任链模式和观察者模式配合实现的.

个人认为响应式编程在设计上有些反人类, 排查问题和理解代码都很麻烦.

1.1

责任链模式

责任链模式是一个请求从链式的首端发出时, 会沿着链的路径依次传递给每一个节点对象, 直至有对象处理这个请求为止.

用到最多的就是Servlet中的Filter了.

1.2

观察者模式

当一个对象修改时, 会自动通知依赖它的对象.

JDK中工具类:java.util.Observable

二. rxjava.jar中的Observable

Hystrix是基于rxjava.jar中的Observable类实现的, 这节先一起熟悉下Observable.

如果您对rxjava已经很熟了, 可以跳过本节.

2.1

创建Observable

创建Observable的创建过程是通过静态方法create()创建实例对象;

参数是OnSubscribe的匿名类, 其中call()用来处理业务逻辑;

同时又可以利用call()的方法参数Subscriber回调订阅逻辑.

代码语言:javascript复制
final List<String> list = Arrays.asList(new String[]{"one", "two", "three"});
Observable observable = Observable.create(new OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        for (String str : list) {
            subscriber.onNext(str);
        }
        subscriber.onCompleted();
    }
});

2.2

订阅者执行回调任务

subscribe()不仅可以添加回调逻辑, 还会触发Observable中call()的逻辑处理.

如果不传入自定义Observer(或Subscriber, 两者都支持, 底层处理也是类似的), 系统会有一个默认处理类.

代码语言:javascript复制
observable.subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }
    @Override
    public void onError(Throwable throwable) { }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

执行结果:

代码语言:javascript复制
one
two
three
onCompleted

2.3

Subscriber数据转换

lift()是将当前的Subscriber数据转换为另外一种格式, 并以责任链的方式继续回调其他Subscriber

示例中将原本是Integer型数据, 通过lift()方法转换成String类型数据, 最后输出结果.

代码语言:javascript复制
Observable.range(10, 3).lift(new Operator<String, Integer>() {
      @Override
      public Subscriber<? super Integer> call(Subscriber<? super String> subscriber) {
          return new Subscriber<Integer>() {
              @Override
              public void onCompleted() {
                  subscriber.onCompleted();
              }
              @Override
              public void onError(Throwable e) {
                  subscriber.onError(e);
              }
              @Override
              public void onNext(Integer integer) {
                  String s = "next:"   integer;
                  subscriber.onNext(s);
              }
          };
      }
  })
  .subscribe(new Observer<String>() {
      @Override
      public void onCompleted() {
          System.out.println("onCompleted");
      }
      @Override
      public void onError(Throwable arg0) { }
      @Override
      public void onNext(String string) {
          System.out.println(string);
      }
  });
}

执行结果:

代码语言:javascript复制
next:10
next:11
next:12
onCompleted

三. Hystrix熔断限流分析

下面一起看下, Hystrix是如何使用Observable实现熔断和限流的.

3.1

主处理逻辑Observable

在之前的Hystrix熔断和限流(一)中,我们已经了解到Hystrix的处理逻辑都会调用到queue()方法, 进而调用到toObservable()方法, 生成主要处理逻辑Observable.

toObservable()是一个长方法, 不要被吓到, 方法长是因为使用了内部类, 抽出主要逻辑来看.

一共四步:

1. 判断缓存是否启用, 从缓存中取数据, 记录数据统计情况

2. 使用Observable模式, 定义数据处理逻辑, 但不立即执行

3. Observable数据转换以及结束和异常时回调

4. 判断缓存是否启用, 将结果写入缓存

代码语言:javascript复制
public Observable<R> toObservable() {
    final boolean requestCacheEnabled = isRequestCachingEnabled();
    // 1. 判断缓存是否启用, 从缓存中取数据, 记录数据统计情况
    if (requestCacheEnabled) {
        Observable<R> fromCache = requestCache.get(getCacheKey());
        // ...
    }
    // 2. 使用Observable模式, 定义数据处理逻辑, 但不立即执行
    Observable<R> o = Observable.create(new OnSubscribe<R>() {
      // ...
    });
    // 3. Observable数据转换以及结束和异常时回调
    o = o.lift(new CommandHookApplication(this));
    o = o.onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
    // ...
    });
    o = o.doOnTerminate(new Action0() {
    // ...
    });
    // 4. 判断缓存是否启用, 将结果写入缓存
    if (requestCacheEnabled) {
        Observable<R> fromCache = requestCache.putIfAbsent(getCacheKey(), o);
        // ...
        return o;
    } else {
        return new ObservableCommand<R>(o, this);
    }
}

3.2

数据处理逻辑Observable

该逻辑是数据处理的核心逻辑, 同样里面也使用了Observable, 抽丝剥茧看看它是如何处理的.

主要有四步:

如果还不清楚run()和fallback()方法的,可以先读下Hystrix熔断和限流(一).

1. 熔断器判断, 是否能执行逻辑

2. 获取限流信号量成功, 执行正常业务处理逻辑, 最终会执行run()方法, 同时还会进行数据统计;

正常处理Observable: getRunObservableDecoratedForMetricsAndErrorHandling()

3. 获取限流信号量失败, 执行失败处理逻辑, 会执行fallback(), 并进行数据统计;

失败处理Observable: getFallbackOrThrowException()

4. 熔断器判断失败, 执行失败处理流程, 会执行fallback(), 并进行数据统计;

失败处理Observable: getFallbackOrThrowException()

代码语言:javascript复制
Observable<R> o = Observable.create(new OnSubscribe<R>() {
    @Override
    public void call(Subscriber<? super R> observer) {
        // 1. 熔断器判断, 是否能执行逻辑
        if (circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            // 2. 获取信号量成功, 执行正常业务处理逻辑, 并最终执行run()方法
            if (executionSemaphore.tryAcquire()) {
                // ...
                    getRunObservableDecoratedForMetricsAndErrorHandling()
                            .doOnTerminate(new Action0() {
                            // ...
                            }).unsafeSubscribe(observer);
            } else {
            // 3. 获取信号量失败, 执行失败处理逻辑, 并最终执行fallback()
                getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
                        "could not acquire a semaphore for execution", semaphoreRejectionException)
                        .lift(new DeprecatedOnCompleteWithValueHookApplication(_this))
                        .unsafeSubscribe(observer);
            }
        } else {
            // 4. 熔断器判断失败, 执行失败处理流程, 并最终执行fallback()
            try {
                getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                        "short-circuited", shortCircuitException)
                        .lift(new DeprecatedOnCompleteWithValueHookApplication(_this))
                        .unsafeSubscribe(observer);
            } catch (Exception e) {
                observer.onError(e);
            }
        }
    }
});

3.3

Observable处理流程触发

前文已经介绍了Observable流程的定义, 再说下触发逻辑.

在queue()方法中, 是通过toFuture()内调用的subscribe()方法, 触发Observable逻辑的.

代码语言:javascript复制
final Future<R> f = o.toBlocking().toFuture();

总结

本文主要介绍了Hystrix利用rxjava.jar中的Observable处理熔断和限流的流程.

下节我们会详细介绍熔断和限流的实现原理.

0 人点赞