结构化并发

2022-01-20 16:04:00 浏览数 (1)

  • 提议:SE-0304
  • 作者:John McCall,Joe Groff,Doug Gregor,Konrad Malawski
  • 审核主管:Ben Cohen
  • 状态: 在 Swift 5.5 已实现 验收链接
  • 实现:在标记-Xfrontend -enable-experimental-concurrency后的 最近主快照 中可以找到

介绍

async/await是一种编写自然且高效异步代码的语言机制。异步函数(使用async声明)在执行任何挂起点的地方(使用await标记)都可以放弃它所在的线程,这对构建高并发系统非常有必要。

但是async/await本身并不引入并发:如果在异步函数内忽略挂起点,它基本与同步函数执行方式相同。本篇提议引入 Swift 中对结构化并发的支持,使用提高工效、可预测并允许高效实现的模型,来实现异步代码的并发执行。

Swift-evolution 关键点时间线:

  • 节点1
  • 节点2
  • 节点3
  • 第一次审核
  • 第二次审核
  • 第三次审核

动机

看一个异步准备晚餐的简单例子:

代码语言:swift复制
func chopVegetables() async throws -> [Vegetable] { ... }
func marinateMeat() async -> Meat { ... }
func preheatOven(temperature: Double) async throws -> Oven { ... }

// ...

func makeDinner() async throws -> Meal {
  let veggies = try await chopVegetables()
  let meat = await marinateMeat()
  let oven = try await preheatOven(temperature: 350)

  let dish = Dish(ingredients: [veggies, meat])
  return try await oven.cook(dish, duration: .hours(3))
}

在晚餐准备中的每一步都是异步操作,所以这里也存在多个挂起点。当在等待切蔬菜的过程中, makeDinner不会阻塞线程:它会挂起,直到蔬菜可用,然后继续执行。想象下,许多晚餐可能处于不同的准备阶段,大多数都会暂停,直到当前步骤完成。

尽管晚餐准备是异步进行,但实际上还是顺序执行。在开始腌制肉之前需要等待蔬菜切完,然后在预热锅之前又需要再次等待肉准备好。当晚餐完成后,我们的顾客真的可能会非常饥饿。

为了让晚餐准备地更快,我们需要并发的执行准备步骤。为了这样做,我们可以把菜谱分散在不同的任务中,这些任务可以并行。在切蔬菜的同时,可以腌制肉,也可以把锅预热。有时候在任务之间会有依赖:一旦蔬菜和肉准备好,我们可以把它们放到盘子里,但是在锅热之前不能把盘子中的菜倒入锅中。所有这些任务都是做晚餐这个大任务中的一部分。当所有的任务完成时,晚餐就开始了。

本篇提议旨在提供必要的工具, 把工作分成可以并行执行的较小任务,允许任务等待彼此完成,并有效管理任务的总体进度。

结构化并发

任何并发系统必须提供基础工具。必须有某种方法创建与现有线程并发运行的新线程,也必须有某种方法让线程等待另一个线程发出继续的信号。这都是很强大的工具,你可以用它们来写非常复杂的系统。但是它们同时也非常原始:他们很少做假设,但也很少给予你支持。

想象有某个函数,它在 CPU 上做了大量的工作。我们想通过把工作分散到两核来优化它:因此现在函数创建一个新线程,在每个线程做一半的工作,然后让原来的线程等待新线程完成。(在现代的系统中,函数可能会添加一个任务到全局线程池中,但是基础概念还是一样。)这两个线程所做的工作之间存在联系,但是系统不知道。这会让解决系统性的问题变得更困难。

例如,某个高优先级的操作需要该函数加快执行并完成,该操作可能知道提升第一个线程的优先级,但实际上应该同时提升这两个线程的优先级。充其量,第一个线程开始等待操作,才升级第二个线程的优先级。解决这个问题相对容易,也许可以让函数注册第二个应该升级的线程。但是这是个临时的解决方案,而且需要在每个想使用并发的函数内重复使用。

结构化并发通过要求程序员组织他们的并发用法到高级任务和及其子组件任务里来解决上述问题。这些任务变成并发的基本单元,而不是像线程一样的底层概念。以这种方式构造并发性允许信息在任务层级结构自然流动,否则在每个抽象级别和每个线程上都需要仔细编写任务细节来支持。这反过来又允许相对容易地解决许多高级问题。

例如:

  • 我们通常希望限制任务的总耗时。一些 API 通过传递超时时长来支持这点,但是通过抽象级别向下传递超时花费了大量的工作。这点尤为正确,因为程序员通常想把超时当作关联周期(比如 20ms),但是,库在内部传递的正确组合表示一个绝对的最后期限(比如 now 20ms)。在结构化并发下,可以在任务上添加截止日期,可以通过任意级别的 API 自然传递,包括子任务。
  • 类似,我们也通常希望取消正在执行的任务。异步接口通常通过同步返回某个提供一系列cancel()方法的 token 对象来支持。该方案大大增加了 API 设计的复杂度,所以大多数 API 不提供。而且,传递 token, 或者跟它们一起使用来取消所有正在执行的任务,会为项目带来重大工程挑战。在结构化并发下,取消操作很自然地在 API 和子任务中传递,并且 API 还可以调用 handler 来对取消操作做出及时响应。
  • 许多系统希望为操作维护自己的上下文信息,而不是将其传递到每一个抽象级别,比如当前正在服务记录信息的服务器。结构化并发允许它通过异步操作自然向下传递,作为一种“任务本地存储”,可以由子任务获取。
  • 依赖队列的系统通常容易受到队列泛滥的影响,队列接受的工作比它实际处理的多。这通常通过引入“back-pressure”来解决: 队列停止接收新工作,而尝试对工作排队的系统通过自身停止接收新任务来响应。参与者系统通常会破坏这一点,因为在调度程序级别很难拒绝向参与者队列添加工作,而且这样可能会泄漏资源,或者阻止操作完成,从而永久性地破坏系统的稳定性。结构化并发提供一种有限的,协作的解决方法,它允许系统在任务层级结构中向上传达它们遇到的问题,从而可能允许父任务停止或减缓类似新工作的创建。

本篇提案并没有为所有这些问题提出解决方案,但早期的调查显示了还是可以解决。

任务

在系统中任务是并发的基本单元。每个异步函数都在异步中执行。换句话说,任务是异步函数,线程是同步函数。就是:

  • 所有异步函数作为任务的一部分运行。
  • 一个任务一次运行一个函数;单个任务没有并发。
  • 当一个函数进行async调用时, 被调用函数仍然作为同一个任务的一部分在运行(调用者等待函数返回)。
  • 类似地,当一个函数从某个async调用返回,调用者在相同任务中恢复运行。

同步函数没有必要作为任务的一部分运行。

Swift 认为存在一个底层线程系统。系统调度任务运行在这些线程系统上。任务不需要从底层线程系统获取特殊的调度支持,尽管一个好的调度者可以利用 Swift 任务调度的一些有趣特性。

一个任务的状态是以下三种之一:

  • 一个已挂起任务有许多工作要做,但是不是当前运行的;
    • 它可能可以被调度,意味着该任务准备执行,只等着系统指示一个线程来开始执行它;
    • 或者它正在等待让它变得可调度的其他事件;
  • 一个正在运行的任务表示正在在线程中运行。
    • 该任务会一直运行,直到从它的初始化函数中返回(完成状态),又或者到达一个挂起点(挂起暂停状态)。在挂起点,如果该任务执行只需要改变参与者,那么它可能会立即变得可调度。
  • 一个已完成的任务没有其他工作要做,并且也从来不会进入其他状态。
    • 代码可以用各种方法等待任务完成,最显著的是使用await来标记该段代码。

