basedrop:Rust 生态中,适用于实时音频的垃圾收集器

2022-09-01 15:36:14 浏览数 (1)

首先对关心笔者的朋友表示感谢。上周是因事外出,所以未有更新。

因个人开发需要音频处理,笔者在搜索相关工具时,发现了一个很新的实时音频 crate:basedrop,目前 github 星星数 20 左右。在对 basedrop 浅显实践后,感觉此 crate 非常棒,因此分享。

示例代码为笔者实践而仓促编写,参考了 docs.rs 站点的 basedrop 文档;后来发现作者有介绍的文章,所以文字内容摘译自 glowcoil 于 2021-04-26 发表的文章 basedrop: a garbage collector for real-time audio in rust 。

在实时音频中,截止时间至关重要。您的代码仅有几毫秒的时间来填充一个缓冲区,其中的样本将被发送到 DAC(译注:数字模拟转换器,Digital to analog converter,是将输入的数字信号转换成具有模拟位准的信号)。但是,这几毫秒,也可能要与许多其他音频处理程序共享。如果您的代码花费太长时间来生成这些样本,那么就没有第二次机会;音频根本不会被播放,用户会听到一个令人讨厌的小故障,或者被口吃的声音代替。

为了防止这种情况,实时音频代码必须避免执行任何操作,这些操作可能会在无限或不可预测的时间内阻塞音频线程。这些操作包括:文件和网络 I/O、内存分配和释放,以及使用锁与非音频线程同步等,因为这些操作的“实时安全”性不被认可。相反,像 I/O 和内存分配这样的操作应该在其它线程上执行。而线程的同步操作,应该使用对音频线程没有等待的原语来执行。Ross Bencina 的经典博客文章《时间不等人(Time Waits for Nothing)》中,更全面地概述了这一主题。

考虑到音频软件通常需要分配内存,并从音频线程中使用内存。那么问题就来了:如何在受上述限制的情况下,以可管理和高效的方式完成这一任务?Basedrop 是我试图为这个问题提供的一个解决方案。

延迟回收

考虑一个简单的场景:我们有一个存储在 Vec<f32> 中的样本缓冲区,可能是从磁盘合成或加载的,我们希望在音频线程使用它。作为解决方案的初始草图,我们可以使用无等待(wait-free)且有界容量(bounded-capacity)的 SPSC 通道(译注:高性能无锁队列,比如 rtrb crate),以将缓冲区发送到音频线程。然后,当我们使用完它并希望回收内存时,我们可以通过另一个 SPSC 通道将其发送回非实时线程,以进行释放。

在较简单的情况下,此解决方案效果良好。但是,随着应用程序复杂性的增加,它也有缺点。例如,如果在音频线程之间传输大量分配,则用于返回分配的固定容量通道,则可能会被填满。由于在这种情况下阻止音频线程是不可接受的,因此应用程序需要确保信道的轮询频率足以适配,并且信道总是可满足最坏情况时需要的容量(使用更复杂的动态分配设计)。或者,如果当前无法返回分配,音频线程可以继续播放,但不出错。此外,这个解决方案依赖于程序员的纪律性,以确保分配总是被释放。而 Rust 的 RAII(译注:资源获取即初始化,详细请参阅 Rust 所有权的内存与分配)设计,对于这方面的错误,在很大程度上是不可见的。assert_no_alloc 等诊断工具 crate,可以在很大程度上检测此类错误,在编译时有一个保证是很好的。

Basedrop 的解决方案是使用 MPSC 链表队列,替换用于返回分配的固定容量的环形缓冲区。在分配时,为任何要与音频线程共享的内存块创建 MPSC 链表队列节点,并内联存储。当音频线程准备释放一段内存以进行回收时,可以通过无分配、无等待的操作将相应的节点推送到队列中。此模式由一对智能指针封装:Owned<T>Shared<T>,类似于 Box<T>Arc<T>,它们将内容推送到队列中,进行延迟回收,而不是直接丢弃。然后可以使用 basedrop 的 Collector 类型,在另一个线程上定期处理队列。

此系统的优点是回收通道不可能变满,缺少完全打开的 OOM(译注:OutOfMemory)。也不可能忘记将要收集的东西返回,只要它最初是用 Owned<T> 或者 Shared<T> 封装的。特别是 Shared<T>,其为在音频和非音频线程之间共享不可变和持久的数据结构,提供了令人兴奋的可能性。这种方式对于手动的消息传递方法来说,是很麻烦或不可能的。

Collector 的使用

