响应式编程最重要的是解决生产者和消费者之间的关系。如果生产者产生的数据过大,而消费者消费不过来,就会压垮消费者。所以就需要有一个重要的概念——流控。
解决流控有几种方式
- 节流 若消费者无法消费生产者生产的元素,则直接丢弃。
- 使用缓冲区 缓冲区的作用相当于在生产者和消费者之间添加了保存并转发的一种机制,把生产者发出的数据暂时存储起来供消费者慢慢消费。
- 调用栈阻塞 就是同步线程。就是消费者没消费完生产者前面的数据,后面的数据则一直阻塞。
- 使用背压 消费者需要多少,生产者生产多少。
背压机制
如果生产者发出的数据比消费者能够处理数据的最大量还要多,消费者可能会被迫一直在获取和处理数据,消耗越来越多的资源,从而埋下潜在的崩溃风险。为了防止这一点,需要有一种机制使消费者可以通知生产者降低数据的生成速度。生产者可以采用多种策略来实现这一要求,这就是背压。
背压机制应该以非阻塞的方式工作。实现非阻塞背压的方法是放弃推策略而采用拉策略。
响应式流
响应式流规范是提供非阻塞背压的异步流处理标准的一种倡议。
响应式流接口
代码语言:javascript复制public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
发布者(Publisher)是潜在的包含无限数量的有序元素的生产者,它根据收到的请求向当前订阅者发送元素。
代码语言:javascript复制public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
订阅者(Subscriber)从发布者那里订阅并接收元素。发布者向订阅者发送订阅令牌(Subscription Token)。使用订阅令牌,订阅者向发布者请求多个元素。当元素准备就绪时,发布者就会向订阅者发送合适数量的元素。
当执行发布者的subscribe()方法时,发布者会回调订阅者的onSubscribe()方法。在这个方法中,订阅者通常会借助传入的Subscription对象向发布者请求n个数据。然后发布者通过不断调用订阅者的onNext()方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete()方法告知订阅者流已经发完;如果有错误发生,则通过onError()方法发出错误数据,这同样也会终止数据流。
代码语言:javascript复制public interface Subscription {
public void request(long n);
public void cancel();
}
订阅(Subscription)表示订阅者订阅的一个发布者的令牌。当订阅请求成功时,发布者将其传递给订阅者。订阅者使用订阅令牌与发布者进行交互。例如,请求更多的元素或取消订阅。
当发布者调用subscribe()方法注册订阅者时,会通过订阅者的回调方法onSubscribe()传入Subscription对象,之后订阅者就可以使用这个Subscription对象的request()方法向发布者请求数据。背压机制的实现正是基于这一点。