我们讨论的对任务和异步函数的执行相比同步函数更复杂。异步函数作为任务的一部分运行,如果任务正在运行,任务和它当前的函数也都运行在一个线程中;

注意,当一个异步函数调用另一个异步函数时,我们任务正在调用的函数是挂起状态,但并不意味整个任务是挂起状态。从函数的角度来说,它是挂起的,等着调用返回。从任务的角度来说,它可能在被调用方中继续运行,或者可能已被挂起,为了更改为不同的执行上下文。

任务有3个高级目的:

  • 携带调度信息,比如任务优先级;
  • 充当一个 handle, 通过它可以取消,查询或控制操作;
  • 携带用户提供的本地任务数据。

在较低级别上,任务允许实现优化本地内存的分配,例如异步函数上下文。同时它也允许动态工具、崩溃报告和调试功能来发现函数怎么被使用。

子任务

异步函数可以创建子任务。子任务继承父任务的某些结构,包括优先级,但可以与之并发运行。但该并发有限制:创建子任务的函数必须等子任务结束才能返回。这种结构意味着函数可以对当前任务正在完成的所有工作进行局部推理,比如预测取消当前任务的影响等。它还使创建子任务的效率大大提高。

一个函数的任务本身当然也可能是另一个任务的子任务,它的父任务也可能有其他的父任务;函数不能局部推理出这些。但是这种设计适用于整个任务树的特性,比如取消操作,在任务结构层级中只会“向下”而不会自动向上传递,因此任务子树还是可以静态推理父任务是否取消。如果子任务没有限制期限(超时),那么可以任意延长其父任务的时间,这些特性下的任务不容易被理解。

在本提议中,创建子任务的方式仅在TaskGroup内,但是将有一个后续方案,支持在任何异步上下文中创建子任务。

作业(Jobs)

任务执行可以是任务运行的一系列阶段,每个阶段在挂起点结束或者在最终完成任务时结束。这些阶段被称为作业。作业是系统调度任务中的基本单元。它们也是异步函数和底层同步世界通信的原始通道,在大多数情况下,程序员不应该直接处理作业,除非他们实现了自定义执行器。

执行器(Executors)

执行器是一种服务,它接受作业提交,并安排线程来运行这些作业。系统认为执行器可靠且执行任务从不失败。

正在运行的异步函数始终知道它所在的执行器。这允许函数在当对同一个执行器调用时避免不必要的挂起,并且还会允许函数在它之前开始的同一执行器继续执行。

如果提交给执行器的作业不会并发执行,则执行器是独有的。(具体来说,作业必须完全按照“前发生”的关系排序:给定任意两个已经提交且运行的作业,其中一个作业的结束必须在发生另一个作业开始之前。)执行器不必完全按照作业提交的顺序来执行它们;实际上,它们通常应该尊重任务优先级而不是提交顺序。

Swift 提供了默认的执行器实现,但是 actors 和 global actors 能够忽略这个实现,而且它们还可以提供自己的实现。通常来说,终端用户不需要直接和执行器交互,而是通过调用函数隐式的使用它们,而这些函数正好使用执行器来执行已调用的异步函数。

任务优先级

每个任务对应一个指定的优先级。

任务优先级可以告知执行器决策,该决策是关于如何以及何时安排提交到它的任务。执行器可以利用优先级信息,以便尝试优先运行高优先级任务,然后继续运行低优先级任务。它还可以使用优先级信息影响平台线程优先级。

关于如何处理优先级的确切解释取决于每个平台和特定的执行器实现。

子任务自动继承父任务的优先级。独立任务不继承优先级(或者任何其他信息),因为他们没有父任务。

任务的优先级没有必要匹配它所在执行器的优先级。例如,Apple 平台的 UI 线程是个高优先级的执行器,任何提交到该执行器的任务在该线程运行期间都将以高优先级运行。它有助于 UI 线程在稍后提交时,能够运行更高优先级任务,也不会影响任务的正常优先级。

优先级升级

在某些场景下,任务优先级必须提升以避免优先级反转:

  • 如果某个任务代表 actor 运行,且 actor 中有个更高优先级的任务排队,那么该任务可能运行在这个更高优先级任务中的优先级中。这确实不会影响子任务或已汇报任务的优先级,它是正在运行任务的线程的特性,而非任务本身特性。
  • 如果某个任务创建时带有 handle 并且有更高优先级的任务等待该任务完成,该任务的优先级将会提升以匹配更高优先级的任务。这会影响子任务和已报告任务的优先级。

提议的解决方案

我们提议的解决方案遵循上述结构化并发规则。所有异步函数都做为异步任务的一部分运行。任务可以产生并发执行的子任务。这创建了任务的层级结构,信息可以在层级结构中上下流动,从而便于整体管理整个事情。

任务组和子任务

任务组定义了作用域,在其中可以通过程序方式创建新的子任务。与所有子任务一样,当任务组作用域退出后,其中子任务必须完成,如果任务组作用域退出时抛出错误,这些子任务都会被取消。

为了说明任务组,我们展示怎么把真实的并发性引入到上面准备晚餐的例子中:

代码语言:swift复制
func makeDinner() async throws -> Meal {
  // Prepare some variables to receive results from our concurrent child tasks
  var veggies: [Vegetable]?
  var meat: Meat?
  var oven: Oven?

  enum CookingStep { 
    case veggies([Vegetable])
    case meat(Meat)
    case oven(Oven)
  }
  
  // Create a task group to scope the lifetime of our three child tasks
  try await withThrowingTaskGroup(of: CookingStep.self) { group in
    group.addTask {
      try await .veggies(chopVegetables())
    }
    group.addTask {
      await .meat(marinateMeat())
    }
    group.addTask {
      try await .oven(preheatOven(temperature: 350))
    }
                                             
    for try await finishedStep in group {
      switch finishedStep {
        case .veggies(let v): veggies = v
        case .meat(let m): meat = m
        case .oven(let o): oven = o
      }
    }
  }

  // If execution resumes normally after `withTaskGroup`, then we can assume
  // that all child tasks added to the group completed successfully. That means
  // we can confidently force-unwrap the variables containing the child task
  // results here.
  let dish = Dish(ingredients: [veggies!, meat!])
  return try await oven!.cook(dish, duration: .hours(3))
}

注意,如果说:

代码语言:swift复制
var veggies: [Vegetable]?

try await withThrowingTaskGroup(of: Void.self) { group in
  group.addTask {
    // error: mutation of captured var 'veggies' in concurrently-executing code
    veggies = try await chopVegetables()
  }
}
let dish = Dish(ingredients: [veggies!])

这可能看起来很奇怪,因为子任务保证在withTaskGroup结束时以某种方式完成,所以理论上讲,直到任务组完成之前,只要并行的任务或者父任务本身没有同时读取这些变量,修改从它们的父上下文中捕获的变量是安全的。但是,Swift 的@Sendable闭包检查很严谨,除非我们给任务组语义专门的定义,这点留给后续的提议讨论。

withTaskGroup API 让我们可以访问任务组,并管理子任务的生命周期,这些子任务使用addTask()方法加到任务组中。随着withTaskGroup完成执行,所有的子任务也已经完成。子任务不会持续存在于创建它的任务作用域之外。随着作用域退出,子任务要么完成,要么在隐式的等待影响(await)。当作用域抛出错误时,子任务在await 之前将会被取消。