丢弃队列中的所有垃圾

代码语言:javascript复制
use basedrop::{Collector, Handle, Owned};
use core::mem::drop;

let mut collector = Collector::new();
let handle = collector.handle();
let x = Owned::new(&handle, 1);
let y = Owned::new(&handle, 2);
let z = Owned::new(&handle, 3);

assert_eq!(collector.alloc_count(), 3);

drop(x);
drop(y);
drop(z);
collector.collect();

assert_eq!(collector.alloc_count(), 0);

尝试删除队列中的第一个分配

如果成功,返回 true;否则,返回 false

代码语言:javascript复制
use basedrop::{Collector, Handle, Owned};
use core::mem::drop;

let mut collector = Collector::new();
let handle = collector.handle();
let x = Owned::new(&handle, 1);
let y = Owned::new(&handle, 2);
let z = Owned::new(&handle, 3);

assert_eq!(collector.alloc_count(), 3);

drop(x);
drop(y);
drop(z);

assert!(collector.collect_one());
assert!(collector.collect_one());
assert!(collector.collect_one());

assert!(!collector.collect_one());
assert_eq!(collector.alloc_count(), 0);

SharedCell

Basedrop 还提供了另一个与音频线程共享内存的原语,称为 SharedCell<T>SharedCell<T> 充当一个线程安全的可变内存位置,用于存储 Shared<T> 指针,提供 getsetreplace 方法(与 Cell 非常类似),用来获取和更新内容。我的设想,这将被用作一种非实时线程,以原子方式发布数据的方法。然后,实时音频线程可以不可变地观察到这些数据。

以无锁方式实现此模式,其主要困难在于获取引用计数指针的副本。实际上包括两个步骤:首先,获取实际指针;然后,增加引用计数。在这两个步骤之间,决不能允许写入器用新值替换指针,将前一个值的引用计数减为零,然后释放其引用,因为这将导致读取器在释放后使用。对于这个问题有各种可能的解决方案,有不同的权衡。

SharedCell<T> 采用的方法是在存储的指针旁边,保留一个读取器计数。读取器在获取指针时,递增此计数,只有在成功递增指针的引用计数后,才能递减。反过来,在替换存储的指针之后,写入程序会循环,直到观察到计数为零,然后才允许它们移动(Rust 中的 move),并可能减少引用计数。此方案可被设计成低成本、无阻塞的读取器,而写入器的开销要高一些。我认为这是实时音频的适当折衷,读取器(音频线程)的延迟期限要短得多,执行频率也要比写入器高得多。

SharedCell 的使用

替换包含的 Shared<T>,减少进程中的引用计数

代码语言:javascript复制
use basedrop::{Collector, Shared, SharedCell};

let collector = Collector::new();
let x = Shared::new(&collector.handle(), 3);
let cell = SharedCell::new(x);

let y = Shared::new(&collector.handle(), 4);
cell.set(y);

替换包含的 Shared<T> 并返回

代码语言:javascript复制
use basedrop::{Collector, Shared, SharedCell};

let collector = Collector::new();
let x = Shared::new(&collector.handle(), 3);
let cell = SharedCell::new(x);

let y = Shared::new(&collector.handle(), 4);
let x = cell.replace(y);

释放 SharedCell,并返回包含的 Shared<T>

这样做是安全的,因为我们保证是 SharedCell 的唯一持有者。

代码语言:javascript复制
use basedrop::{Collector, Shared, SharedCell};

let collector = Collector::new();
let x = Shared::new(&collector.handle(), 3);
let cell = SharedCell::new(x);

let x = cell.into_inner();

未来展望

BaseTrop 当前不支持动态类型,如 Owned<[T]> 或者 Owned<dyn Trait>。等待 Rust 的 CoerceUnsized 或者 equivalent 稳定时,这一点应该可以实现。目前,动态类型可以通过将 DST 封装到另一层分配中来解决,没有太多问题。

此外,Shared<T> 当前不支持循环数据结构的弱引用,如 Arc<T> 所做的那样。这会使引用计数逻辑复杂化(参见 Arc 源代码),我想从一些简单的东西开始。

我还想探索比引用计数开销更小的内存回收策略,例如 Linux 内核中的 RCU 模式、基于代(epoch-based)的回收,以及基于静态(quiescent state-based)的回收。我还没有想到这样一种设计,既符合 Rust 的所有权,又满足实时音频(和音频插件)的限制,但我认为这是有希望的方向。

谢谢您的阅读,欢迎交流。

0 人点赞