我们在使用订阅者的时候,都是用Sink或者Assign,但是大家有没有想过一个问题,这两种订阅者在第一次连接到发布者的时候,会发送一个无限大(unlimited)的需求(Demand)。
这个时候,订阅者就会一直不停的接收到发布者发过来的内容,按理说,订阅者无条件接收就可以了,但是,如果发布者发布的速度太快了,而订阅者接收的速度很慢,接收不了,那怎么办呢?又或者说,我不需要实时接收,我只需要隔一段时间接收一次,这种需求也是非常多见的。
这个时候就涉及到一个概念,背压(back pressure),或者叫回压,我们可以通过这个背压,来精确的控制发布者什么时候生成元素,我们通常理解的话,发布者应该是主动发布的,然后订阅者被动的去接收。
其实不是,而是由订阅者去连接和获取元素的时候,才进行发布,这个时候,我们就可以通过使用Subscribers.Demand这个类型来告诉发布者我可以接收多少个元素,也就是返回可以追加接收的事件数量,这样就可以做到控制发布者的发送速度,以此来定义 Backpressure 的响应行为。
Combine 在设计思路和 API 等等很多地方都参考了 ReactiveX,特别是具体到 RxSwift 的实现。如果你对响应式编程有了一定的认识的话,把你的项目从 RxSwift 迁移到 Combine 应该是非常容易的,不得不说Combine“抄袭”的非常成功。
如果非要说 RxSwift 和 Combine 的最大的不同之处,那就是 RxSwift 到现在为止都没有支持 backpressure,只有RxJava才有这个机制;但是 Combine 中原生对这个特性进行了支持。
我写了一个demo,发布者是这个定时器:
点击button的时候,就开始订阅:
这个订阅者是自定义的,他遵循Subscriber协议,然后实现协议里面的三个方法:
第一个方法里面,使用接收到的这个订阅subscription,去向发布者请求元素,这个Subscription协议就是连接发布者和订阅者的桥梁;第二个方法是用来告诉订阅者,发布者已经产生了元素。
并且可以接收到一个Date元素input,然后返回一个需求量,也就是你希望订阅者还能够接收多少个元素;第三个方法告诉订阅者,发布者已经发布完了,不管是发布正常或者是有错误,这个结果我都会告诉你。
说的再简单点,发布者会跟踪所有的订阅者,看谁的需求没有满足,就产生元素给谁,一直到满足所有的需求,发布者就不产生元素了,任务就完成了,在第一个方法里面,发布者和订阅者就都存在了。
但是需求为0,就不会产生任何元素,一直到1秒钟延时结束执行到闭包里面的request,订阅者就给了发布者一个非零的需求,现在发布者就开始发布元素,并且是每隔一秒发布一次,一共发布三个元素就会停止发布,但是也并不会执行第三个方法打印完成,因为发布者还在等待更多的需求。
所以这时候如果有需要的话,订阅者可以把这个订阅次数保存下来,然后定期去请求元素,就可以很灵活的管理这个发布过程。比如有一个非常常见的开发场景,我们可以在输入框中输入一些内容进行搜索操作,并且一旦输入框的内容改变了,我就去调用接口刷新对应的列表数据,但这个接口调用频率是一定要进行控制的,不然的话。
如果我按住一个英文字母键不放开,输入框会一直在变化,就会不停的去调用接口来刷新页面数据,就算你的代码逻辑很好,不会卡顿不会崩溃,你们的后台人员也肯定会骂你,因为平白无故增加了服务器压力,这个时候,就可以用到这个背压的方式来进行控制和处理。
而且还有更简单的方式,就是直接使用背压操作符,完全不需要自定义订阅者:
1.buffer(size:prefetch:whenFull:),保留来自上游发布者的固定数量的项目。缓冲满了之后,缓冲区会丢弃元素或抛出错误;
2.debounce(for:scheduler:options:),只在上游发布者在指定的时间间隔内停止发布时才发布;
3.throttle(for:scheduler:latest:),以给定的最大速率生成元素。如果在一个间隔内接收到多个元素,则仅发送最新的或最早的元素;
4.collect(_:) 和 collect(_:options:) 聚集元素,直到它们超过给定的数量或时间间隔,然后向订阅者发送元素数组。如果订阅者可以同时处理多个元素,这个操作符将是很好的选择。
这些操作符都可以控制订阅者接收的元素数量,所以可以放心地连接无限需求的订阅者,比如:sink(receiveValue:) 和 assign(to:on:)。
Debounce是防抖的意思,Throttle是节流,他们俩在前端开发中可能会经常用到,做iOS开发可能很多人都不知道这个概念,其实我们在工作中或多或少都遇到过需要使用背压的场景,只是大多数人接触的不多,没有具体了解到概念和原理的对应关系,就像设计模式有很多种,实际开发中我们用到了某种设计模式自己却不知道。
实际使用我就不写了,比较简单,类似这样:
另外,我们也可以通过设置 flatMap 的 maxPublishers来控制发布频率,我举个例子:
然后,在点击事件里面进行调用:
这样也同样实现了每隔一秒发布一次,但是一定要注意一个问题,如果我把publisher更换成 PassthoughSubject 或 Notification,就会出现数据遗漏的情况。因为我们限制了数据的并行处理数量,所以就导致数据的消耗时间超过了数据的生成时间。这个时候,我们就需要在 Publisher 的后面添加 buffer 来对数据进行缓冲:
最后,把Publisher转换成AsyncSequence也可以做到类似的效果,创建一个遵循AsyncSequence协议的结构体,将从 Publihser 中获取的数据通过 AsyncStream 转送出去,并将迭代器指向 AsyncStream 的迭代器即可,这里就不展开来写了,学无止境,执笔共勉。
- END -