这些特性允许我们很好的包含我们在任务组内引入的并发性的影响:即使chopVegetables,marinateMeatpreheatOven并发执行,而且可能有不同次序,我们也可以确定,随着withTaskGroup函数返回或者抛出错误,它们将会以某种方式完成执行操作。另一方面,任务组自然的从子任务到父任务传递状态。在上面例子中,如果说菜刀出了点意外,chopVegetables()函数可能会抛出错误。这个抛出的错误代表切蔬菜这个子任务完成了。如预期那样,该错误也会从makeDinner()函数传出去。在makeDinner()函数以错误退出后,任何没有完成的子任务(腌制肉或者预热锅,也许两个都是)将会自动被取消。结构化并发意味着我们不需要手动传递错误和管理取消操作;如果调用withTaskGroup之后,控制流继续往下执行,我们可以断定任务组中所有子任务已经成功完成。

接下来进一步扩展上面示例,并将重点放在chopVegetables()操作上,该操作生成一个Vegetable数组。有了足够的厨师,如果我们把每种蔬菜分开切,切菜变得更快。先看一个有顺序切菜的版本:

代码语言:swift复制
/// Sequentially chop the vegetables.
func chopVegetables() async throws -> [Vegetable] {
  let rawVeggies: [Vegetable] = gatherRawVeggies()
  var choppedVeggies: [Vegetable] = []
  for v in rawVeggies {
    choppedVeggies.append(try await v.chopped())
  }
  return choppedVeggies
}

与最上层任务makeDinner不同,这里存在不定数量的潜在并发操作;这取决于我们能从gatherRawVeggies获取多少蔬菜,每份蔬菜原则上是可以同剩下的并行切。我们不必有序的获取切好的蔬菜,只用在它们准备好后收集。

为了创建动态数量的子任务并收集其结果,我们通过withTaskGroup引入新的任务组, 给子任务指定ChildTaskResult.Type,并且使用组的next方法在子任务结果准备好时收集:

代码语言:swift复制
/// Concurrently chop the vegetables.
func chopVegetables() async throws -> [Vegetable] {
  // Create a task group where each child task produces a Vegetable.
  try await withThrowingTaskGroup(of: Vegetable.self) { group in 
    var rawVeggies: [Vegetable] = gatherRawVeggies()
    var choppedVeggies: [Vegetable] = []
    
    // Create a new child task for each vegetable that needs to be chopped.
    for v in rawVeggies {
      group.addTask { 
        try await v.chopped()
      }
    }

    // Wait for all of the chopping to complete, collecting the veggies into
    // the result array in whatever order they're ready.
    while let choppedVeggie = try await group.next() {
      choppedVeggies.append(choppedVeggie)
    }
    
    return choppedVeggies
  }
}

跟第一个例子一样,如果withThrowingTaskGroup方法的闭包没有完成它所有的子任务就退出,任务组会一直等待,直到所有子任务在返回前已经完成。如果闭包以抛出错误退出,在错误传到父任务之前,未完成的子任务会第一个被取消。

跟 future-based 任务 API 相比,对子任务的引用无法脱离创建子任务的作用域。这确保结构化并发的结构得到维护,而且这不仅可以更容易去推断在给定的作用域内正在执行的并发任务,而且也为编译器和运行时提供了更多的优化机会。

异步程序

程序可以将@main 和异步的main()函数一起使用:

代码语言:swift复制
@main
struct Eat {
  static func main() async throws {
    let meal = try await makeDinner()
    print(meal)
  }
}

Swift 会创建一个新任务来执行main() 函数。一旦任务完成,该程序也会终止。

顶层代码也可以使用异步调用。例如:

代码语言:swift复制
// main.swift or a Swift script
let meal = try await makeDinner()
print(meal)

该模型与@main相同:Swift 创建任务来执行顶层代码,完成该任务,终止程序。

取消操作

任何对任务或其父任务中某一个引用的上下文都可以异步取消任务。取消操作可以通过在任务 handle 上调用cancel()来显式触发。取消操作也可以自动触发,例如,当父任务将错误抛出包含未等待的子任务的范围外时。

被取消任务里的取消效果完全协同和同步。也就是说,除非有检查取消,否则取消操作根本没有效果表现。理论上来说,大多数检查取消的函数通过CancellationError()报告,相应地,它们必须抛出函数,并且对它们的调用必须用某种形式的try修饰。因此,取消在异步函数内不会引入其他控制流路径,你可以随时查看函数并查看可能发生取消的位置。与任何其他抛出的错误一样,defer block 可以用于在取消操作之后承担高效清理的工作。

尽管如此,一般期望是异步函数应该尝试通过适当的抛出错误或者返回函数来响应取消。在大多数函数中,依赖可以等待很长时间的底层函数(例如,I/O 函数或者Task.Value)来检查取消或者提前中断就足够了。执行大量同步运算的函数可能希望显式的检查取消。

取消有两种效果,随着取消立即触发:

  • 在任务中设置标志,将其标记为已取消;一旦设置此标志后,它将永远不会被清除;作为任务的一部分,同步运行的操作可以检查此标志, 通常预期会抛出CancellationError错误。
  • 任何注册在任务中的取消 handler 都将直接运行。这允许需要直接响应的函数执行该操作。

我们可以使用之前的chopVegetables()函数来说明:

代码语言:swift复制
func chopVegetables() async throws -> [Vegetable] {
  return try await withThrowingTaskGroup(of: Vegetable.self) { group in
    var veggies: [Vegetable] = []

    group.addTask {
      try await chop(Carrot()) // (1) throws UnfortunateAccidentWithKnifeError()
    }
    group.addTask {
      try await chop(Onion()) // (2)
    }

    for try await veggie in group { // (3)
      veggies.append(veggie)
    }
                                                       
    return veggies
  }
}

在(1),我们开启了切胡萝卜的子任务。假设该函数调用抛出了错误,因为该操作是异步的,错误不会立即出现在chopVegetables函数内,函数内继续开始处理在 onion (2)处的第二个子任务。在(3),我们等待下一个完成的任务,它是我们创建的子任务其中之一,为了便于讨论,我们认为这里恰巧是(1)中的chop(Carrot())子任务。这导致抛出已经在chop中抛出的错误,因为我们没有处理该错误,在没有等待 onion-chopping 任务返回后便退出该作用域。这将造成任务自动取消。因为任务是协同的,又因为结构化并发不允许子任务持续时间比它们父任务上下文长,onion-chopping 任务实际完成之前,控制流实际不会返回;它返回的任何值或者抛出错误都会被忽略。

正如之前提到的,取消操作在任务上的效果是同步且协同的。有着大量运算的函数更希望可以显式检查取消操作。它们可以通过检查任务取消状态来做到这点:

代码语言:swift复制
func chop(_ vegetable: Vegetable) async throws -> Vegetable {
  try Task.checkCancellation() // automatically throws `CancellationError`
  // chop chop chop ...
  // ... 
  
  guard !Task.isCancelled else { 
    print("Cancelled mid-way through chopping of (vegetable)!")
    throw CancellationError() 
  } 
  // chop some more, chop chop chop ...
}

请注意,这里都没有向任务传递有关任务取消的原因,任务取消的原因有多种,在初始化取消操作之后,还会产生其他原因(比如,如果任务失败退出了,它可能超时了)。取消的目标是允许任务以轻量级的方式取消,而不是成为任务间通信的第二种方法。

非结构化任务

目前我们谈论任务的所有类型都是子任务,它们遵守结构化并发的主要规则:子任务生命周期不能比创建它父任务的生命周期长。这点对任务组和 SE-0317 也是如此。

然而,有时候,这些严格的规则最终会变得过于严格。我们可能需要创建新任务,其生命周期不受创建任务的约束,例如,为了触发并忘记某些操作,或者从同步代码启动异步工作。非结构化任务无法利用 wrt 的一些优化技术。分配和元数据传递(元数据信息有任务优先级,任务本地值等)虽然是子任务,但它们仍然是一个非常重要的构建块,特别是对于更自由形式的使用和与遗留 API 的集成。

所有非结构化任务通过任务 handle 来表示,handle 可以用来解析任务产生的值(或者抛出的错误),可以取消任务,或查询任务状态。一个新任务可以使用Task { ... }开启。例如:

代码语言:swift复制
let dinnerHandle = Task {
  try await makeDinner()
}

该初始化创建了一个新任务并开始执行提供的闭包。该新任务由引用新启动任务的构造任务 handle 表示(本例中为 Task<<Meal, Error>)。任务 handle 可以用来等待任务结果,例如:

代码语言:swift复制
let dinner = try await dinnerHandle.value

即使没有保存任务 handle 的使用,任务也会运行完成,所以没有必要去保存任务 handle 或者监听任务是否完成。但是,任务 handle 可以用来显式的取消,比如:

代码语言:swift复制
dinnerHandle.cancel()
上下文继承

使用Task创建的非结构化任务从创建它的上下文中继承了重要元数据信息,包括优先级,任务本地值,以及参与者隔离。

如果从现有任务的上下文中调用:

  • 继承正在执行的同步函数的当前任务优先级
  • 通过复制任务本地值到新的非结构化任务中来继承它们
  • 如果在指定的 actor 函数作用域内执行:
    • 继承 actor 的执行内容并且在执行器上(而不是全局并发执行器)运行任务,
    • 传给Task {}的闭包对 actor 来说变成了参与者隔离,允许访问参与者隔离状态,包括可变属性和不可发送的(non-sendable)值。
  • 如果从任务之外的上下文中调用:
    • 在运行时中查找并推断要使用的最佳优先级(比如线程优先级),
    • 即使没有可从中继承任务本地值的任务,也要检查为当前同步上下文存储的任何任务本地值的回退机制(在 SE-0311 方案中对此进行了深入讨论)
    • 在全局并发器上执行,并且对于任务 actor 都是非隔离的。
独立任务

独立任务是独立于创建它的上下文的非结构化任务,这意味着它不会继承优先级,任务本地值,及 actor 上下文。可以使用Task.detached函数来创建新的独立任务:

代码语言:swift复制
let dinnerHandle = Task.detached {
  try await makeDinner()
}

Task.detached操作生成新的任务实例(在当前例子中是Task<Meal, Error>), 以与任务初始值设定项相同的方式。

设计细节

任务 API

结构化编程的大部分实现都是关于创建,查询,和管理任务的 API。

任务类型

Task类型描述任务,可用于查询和取消该任务。它也可以用作正在执行任务操作的命名空间。

代码语言:swift复制
struct Task<Success: Sendable, Failure: Error>: Equatable, Hashable, Sendable { ... }

Task类型的实例可以用来检索当前执行任务的结果(或者抛出的错误) 。该检索操作都是async的:

代码语言:swift复制
extension Task {
  /// Retrieve the result produced the task, if is the normal return value, or
  /// throws the error that completed the task with a thrown error.
  var value: Success {
    get async throws
  }
    
  /// Retrieve the result produced by the task as a c Result instance.
  var result: Result<Success, Failure> { get async }
}

extension Task where Failure == Never {
  /// Retrieve the result produced by a task that is known to never throw.
  var value: Success {
    get async
  }
}

任务实例中value属性是主要的消费者接口,它返回任务产生的结果或者抛出任务产生的错误(如果任务通过抛出错误退出的话)。例如:

代码语言:swift复制
func eat(mealHandle: Task<Meal, Error>) async throws {
  let meal = try await mealHandle.value
  meal.eat() // yum
}

任务实例也提供取消任务的能力:

代码语言:swift复制
extension Task {
  /// Cancel the task referenced by this handle.
  func cancel()
  
  /// Determine whether the task was cancelled.
  var isCancelled: Bool { get }
}

如其他地方所述,取消操作是协同的:任务会记录它本身已经取消并选择提前返回(不管是通过正常返回还是抛出错误返回,视情况而定)。isCancelled能够用来确定特定任务是否曾被取消。

UnsafeCurrentTask类型

UnsafeCurrentTask类型提供了用于和运行中任务本身交互的非静态函数。不安全任务对象绝不能从另一个任务任务中转义或者访问,它的API采用熟悉的作用域withUnsafeCurrentTaskwith...格式:

代码语言:swift复制
func withUnsafeCurrentTask<T>(
    body: (UnsafeCurrentTask?) throws -> T
) rethrows -> T

withUnsafeCurrentTask会把当前任务传给操作,或者如果从任务不可用的上下文调用函数,则会传给nil。实际上,这意味着在调用之前,调用链中没有任务异步函数。如果在unsafeCurrent触发之前,调用链中存在异步函数,任务将会返回。

UnsafeCurrentTask有意命名为 unsafe,因为它可能会暴露只能从任务自身安全调用的 API,如果从另一个任务调用会表现出未定义的行为。因为保存UnsafeCurrentTask任务并后续使用并不安全。此类不安全API示例和任务对象中的任务局部值通信,该任务必须是能够安全执行的"当前"任务,这是经过设计,用来为任务存储的正常、安全的访问模式提供了运行时优化机会。

从其他任务/线程调用该 API 将会造成未定义行为。

访问该 API 将执行特定线程局部变量的线程局部查找,该变量由 Swift 并发运行时维护。

withUnsafeCurrentTask函数也可能从同步(以及异步)代码中调用:

代码语言:swift复制
func synchronous() {
  withUnsafeCurrentTask { maybeUnsafeCurrentTask in 
    if let unsafeCurrentTask = maybeUnsafeCurrentTask {
      print("Seems I was invoked as part of a Task!")
    } else {
      print("Not part of a task.")
    }
  }
}

func asynchronous() async {
  // the following is safe, because withUnsafeCurrentTask is invoked from an 'async' function
  withUnsafeCurrentTask { maybeUnsafeCurrentTask in 
    let task: UnsafeCurrentTask = maybeUnsafeCurrentTask! // always ok
  }
}

withUnsafeCurrentTask返回一个可选的UnsafeCurrentTask, 这是因为这些异步函数被某个任务调用(比如,内部异步 Swift 代码),或者被外部函数调用(比如一些不知道任务的API,像底层 pthread 线程调用 Swift 代码)。

UnsafeCurrentTask 同样是EquatableHashable, 这些标识基于内部任务对象,该对象与Task使用的对象相同。

代码语言:swift复制
struct UnsafeCurrentTask: Equatable, Hashable {
  public var isCancelled: Bool { get }
  public var priority: TaskPriority { get }
  public func cancel()
} 

UnsafeCurrentTaskTask具有所有相同的查询操作(比如isCancelled,priority等等),以及取消功能(cancel()), 在不安全任务与正常任务上调用同样安全,但将来可能会定义更多更脆弱的 API,并且只能在执行相同任务时调用(例如,访问另一个提议中定义的 任务局部值)。

任务优先级

任务优先级被执行器用来做调度决定。

优先级从高到低,越高越重要。

为了避免强制其他平台使用 Darwin 特定的术语,优先级使用基础术语比如 "high" 和 "low"。但是,Darwin 指定的名字是作为别名存在,可以互换使用:

代码语言:swift复制
/// Describes the priority of a task.
struct TaskPriority: Codable, Comparable, RawRepresentable, Sendable {
  var rawValue: UInt8 { get set }
  init(rawValue: UInt8)
}

/// General, platform independent priority values.
/// 
/// The priorities are ordered from highest to lowest as follows:
/// - `high`
/// - `medium`
/// - `low`
/// - `background`
extension TaskPriority {
  static var high: TaskPriority { ... }
  static var medium: TaskPriority { ... }
  static var low: TaskPriority { ... }
  static var background: TaskPriority { ... }
}

/// Apple platform specific priority aliases.
/// 
/// The priorities are ordered from highest to lowest as follows:
/// - `userInitiated` (alias for `high` priority)
/// - `utility` (alias for `low` priority)
/// - `background`
/// 
/// The runtime reserves the right to use additional higher or lower priorities than those publicly listed here,
/// e.g. the main thread in an application might run at an user inaccessible `userInteractive` priority, however
/// any task created from it will automatically become `userInitiated`.
extension TaskPriority {
  /// The task was initiated by the user and prevents the user from actively using
  /// your app.
  /// 
  /// Alias for `TaskPriority.high`.
  static var userInitiated: TaskPriority { ... }
  
  /// Priority for a utility function that the user does not track actively.
  /// 
  /// Alias for `TaskPriority.low`
  static var utility: TaskPriority { ... }
}

extension Task where Success == Never, Failure == Never { 
  /// Returns the `current` task's priority.
  /// 
  /// When called from a context with no `Task` available, will return the best 
  /// approximation of the current thread's priority, e.g. userInitiated for 
  /// the "main thread" or default if no specific priority can be detected. 
  static var currentPriority: TaskPriority { ... }
}

priority操作会查询任务的优先级。

currentPriority操作会查询当前执行任务的优先级。任务优先级会在任务创建(如Task.detachedTaskGroup.addTask)时设置,并且之后任务优先级可以提升,比如有个较高优先级任务在某个较低优先级任务中等待处理。

非结构化任务

非结构化任务可以使用Task初始化创建:

代码语言:swift复制
extension Task where Failure == Never {
  @discardableResult
  init(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async -> Success
   )
}

extension Task where Failure == Error {
  @discardableResult
  init(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async throws -> Success
  )
}

初始化方法被标记为@discardableResult, 是因为当 handle 未使用时,任务本身会立即执行操作并运行完成。这是一个相当常见的忽略结果的异步操作用例。

一般来说,新任务会在默认全局并发执行器初始化调度。一旦在另一个方案中引入了自定义执行器,它们将能够采用执行器参数来确定在哪个执行器上安排新任务。

优先级传递

Task初始化方法会把优先级从它被调用的地方传到它创建的独立任务:

  1. 如果同步代码代表任务运行(比如withUnsafeCurrentTask提供非nil的任务),使用该任务的优先级;
  2. 如果同步代码代表 UI 线程运行,使用.userInitiated; 否则
  3. 向系统查询来确定当前正在执行线程的优先级并使用它。

该实现还将把其他重要的特定于操作系统的信息从同步代码传到异步代码中。

Actor 上下文传递

传给Task初始化的闭包将会隐式地继承 actor 执行上下文和闭包组成上下文的隔离。例如:

代码语言:swift复制
func notOnActor(_: @Sendable () async -> Void) { }

actor A {
  func f() {
    notOnActor {
      await g() // must call g asynchronously, because it's a @Sendable closure
    }
    Task {
      g() // okay to call g synchronously, even though it's @Sendable
    }
  }
  
  func g() { }
}

某种程度,Task初始化抵消了@Sendable对参与者内闭包的正常影响。具体而言,SE-0306 表明@Sendable闭包不是隔离的:

actor 通过指定@Sendable闭包为非隔离状态来杜绝这种数据竞争。

闭包同时是@Sendable和 actor-isolated,只可能是因为闭包也是async的。事实上,当闭包被调用时,为了能够在 actor 内部运行,它会立即跳到 actor 的上下文。

隐式的 "self"

传给Task初始化器的闭包不必显式使用self.表示self

代码语言:swift复制
func acceptEscaping(_: @escaping () -> Void) { }

class C {
  var counter: Int = 0
  
  func f() {
    acceptEscaping {
      counter = counter   1   // error: must use "self." because the closure escapes
    }
    Task {
      counter = counter   1   // okay: implicit "self" is allowed here
    }
  }
}

当在某个逃逸闭包中捕获self,并要求self.的真正目的是为了提醒开发者可能存在循环引用。传给Task的闭包会立即执行,对self的唯一引用是在函数体中发生的内容。因此,在Task中使用显式self.没有传达有用的信息,不需要使用。

注意:这个规则同样适用于Task.detachedTaskGroup.addTask中的闭包。

独立任务

使用Task.detached来创建一个新的独立任务。生成的任务由Task表示:

代码语言:swift复制
extension Task where Failure == Never {
  /// Create a new, detached task that produces a value of type `Success`.
  @discardableResult
  static func detached(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async -> Success
  ) -> Task<Success, Never>
}

extension Task where Failure == Error {
  /// Create a new, detached task that produces a value of type `Success` or throws an error.
  @discardableResult
  static func detached(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async throws -> Success
  ) -> Task<Success, Failure>
}

独立任务一般会使用尾随闭包来创建,例如:

代码语言:swift复制
let dinnerHandle: Task<Meal, Error> = Task.detached {
  try await makeDinner()
}

try await eat(mealHandle: dinnerHandle)
取消操作

isCancelled确定给定的任务是否已经取消:

代码语言:swift复制
extension Task {
  /// Returns `true` if the task is cancelled, and should stop executing.
  var isCancelled: Bool { get }
}

也有可能从同步任务中查询取消,比如,当遍历某个循环,想使用Task.isCancelled属性检查任务是否中断:

代码语言:swift复制
extension Task where Success == Never, Failure == Never { 
  /// Returns `true` if the task is cancelled, and should stop executing.
  ///
  /// Always returns `false` when called from code not currently running inside of a `Task`.
  static var isCancelled: Bool { get }
}

这与它实例对应项的工作原理相同,只是如果从没有可用任务上下文调用,例如,如果从 Swift 并发模型外调用(比如直接使用 pthread 调用),将会返回默认值。staticisCancelled属性实现方式如下:

代码语言:swift复制
extension Task where Success == Never, Failure == Never {
  static var isCancelled: Bool { 
    withUnsafeCurrentTask { task in 
      task?.isCancelled ?? false  
    }
  }
}

此静态isCancelled属性始终可以安全调用,即它可以从同步或异步函数调用,且始终会返回预期结果。

但是请注意,在同时设置取消的同时检查取消可能会有点不正常,即如果取消是从另一个线程执行的,isCancelled可能不会返回true

对于在取消操作中会立即通过抛出错误退出的任务,任务 API 提供了通用的错误类型CancellationError, 用于告知任务已经被取消。当任务已经取消,Task.checkCancellation()操作将会抛出CancellationError,这也是为了方便起见。

代码语言:swift复制
/// The default cancellation thrown when a task is cancelled.
///
/// This error is also thrown automatically by `Task.checkCancellation()`,
/// if the current task has been cancelled.
struct CancellationError: Error {
  // no extra information, cancellation is intended to be light-weight
  init() {}
}

extension Task where Success == Never, Failure == Never {
  /// Returns `true` if the task is cancelled, and should stop executing.
  ///
  /// - SeeAlso: `checkCancellation()`
  static func checkCancellation() throws
}
取消 handler

对于希望立即对取消做出反应(而不是等待取消错误向上传递),可以声明取消处理 handler:

代码语言:swift复制
/// Execute an operation with cancellation handler which will immediately be
/// invoked if the current task is cancelled.
///
/// This differs from the operation cooperatively checking for cancellation
/// and reacting to it in that the cancellation handler is _always_ and
/// _immediately_ invoked when the task is cancelled. For example, even if the
/// operation is running code which never checks for cancellation, a cancellation
/// handler still would run and give us a chance to run some cleanup code.
///
/// Does not check for cancellation, and always executes the passed `operation`.
///
/// This function returns instantly and will never suspend.
func withTaskCancellationHandler<T>(
  operation: () async throws -> T,
  onCancel handler: @Sendable () -> Void
) async rethrows -> T

该函数不会创建新任务,而是立即执行operation, 一旦operation返回,withTaskCancellationHandler也将返回(这点跟throw的行为相似)。

请注意handler对剩下的任务运行@Sendable,因为当任务取消后,会直接执行 handler,这可以发生在任何点。如果在调用withTaskCancellationHandler时任务已经被取消,在执行operationblock 之前,会立即调用取消 handler。

这些属性对取消 handler 闭包可以安全执行的操作上设置了相当严格的限制,但是取消 handler 在任意点可以触发的功能让它可以有效管理相关对象的状态,在无法从任务内部轮询取消状态或通过抛出 CancellationError 传递取消状态的情况下。例如,取消 handler 与 continuation 结合使用,通过非异步事件驱动接口来帮助线程取消。看这个例子,某个函数想在异步函数接口中封装Foudation 中的URLSession对象,如果异步自身取消了,则取消URLSession。代码可能是这样:

代码语言:swift复制
func download(url: URL) async throws -> Data? {
  var urlSessionTask: URLSessionTask?

  return try withTaskCancellationHandler {
    return try await withUnsafeThrowingContinuation { continuation in
      urlSessionTask = URLSession.shared.dataTask(with: url) { data, _, error in
        if let error = error {
          // Ideally translate NSURLErrorCancelled to CancellationError here
          continuation.resume(throwing: error)
        } else {
          continuation.resume(returning: data)
        }
      }
      urlSessionTask?.resume()
    }
  } onCancel: {
    urlSessionTask?.cancel() // runs immediately when cancelled
  }
}
主动挂起

对于长时间运行的操作,例如在没有自然挂起点的紧循环中执行大量计算,偶尔执行检查是否应该挂起并未其他任务提供继续执行的机会可能是有益的。(例如所有任务运行在某个共享的有限并发池中)。对于这种使用场景,Task包含suspend()操作,这是一种显式挂起当前任务,并给其他任务有机会运行一段时间的办法。

代码语言:swift复制
extension Task where Success == Never, Failure == Never {
  static func suspend() async { ... }
}

同时还提供一个异步 sleep 函数,它接受纳秒时间来挂起:

代码语言:swift复制
extension Task where Success == Never, Failure == Never {
  public static func sleep(nanoseconds duration: UInt64) async throws { ... }
}

sleep 函数接受一个纳秒整型参数,这反映了同步世界中执行相同操作的已知顶层函数。当 sleep 时,如果任务取消,函数会抛出CancellationError,不用等待整个休眠周期。

如果基础库中有时间和截止类型,sleep 可以用await Task.sleep(until: deadline)await Task.sleep(for: .seconds(1))或者相似的表示,这样sleep函数将会获取更好的重载。当前提议并没有包含这些时间类型,目前本提议中只有一个基本 sleep 函数。

任务组

在任何异步上下文中,使用withTaskGroup创建任务组,它提供了能够并发创建和执行新任务的范围。

代码语言:swift复制
/// Starts a new task group which provides a scope in which a dynamic number of
/// tasks may be created.
///
/// Tasks added to the group by `group.addTask()` will automatically be awaited on
/// when the scope exits. If the group exits by throwing, all added tasks will
/// be cancelled and their results discarded.
///
/// ### Implicit awaiting
/// When the group returns it will implicitly await for all child tasks to
/// complete. The tasks are only cancelled if `cancelAll()` was invoked before
/// returning, the groups' task was cancelled, or the group body has thrown.
///
/// When results of tasks added to the group need to be collected, one can
/// gather their results using the following pattern:
///
///     while let result = await group.next() {
///       // some accumulation logic (e.g. sum  = result)
///     }
///
/// It is also possible to collect results from the group by using its
/// `AsyncSequence` conformance, which enables its use in an asynchronous for-loop,
/// like this:
///
///     for await result in group {
///       // some accumulation logic (e.g. sum  = result)
///     }
///
/// ### Cancellation
/// If the task that the group is running in is cancelled, the group becomes 
/// cancelled and all child tasks created in the group are cancelled as well.
/// 
/// Since the `withTaskGroup` provided group is specifically non-throwing,
/// child tasks (or the group) cannot react to cancellation by throwing a 
/// `CancellationError`, however they may interrupt their work and e.g. return 
/// some best-effort approximation of their work. 
///
/// If throwing is a good option for the kinds of tasks created by the group,
/// consider using the `withThrowingTaskGroup` function instead.
///
/// Postcondition:
/// Once `withTaskGroup` returns it is guaranteed that the `group` is *empty*.
///
/// This is achieved in the following way:
/// - if the body returns normally:
///   - the group will await any not yet complete tasks,
///   - once the `withTaskGroup` returns the group is guaranteed to be empty.
func withTaskGroup<ChildTaskResult: Sendable, GroupResult>(
  of childTaskResult: ChildTaskResult.Type,
  returning returnType: GroupResult.Type = GroupResult.self,
  body: (inout TaskGroup<ChildTaskResult>) async -> GroupResult
) async -> GroupResult { ... } 


/// Starts a new throwing task group which provides a scope in which a dynamic 
/// number of tasks may be created.
///
/// Tasks added to the group by `group.addTask()` will automatically be awaited on
/// when the scope exits. If the group exits by throwing, all added tasks will
/// be cancelled and their results discarded.
///
/// ### Implicit awaiting
/// When the group returns it will implicitly await for all created tasks to
/// complete. The tasks are only cancelled if `cancelAll()` was invoked before
/// returning, the groups' task was cancelled, or the group body has thrown.
///
/// When results of tasks added to the group need to be collected, one can
/// gather their results using the following pattern:
///
///     while let result = await try group.next() {
///       // some accumulation logic (e.g. sum  = result)
///     }
///
/// It is also possible to collect results from the group by using its
/// `AsyncSequence` conformance, which enables its use in an asynchronous for-loop,
/// like this:
///
///     for try await result in group {
///       // some accumulation logic (e.g. sum  = result)
///     }
///
/// ### Thrown errors
/// When tasks are added to the group using the `group.addTask` function, they may
/// immediately begin executing. Even if their results are not collected explicitly
/// and such task throws, and was not yet cancelled, it may result in the `withTaskGroup`
/// throwing.
///
/// ### Cancellation
/// If the task that the group is running in is cancelled, the group becomes 
/// cancelled and all child tasks created in the group are cancelled as well.
/// 
/// If an error is thrown out of the task group, all of its remaining tasks
/// will be cancelled and the `withTaskGroup` call will rethrow that error.
///
/// Individual tasks throwing results in their corresponding `try group.next()`
/// call throwing, giving a chance to handle individual errors or letting the
/// error be rethrown by the group.
///
/// Postcondition:
/// Once `withThrowingTaskGroup` returns it is guaranteed that the `group` is *empty*.
///
/// This is achieved in the following way:
/// - if the body returns normally:
///   - the group will await any not yet complete tasks,
///     - if any of those tasks throws, the remaining tasks will be cancelled,
///   - once the `withTaskGroup` returns the group is guaranteed to be empty.
/// - if the body throws:
///   - all tasks remaining in the group will be automatically cancelled.
func withThrowingTaskGroup<ChildTaskResult: Sendable, GroupResult>(
  of childTaskResult: ChildTaskResult.Type,
  returning returnType: GroupResult.Type = GroupResult.self,
  body: (inout ThrowingTaskGroup<ChildTaskResult, Error>) async throws -> GroupResult
) async rethrows -> GroupResult { ... } 

/// A group of tasks, each of which produces a result of type `TaskResult`.
struct TaskGroup<ChildTaskResult: Sendable> {
  // No public initializers
}

TaskGroup没有公共的初始化方法,而是把TaskGroup实例传给withTaskGroup函数的body函数。这个实例不应该复制到body函数中,因为这样做会打破子任务结构。

注意:Swift 现在没法保证传给body函数的任务组实例不在其他地方复制,因此我们需要依赖开发者自觉用诸如Array.withUnsafeBufferPointer这种手动方式。然而,在任务组情况下,如果在任务组的响应范围结束之后尝试使用任务组实例,我们至少可以提供一个运行时断言来处理这种情况。

withTaskGroup的结果是body函数产生的结果。函数的withThrowingTaskGroup版本允许任务组抛出错误,如果抛出了错误,任务组包含的所有任务会在抛出错误之前隐式地取消。

注意:有个糟糕的点是不可能使用一个函数来实现 throwing/non-throwing 功能。抛出的group.addTask和抛出next的复杂关系和以及相应的 throwing/non-throwingAsyncSequence一致性使得不可能在一个函数中实现所有功能。

同样注意withThrowingTaskGroup使用了ThrowingTaskGroup<ChildTaskResult, Error>,但是,无法指定该错误的类型。这是因为ThrowingTaskGroup上的此失败参数仅用作面向 future API,以防止 Swift 在某个时间点获得类型化抛出。

一个任务组在返回之前会await所有加进来的任务。

此等待可以通过以下方式执行:

  • 通过任务组内部代码(比如重复使用next()直到返回nil)
  • body返回时,隐式的在任务组本身中

默认情况下,任务组在全局默认并发执行器上调度加到组内的子任务。将来,很可能会使用addTask的可选执行器参数来自定义执行器任务。

创建 TaskGroup 子任务

body函数内,可以使用addTask操作来动态添加任务。每个任务返回相同类型值(ChildTaskResult):

代码语言:swift复制
extension TaskGroup {
  /// Unconditionally create a child task in the group.
  /// 
  /// The child task will be executing concurrently with the group, and its result 
  /// may be collected by calling `group.next()` or iterating over the group gathering 
  /// all submitted task results from the group.
  mutating func addTask(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async -> ChildTaskResult
  )

  /// Attempts to create a child task in the group, unless the group is already cancelled.
  /// 
  /// If the task that initiates the call to `addTaskUnlessCancelled`  was already cancelled,
  /// or if the group was explicitly cancelled by invoking `group.cancelAll()`, no child
  /// task will be created.
  /// 
  /// The child task will be executing concurrently with the group, and its result 
  /// may be collected by calling `group.next()` or iterating over the group gathering 
  /// all submitted task results from the group.
  /// 
  /// Returns true if the task was created successfully, and false otherwise.
  mutating func addTaskUnlessCancelled(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async -> ChildTaskResult
  ) -> Bool
}

extension ThrowingTaskGroup { 
  mutating func addTask(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async throws -> ChildTaskResult
  )
  
  mutating func addTaskUnlessCancelled(
    priority: TaskPriority? = nil,
    operation: @Sendable @escaping () async throws -> ChildTaskResult
  ) -> Bool
}

group.addTask在任务组中创建子任务,并发执行子任务中给定的操作。每个任务都是创建任务组任务的子任务,除了给定了一个优先级之外,它们的优先级也相同。通常来说,不推荐手动指定优先级。

使用addTask添加某个新的子任务到任务组总是成功的,即使该任务已经取消了或者任务组通过group.cancelAll取消了。任务组如果已经取消,在其中新创建的子任务将是cancelled状态。为了避免这个问题,addTaskUnlessCancelled函数检查任务组在尝试创建子任务之前,是否已经被取消,并且返回一个Bool值, 表示子任务是否成功创建。这允许简单的实现组,这些组应"继续创建任务,直到取消"。

取消一个指定任务组子任务不会取消整个任务组或其任何同级任务。

之前把group.addTask操作设计为一个挂起点,这是一种简单的反压机制,任务组可以决定不允许超过 N 个任务并发运行。这还没有完全设计和实现,所以目前已经转向了未来的方向。

在任务组内查询任务

next操作允许从任务组创建的任务中获取结果。该方法返回任务组中任务的结果,无论是正常的结果还是抛出的错误。

代码语言:swift复制
extension TaskGroup: AsyncSequence {

  /// Wait for the a child task that was added to the group to complete,
  /// and return (or rethrow) the value it completed with. If no tasks are
  /// pending in the task group this function returns `nil`, allowing the
  /// following convenient expressions to be written for awaiting for one
  /// or all tasks to complete:
  ///
  /// Await on a single completion:
  ///
  ///     if let first = try await group.next() {
  ///        return first
  ///     }
  ///
  /// Wait and collect all group child task completions:
  ///
  ///     while let first = try await group.next() {
  ///        collected  = value
  ///     }
  ///     return collected
  ///
  /// Awaiting on an empty group results in the immediate return of a `nil`
  /// value, without the group task having to suspend.
  ///
  /// It is also possible to use `for await` to collect results of a task groups:
  ///
  ///     for await value in group {
  ///         collected  = value
  ///     }
  ///
  /// ### Thread-safety
  /// Please note that the `group` object MUST NOT escape into another task.
  /// The `group.next()` MUST be awaited from the task that had originally
  /// created the group. It is not allowed to escape the group reference.
  ///
  /// Note also that this is generally prevented by Swift's type-system,
  /// as the `add` operation is `mutating`, and those may not be performed
  /// from concurrent execution contexts, such as child tasks.
  ///
  /// ### Ordering
  /// Order of values returned by next() is *completion order*, and not
  /// submission order. I.e. if tasks are added to the group one after another:
  ///
  ///     group.addTask { 1 }
  ///     group.addTask { 2 }
  ///
  ///     print(await group.next())
  ///     /// Prints "1" OR "2"
  ///
  /// ### Errors
  /// If an operation added to the group throws, that error will be rethrown
  /// by the next() call corresponding to that operation's completion.
  ///
  /// It is possible to directly rethrow such error out of a `withTaskGroup` body
  /// function's body, causing all remaining tasks to be implicitly cancelled.
  mutating func next() async -> ChildTaskResult? { ... }

  /// Wait for all of the child tasks to complete.
  ///
  /// This operation is the equivalent of
  ///
  ///     for await _ in self { }
  ///
  mutating func waitForAll() async { ... }

  /// Query whether the group has any remaining tasks.
  ///
  /// Task groups are always empty upon entry to the `withTaskGroup` body, and
  /// become empty again when `withTaskGroup` returns (either by awaiting on all
  /// pending tasks or cancelling them).
  ///
  /// - Returns: `true` if the group has no pending tasks, `false` otherwise.
  var isEmpty: Bool { ... } 
}
代码语言:swift复制
extension ThrowingTaskGroup: AsyncSequence {

  /// Wait for the a child task that was added to the group to complete,
  /// and return (or rethrow) the value it completed with. If no tasks are
  /// pending in the task group this function returns `nil`, allowing the
  /// following convenient expressions to be written for awaiting for one
  /// or all tasks to complete:
  ///
  /// Await on a single completion:
  ///
  ///     if let first = try await group.next() {
  ///        return first
  ///     }
  ///
  /// Wait and collect all group child task completions:
  ///
  ///     while let first = try await group.next() {
  ///        collected  = value
  ///     }
  ///     return collected
  ///
  /// Awaiting on an empty group results in the immediate return of a `nil`
  /// value, without the group task having to suspend.
  ///
  /// It is also possible to use `for await` to collect results of a task groups:
  ///
  ///     for await try value in group {
  ///         collected  = value
  ///     }
  ///
  /// ### Thread-safety
  /// Please note that the `group` object MUST NOT escape into another task.
  /// The `group.next()` MUST be awaited from the task that had originally
  /// created the group. It is not allowed to escape the group reference.
  ///
  /// Note also that this is generally prevented by Swift's type-system,
  /// as the `add` operation is `mutating`, and those may not be performed
  /// from concurrent execution contexts, such as child tasks.
  ///
  /// ### Ordering
  /// Order of values returned by next() is *completion order*, and not
  /// submission order. I.e. if tasks are added to the group one after another:
  ///
  ///     group.addTask { 1 }
  ///     group.addTask { 2 }
  ///
  ///     print(await group.next())
  ///     /// Prints "1" OR "2"
  ///
  /// ### Errors
  /// If an operation added to the group throws, that error will be rethrown
  /// by the next() call corresponding to that operation's completion.
  ///
  /// It is possible to directly rethrow such error out of a `withTaskGroup` body
  /// function's body, causing all remaining tasks to be implicitly cancelled.
  mutating func next() async throws -> ChildTaskResult? { ... } 

  /// Wait for a task to complete and return the result or thrown error packaged in
  /// a `Result` instance. Returns `nil` only when there are no tasks left in the group.
  mutating func nextResult() async -> Result<ChildTaskResult, Error>?

  /// Wait for all of the child tasks to complete, or throws an error if any of the
  /// child tasks throws.
  ///
  /// This operation is the equivalent of
  ///
  ///     for try await _ in self { }
  ///
  mutating func waitForAll() async throws { ... }

  /// Query whether the task group has any remaining tasks.
  var isEmpty: Bool { ... } 
}

next()操作经常在while循环中使用用来收集任务组内所有未完成任务的结果。例如:

代码语言:swift复制
while let result = await group.next() {
  // some accumulation logic (e.g. sum  = result)
}

// OR

while let result = try await group.next() {
  // some accumulation logic (e.g. sum  = result)
}

TaskGroup同样遵守 AsyncSequence protocol,能够在for await循环中迭代子任务的结果:

代码语言:swift复制
for await result in group { // non-throwing TaskGroup
  // some accumulation logic (e.g. sum  = result)
}

// OR 

for try await result in group { // ThrowingTaskGroup
  // some accumulation logic (e.g. sum  = result)
}

使用此模式,如果任务组中单个任务抛出错误,错误会传到body函数和任务组本身之外。

为了处理任务中的错误,可以使用 do-catch 或者nextResult()方法。例如,您想要实现某个函数,该函数开启了 n 个任务,并报告前 m 个成功结果。这很容易使用任务组实现,通过不断收集结果,直到results数组有 m 个结果,那时我们可以取消任务组中剩余的任务并返回:

代码语言:swift复制
func gather(first m: Int, of work: [Work]) async throws -> [WorkResult] { 
  assert(m <= work.count) 
  
  return withTaskGroup(of: WorkResult.self) { group in 
    for w in work { 
      group.addTask { await w.doIt() } // create child tasks to perform the work
    }  
    
    var results: [WorkResult] = []
    while results.count <= m { 
      switch try await group.nextResult() { 
      case nil:             return results
      case .success(let r): results.append(r)
      case .failure(let e): print("Ignore error: (e)")
      }
    }
  }
}
任务组取消操作

任务组取消有几种方法。在所有取消场景中,任务组中所有任务都是取消的,并且组中新创建的任务都是取消状态。这里有3种方法可以取消任务组:

  1. withTaskGroupbody抛出错误时
  2. 当在任务组中创建的任务是取消状态
  3. 当调用了cancelAll()操作

可以通过isCancelled属性来查询任务组取消状态。

代码语言:swift复制
extension TaskGroup {
  /// Cancel all the remaining tasks in the group.
  ///
  /// A cancelled group will not will NOT accept new tasks being added into it.
  ///
  /// Any results, including errors thrown by tasks affected by this
  /// cancellation, are silently discarded.
  ///
  /// This function may be called even from within child (or any other) tasks,
  /// and will reliably cause the group to become cancelled.
  ///
  /// - SeeAlso: `Task.isCancelled`
  /// - SeeAlso: `TaskGroup.isCancelled`
  func cancelAll() { ... }

  /// Returns `true` if the group was cancelled, e.g. by `cancelAll`.
  ///
  /// If the task currently running this group was cancelled, the group will
  /// also be implicitly cancelled, which will be reflected in the return
  /// value of this function as well.
  ///
  /// - Returns: `true` if the group (or its parent task) was cancelled,
  ///            `false` otherwise.
  var isCancelled: Bool { get }
}

例如:

代码语言:swift复制
func chopVegetables() async throws -> [Vegetable] {
  var veggies: [Vegetable] = []

  try await withThrowingTaskGroup(of: Vegetable.self) { group in
    print(group.isCancelled) // prints false

    group.addTask {
      group.cancelAll() // Cancel all work in the group
      throw UnfortunateAccidentWithKnifeError()
    }
    group.addTask {
      return try await chop(Onion())
    }

    do {
      while let veggie = try await group.next() {
        veggies.append(veggie)
      }
    } catch {
      print(group.isCancelled) // prints true now
      let added = group.addTaskUnlessCancelled {
        try await chop(SweetPotato())
      }
      print(added) // prints false, no child was added to the cancelled group
    }
  }
  
  return veggies
}

源代码兼容性

此更改纯粹是对源语言的补充。不会影响兼容性。

对 ABI 稳定性影响

此更改纯粹是对 ABI 的补充。不会影响 ABI 稳定性。

对 API 扩展性影响

本文档中描述的所有更改都是对语言的补充,并且都是局部范围的,例如,在函数体中。因此,对 API 扩展性没有影响。

0 人点赞