[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑

2021-11-02 16:42:35 浏览数 (1)

[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑

0x00 摘要

前文我们提到了 autograd 引擎的静态架构,本文开始我们从动态角度看看引擎是如何运作的。

0x01 前文回顾

前文我们从静态角度来分析了引擎,最后得出一个动态图如下,本文会对这个图进行进一步细化。

  • 1)主线程往CPU ReadyQueue放入一个 NodeTask。
  • 2)工作线程 1 从 CPU ReadyQueue 取出 NodeTask,开始执行。
  • 3)工作线程 1 结束之后,往 device_ready_queues_ 的 某一个 ReadyQueue 插入一个 NodeTask。
  • 4)ReadyQueue 对应的 工作线程 2 取出 NodeTask,开始执行。
代码语言:javascript复制
                             ------------------------- 
                            | GraphTask               |
                            |                         |
                            |        cpu_ready_queue_ |
                            |                         |
                            |            |            |
                             ------------------------- 
                                         |
 --------------                          |                            ----------------- 
| Main Thread  |                         v                           | Worker Thread 1 |
|              |       1          ------- ---------        2         |                 |
|              | push(NodeTask)  |                 |  pop(NodeTask)  |                 |
|           -------------------> | CPU ReadyQueue   ----------------------->           |
|              |                 |                 |                 |                 |
|              |                  -----------------                  |                 |
|              |               ----------------------                |                 |
|              |              | Device ReadyQueues   |               |                 |
|              |              |                      |               |                 |
|              |              |                      |    3          |                 |
|              |              |     -------------    | push(NodeTask)|                 |
|              |              |    | ReadyQueue 1| <-----------------------            |
|              |              |     ------ ------    |               |                 |
|              |              |           |          |               |                 |
 --------------               |           |          |                ----------------- 
                              |            ------------------ 
                              |                      |       |        ----------------- 
 ------------------------     |          .           |       |       | Worker Thread 2 |
| Engine                 |    |          .           |       |       |                 |
|                        |    |          .           |       |       |                 |
|                        |    |                      |       |       |                 |
|  device_ready_queues_  ---> |     -------------    |        ------------->           |
|                        |    |    | ReadyQueue 2|   | pop(NodeTask) |                 |
|                        |    |     -------------    |     4         |                 |
 ------------------------     |                      |               |                 |
                              |                      |               |                 |
                              |     -------------    |               |                 |
                              |    | ReadyQueue 3|   |               |                 |
                              |     -------------    |               |                 |
                              |                      |               |                 |
                               ----------------------                 ----------------- 

0x02 引擎总体架构

我们从动态运行的角度看看引擎的总体架构。Engine::execute 其总体逻辑如下:

  • 启动引擎。
    • 初始化local ready_queue。
    • 构建一个GraphTask。
    • 构建GraphRoot,就是根节点。
    • 计算最小拓扑数。
    • 计算每个节点的依赖,目的是计算出所有的节点的依赖个数。
    • 如果输出不为空,则调用 graph_task->init_to_execute(*graph_root, outputs, accumulate_grad, min_topo_nr) 将graph_task初始化。
    • 配置工作线程的各种输入。
    • 启动工作线程。
  • 运行引擎,即使用 execute_with_graph_task(graph_task, graph_root, ...) 启动工作线程。
    • 每个线程对应一个ReadyQueue,把 Root 节点放入queue。
    • 初始化完成之后,子线程调用thread_main(nullptr)开始工作。
    • 在thread_main中反复调用evaluate_function(task)计算每个Node的梯度,通过 next_edges 的不断查找下一个Edge,直到所有节点的梯度都计算完成,最终完成了整个图的计算。
      • 进行 NodeTask 的计算。
      • 遍历 当前 Function 的 所有 next_function, 将它们的 dependencies 减一, 看看他们是否已经 ready。
      • 如果 ready, 通过 InputBuffer 的 device 来确定 将其 放入到 那个 ReadyQueue 中。
      • 如果没有准备好, 就放在 GraphTask 中的 not_ready 中。
      • 如果 graph_task->outstanding_tasks <= 0 则退出循环。即执行完了 GraphTask 所有的 Node。
  • 主进程进行阻塞等待,等待 graph_task->future_result_,即工作线程结束。

具体代码如下:

代码语言:javascript复制
auto Engine::execute(const edge_list& roots, // 反向传播的根节点
                     const variable_list& inputs, // 根节点的梯度
                     bool keep_graph, // 计算图是否需要保留
                     bool create_graph, // 是否需要构建微分图以进行高阶求导
                     bool accumulate_grad,
                     const edge_list& outputs // 需要输出梯度的节点
                    ) -> variable_list {
  validate_outputs(roots, const_cast<variable_list&>(inputs), [](const std::string& msg) {
    return msg;
  });

  // A fresh first time Engine::execute call should start on the CPU device, initialize
  // a new thread local ready queue on CPU or reuse the existing one (if there is one
  // allocated already, i.e. consecutive backward calls, re-entrant backward calls),
  // then memoize the local_ready_queue in GraphTask
  init_local_ready_queue(); // 初始化local ready_queue
  bool not_reentrant_backward_call = worker_device == NO_DEVICE;
	// 构建一个GraphTask
  auto graph_task = std::make_shared<GraphTask>(
      /* keep_graph */ keep_graph,
      /* create_graph */ create_graph,
      /* depth */ not_reentrant_backward_call ? 0 : total_depth   1,
      /* cpu_ready_queue */ local_ready_queue);

  // 构建GraphRoot
  // If we receive a single root, skip creating extra root node
  bool skip_dummy_node = roots.size() == 1;
  auto graph_root = skip_dummy_node ?
    roots.at(0).function :
    std::make_shared<GraphRoot>(roots, inputs);

  // 计算最小拓扑数
  auto min_topo_nr = compute_min_topological_nr(outputs);
  // Now compute the dependencies for all executable functions
  // 计算依赖
  compute_dependencies(graph_root.get(), *graph_task, min_topo_nr);

  // 如果输出不为空,则调用 *graph_root, outputs 将graph_task初始化
  if (!outputs.empty()) {
    graph_task->init_to_execute(*graph_root, outputs, accumulate_grad, min_topo_nr);
  }

  // Queue the root
  if (skip_dummy_node) {
    // 配置工作进程的各种输入
    InputBuffer input_buffer(roots.at(0).function->num_inputs());
    auto input = inputs.at(0);

    const auto input_stream = InputMetadata(input).stream();
    const auto opt_next_stream = roots.at(0).function->stream(c10::DeviceType::CUDA);
    input_buffer.add(roots.at(0).input_nr,
                      std::move(input),
                      input_stream,
                      opt_next_stream);
    // 启动工作进程
    execute_with_graph_task(graph_task, graph_root, std::move(input_buffer));
  } else {
    // 启动工作进程
    execute_with_graph_task(graph_task, graph_root, InputBuffer(variable_list()));
  }
  // Avoid a refcount bump for the Future, since we check for refcount in
  // DistEngine (see TORCH_INTERNAL_ASSERT(futureGrads.use_count() == 1)
  // in dist_engine.cpp).
  // 主进程进行阻塞等待,等待 graph_task->future_result_。
  auto& fut = graph_task->future_result_;
  fut->wait();
  return fut->value().toTensorVector();
}

我们接下来按照步骤来分析。

0x03 启动引擎

启动引擎部分包括:

  • 初始化local ready_queue。
  • 构建一个GraphTask。
  • 构建GraphRoot,就是根节点。
  • 计算最小拓扑数。
  • 计算每个节点的依赖。
  • 如果输出不为空,则调用 graph_task->init_to_execute(*graph_root, outputs, accumulate_grad, min_topo_nr) 将graph_task初始化。
  • 配置工作线程的各种输入。

我们接下来一一分析。

3.1 初始化local ready queue

前文提到,每个autogard 工作线程都与一个就绪队列相关联,该队列指定该线程要执行的工作流,这个队列定义如下。

代码语言:javascript复制
static thread_local std::shared_ptr<ReadyQueue> local_ready_queue = nullptr;

这个shared_ptr是一个thread_local指针,其指向每个线程的ready_queue,在执行之前,应该通过每个对应线程中的 Engine::init_local_ready_queue() 调用对其进行初始化。

另外,GraphTask 也有一个 CPU queue 成员变量 cpu_ready_queue_,专用于处理反向传播相关CPU工作。

init_local_ready_queue 代码有两个执行路径:

  • 主线程执行路径 :参数 ready_queue 没有配置,则此时 Engine::execute 是全新调用,则应该在CPU设备上启动。所以需要在CPU上初始化一个新的线程本地就绪队列或重用现有的线程本地就绪队列(比如可重入的后向传播),然后在工作线程的 local_ready_queue 之上保存。这就通过如下代码完成,此时 local_ready_queue 是主线程的线程本地变量
  • 工作线程执行路径 :参数 ready_queue 有配置,是通过 std::thread t(&Engine::thread_init, this, i, device_ready_queues_[i], true),这时候 local_ready_queue 就是工作线程的本地变量。

我们这个阶段介绍的就是主线程执行路径,init_local_ready_queue 调用没有参数,生成工作线程的 local_ready_queue。

代码语言:javascript复制
void Engine::init_local_ready_queue(std::shared_ptr<ReadyQueue> ready_queue) {
  if (ready_queue) {
    // 工作线程执行路径
    // if ready_queue provided in the caller, use the caller's ready_queue to initialize local_ready_queue
    local_ready_queue = std::move(ready_queue);
  } else if (!local_ready_queue){
    // 主线程执行路径。
    // otherwise if local_ready_queue not allocated, allocate a new ready_queue
    local_ready_queue = std::make_shared<ReadyQueue>();
  }
}

3.2 构建GraphTask

接下来就构建了 GraphTask,这时候就把主线程的 local_ready_queue 传入。

代码语言:javascript复制
	// 构建一个GraphTask
  auto graph_task = std::make_shared<GraphTask>(
      /* keep_graph */ keep_graph,
      /* create_graph */ create_graph,
      /* depth */ not_reentrant_backward_call ? 0 : total_depth   1,
      /* cpu_ready_queue */ local_ready_queue);

在构建函数内部,就把 local_ready_queue 赋值给了 GraphTask 的成员变量 cpu_ready_queue_。

代码语言:javascript复制
GraphTask(
    bool keep_graph,
    bool grad_mode,
    int reentrant_depth,
    std::shared_ptr<ReadyQueue> cpu_ready_queue,
    bool exit_on_error = false)
    : keep_graph_(keep_graph),
      grad_mode_(grad_mode),
      owner_(NO_DEVICE),
      reentrant_depth_(reentrant_depth),
      exit_on_error_(exit_on_error),
      cpu_ready_queue_(std::move(cpu_ready_queue)),
      future_result_(std::make_shared<at::ivalue::Future>(c10::ListType::create(c10::TensorType::get()))) {}

目前逻辑如下,生成了 GraphTask 内部的 queue,但是引擎内部的 device_ready_queues_ 还没有生成

代码语言:javascript复制
 ------------------------                                       ----------------------- 
| GraphTask              |                                     | Main Thread           |
|                        |       |-----------------|           |                       |
|     cpu_ready_queue_ ------->  | CPU ReadyQueue  | <-----------  local_ready_queue   |
|                        |        -----------------            |                       |
|                        |                                     |                       |
 ------------------------                                       ----------------------- 


 ------------------------ 
| Engine                 |
|                        |
|                        |
|  device_ready_queues_  |
|                        |
|                        |
 ------------------------ 

3.3 构建根节点

接下来构建根节点。关于构建根节点我们有一个问题:如果在前向计算中,有多个输出怎么办?就是后向传播时候有多个输入根,这时候怎么办?答案如下:

  • 如果只有一个root节点,就跳过创建其他根,直接返回 roots.at(0).function 作为 GraphRoot,就是一个Node节点。
  • 如果有多个root输入根,就构造一个GraphRoot,用它来驱动后向传播。
    • 把这些 root 作为参数构建一个GraphRoot,这个 GraphRoot 作为真正的根节点。
    • root 就是 Node 的边。即,把这些根对应的 edge_list 转换为 Node 里面的 next_edges_,这个GraphRoot可以认为是一个虚拟Root

具体代码如下:

代码语言:javascript复制
  // If we receive a single root, skip creating extra root node
  bool skip_dummy_node = roots.size() == 1;
  auto graph_root = skip_dummy_node ?
    roots.at(0).function : // 单个root,直接使用
    std::make_shared<GraphRoot>(roots, inputs); // 多个root输入根,就构造一个GraphRoot,用它来驱动后向传播

回忆一下 GraphRoot 的定义,大家可以印证一下。

代码语言:javascript复制
using edge_list = std::vector<Edge>;

struct TORCH_API GraphRoot : public Node {
  GraphRoot(edge_list functions, variable_list inputs)
      : Node(std::move(functions)),
      outputs(std::move(inputs)) {
    // Ensures calls to stream() on a GraphRoot instance reflect current stream(s)
    // on devices of root grad tensors at the time the instance is constructed.
    for (const auto& t : outputs) {
      add_input_metadata(t);
    }
  }

  variable_list apply(variable_list&& inputs) override {
    return outputs; // 直接把梯度透传给后续节点
  }

  variable_list outputs; // 这个是梯度
};

3.4 计算最小拓扑

compute_min_topological_nr 的作用是:遍历出边,找到最小拓扑数 min_topo_nr。min_topo_nr 接下来会用在计算依赖函数之中。

代码语言:javascript复制
inline static uint64_t compute_min_topological_nr(const edge_list& outputs) {
  // Computes the mininum topological number among all the outputs
  if (outputs.empty()) {
    return 0;
  }
  auto min_topo_nr = std::numeric_limits<uint64_t>::max();
  for (auto & output_edge : outputs) {
    auto topo_nr = output_edge.function.get()->topological_nr();
    min_topo_nr = (min_topo_nr < topo_nr) ? min_topo_nr : topo_nr;
  }
  return min_topo_nr;
}

topological_nr_ 是 “节点”的拓扑顺序号,表示从该节点到任何叶节点的最长可能路径的长度。如果某个节点是叶节点,即AccumulateGrad,topological_nr_ 将是零。topological_nr_ 用于在autograd发现期间对DAG中的分支进行修剪,维护拓扑 topological_nr_有助于我们在两个节点之间不存在有向路径时,在O(1) 时间完成检查。

topological_nr_ 具有以下属性:

  • 对于G中的每一对节点X,Y,如果存在从X到Y的有向路径,则意味着 topo_nr(X) > topo_nr(Y)。然而,事实并非如此,因此我们无法证明从X到Y的路径的存在性,只能证明不存在。
  • 我们在使用 topological_nr_ 时所做的一个假设是:一旦使用了一个节点,即,它有一个父节点,那么它自己的topological_nr_ 就不会改变。我们在“has_parent_”字段中添加了一些检查来强制执行这一点。

具体大家也可以通过代码中的注释来印证。

代码语言:javascript复制
// NOTE [ Topological Number ]
//
// topological_nr is used to prune branches in the DAG during autograd discovery as
// maintaining topological_nr helps us check in O(1) if there does NOT exist
// a directed path between two nodes.
//
// The topological order number of this `Node` representing the length of the
// longest possible path from this Node to any leaf node. If you are leaf node,
// aka AccumulateGrad, this will be zero. This value has the property that
// For every pair of nodes X, Y in G, existence of a directed path from X to Y
// implies topo_nr(X) > topo_nr(Y). The converse is not true, however, so we
// cannot prove existence of a path from X to Y, only non-existence.
//
// One assumption we make when using topo_nr is that once a node
// has been used, i.e., has a parent node, its own topo_nr does not change
// we have added some checks with the `has_parent_` field to enforce this.
//
// What NOT to do:
//
//   1) 2 -> 1 -> 0               In this diagram we label nodes with their topo_nr.
//      2 -> 1 -> 0               We have two simple graphs that can each arise from
//                                `t.exp().exp()`, for example.
//   2)        2 -> 1 -> 0
//            /
//      2 -> 1 -> 0               We add 2 as a next edge to 1 even though 1 already
//                                has a parent.
//   3)        2 -> 1 -> 0
//            /
//      2 -> 3 -> 0               2 < 3, yet there exists a path from 2 to 3!
//
uint64_t topological_nr() const noexcept {
  has_parent_ = true;
  return topological_nr_;
}

3.5 计算依赖

GraphTask 的 dependencies_ 成员变量 用来判断后续节点是否已经可以被执行,其类型如下:

代码语言:javascript复制
std::unordered_map<Node*, int> dependencies_;

dependencies 的 key 数目就是微分图中Node数目,dependencies 计算的就是每一个Node的入度

dependencies成员在compute_dependencies调用中被初始化,只要某一个grad_fn函数在其他人的 next_edges() 中出现过一次,那么dependencies[this_grad_fn] 就自增1。如果dependencies[this_grad_fn]大于0,说明this_grad_fn有一个后向传播的依赖,即 this_grad_fn 需要等 被依赖者 完成,才能进行自己的反向传播相关计算。

compute_dependencies 就是计算GraphTask的dependencies_。其逻辑是:从 graph_root 开始,对微分图中每个node的依赖进行计算,计算从根节点开始,通过广度优先的算法进行。如果一个grad_fn函数在别人的next_edges()中出现过一次,那么dependencies[grad_fn] 就自增1。具体代码如下:

代码语言:javascript复制
auto Engine::compute_dependencies(Node* root, GraphTask& task, uint64_t min_topo_nr) -> void {
  // Computes the number of dependencies for each function which requires grad
  std::unordered_set<Node*> seen;
  std::vector<Node*> queue { root };

  // Queue contains all nodes that will start propagating gradients.
  // We no longer have to expand functions that don't require grad.
  auto& dependencies = task.dependencies_;
  while (!queue.empty()) {
    auto fn = queue.back(); queue.pop_back();
    if (fn->topological_nr() < min_topo_nr) {
      continue;
    }
    for (const auto& edge : fn->next_edges()) {
      if (auto next_ptr = edge.function.get()) { 
        dependencies[next_ptr]  = 1;
        const bool was_inserted = seen.insert(next_ptr).second;
        if (was_inserted) queue.push_back(next_ptr);
      }
    }
  }
}

比如我们的代码

代码语言:javascript复制
a = torch.tensor(2., requires_grad=True)
b = torch.tensor(6., requires_grad=True)
Q = 3*a**3 - b**2

对应计算图是

得到依赖是:

代码语言:javascript复制
dependencies[PowBackward0] = 1 # 被 MulBackward0 的 next_edges 引用
dependencies[MulBackward0] = 1 # 被 SubBackward0 引用
dependencies[PowBackward0_2] = 1 # 被 SubBackward0 引用

如果某个节点的 dependencies 为0,才能执行。

3.6 初始化GraphTask ExecInfo

如果出边不为空,则会调用 init_to_execute 对GraphTask.exec_info_进行初始化。

代码语言:javascript复制
  if (!outputs.empty()) {
    graph_task->init_to_execute(*graph_root, outputs, accumulate_grad, min_topo_nr);
  }

GraphTask.exec_info_ 的作用就是给 GraphTask 的每一个 Node 配置一个 ExecInfo,就是该 Node 的执行信息。

  • 如果exec_info_为空,说明该task运行在默认模式,即,所有遇到的 next_edges 都需要执行。
  • 如果 exec_info_ 非空,说明只有特定 functions 才会被执行,这些 Functions 的特点是:拥有 entry,并且这个 entry 的 “has needed == True”。

exec_info_ 何时为空?何时非空?

  • 当图被用 .backward() 执行,并且没有传递输入参数,则 exec_info 为空。
  • 如果只是使用用 .grad() 执行,或者使用.backward() 执行时候并且给定输入参数,那么 exec_info_ 非空

所以,exec 和 captured_vars_ 就是针对 grad() 和指定参数的 backward(),就是标注在这种情况下需要计算哪些梯度。在这种情况下,只有某些节点需要执行,从这些节点开始,有一条路径通向 outpus

init_to_execute 的作用是给 exec_info 填充数据,目的是对于应该执行的节点,设置其成员变量exec_info[node].needed_ = true。只有某些特定节点会得到执行,这些节点有一条出边,出边的另一端在“outputs”之中。

其主要算法逻辑为:使用递归思路来填充 exec_info,但是对于实际代码是使用iterative方式进行。在iterative版本中,当你操作当前节点时候,在你所有孩子节点被更新之后,你需要更新你父亲节点。

代码语言:javascript复制
void GraphTask::init_to_execute(Node& graph_root, const edge_list& outputs, bool accumulate_grad, uint64_t min_topo_nr) {
  // Populates exec_info so nodes that should be executed have `exec_info[node].needed_ = true`
  // Only nodes that have a path to any edge in `outputs` should be executed.
  // The code below populates exec_info using recursion, but the actual code does this
  // iteratively. Refer to the numbering to see how the actual code corresponds.
  // A difference to note is that in the iterative version, when you are working with
  // the current Node, you are reponsible to update your parent's is_needed after all your
  // children have been updated.
  //
  // is_needed = {fn: True for fn in outputs}             # (0)
  // seen = {}
  // def compute_is_needed(fn):
  //   for next_edge in fn.next_edges:
  //     child_fn = next_edge.fn
  //     if child_fn in seen and is_needed[child_fn]:     # (1)
  //       is_needed[fn] = true
  //     else:
  //       seen.add(child_fn)
  //       if compute_is_needed(child_fn):
  //         is_needed[fn] = true                         # (2)
  //                                                      # (3) exit for-loop
  //   return is_needed[fn]
  // compute_is_needed(graph_root)
  //
  // NB: you might be wondering why we don't populate `seen` with outputs. We cannot
  // because in the case where two outputs lie on the same path, we still need to explore past
  // the first output or we would miss the nodes that are required to compute the second output.
  int output_idx = 0;
  for (auto & output_edge : outputs) {
    // (0) `is_needed` above corresponds to `exec_info_[fn].needed_`
    Node *output = output_edge.function.get();
    auto & info = exec_info_[output];
    if (accumulate_grad) {
      // if called through `.backward()` we directly set `needed_` for all the outputs to true
      info.needed_ = true;
    } else {
      // otherwise it is `.grad()` and we set exec_info[fn].captures_ instead
      // In terms of populating the rest of exec_info though, you can basically
      // think of this as the same as setting `needed_` is true directly.
      if (!info.captures_) {
        info.captures_ = make_unique<std::vector<ExecInfo::Capture>>();
      }
      // 第 i 个输入对应的输出
      info.captures_->emplace_back(output_edge.input_nr, output_idx  ); 
    }
  }
  captured_vars_.resize(output_idx);

  struct Frame {
    Frame (Node *fn) : fn_(fn), next_next_fn_(0) {}
    Node *fn_;
    size_t next_next_fn_;

    Node* get_next_fn() {
      const auto & next = fn_->next_edges();
      auto num_next = next.size();
      while (next_next_fn_ < num_next) {
        auto fn = next[next_next_fn_  ].function.get();
        if (fn) return fn;
      }
      return nullptr;
    }
  };

  auto nodeShouldExecute = [this](Node *fn) {
    auto it = exec_info_.find(fn);
    return it != exec_info_.end() && it->second.should_execute();
  };

  std::vector<Frame> stack;
  std::unordered_set<Node*> seen;
  stack.emplace_back(&graph_root);
  exec_info_.emplace(stack.back().fn_, ExecInfo()); //有多个exec_info

  while (!stack.empty()) {
    auto &frame = stack.back();
    const auto fn = frame.fn_;

    Node *child_fn = nullptr;
    while((child_fn = frame.get_next_fn()) && !seen.emplace(child_fn).second) {
      // (1) next child exists AND has already been seen
      if (nodeShouldExecute(child_fn)) {
        exec_info_[fn].needed_ = true;
      }
    }

    if (child_fn) {
      // (2) next child exists but has not been seen
      if (child_fn->topological_nr() < min_topo_nr) {
        // child created before the first output means this child cannot have
        // an edge to output
        continue;
      }
      stack.emplace_back(child_fn);
    } else {
      // (3) no next child exists for `fn` means its `needed` has already been
      // finalized. pop stack and update parent
      stack.pop_back();
      if (nodeShouldExecute(fn) && !stack.empty()) {
        exec_info_[stack.back().fn_].needed_ = true;
      }
    }
  }
}

3.7 配置工作线程输入

接下来在主线程中,会配置工作线程的输入,就是构建了 InputMetadata。

代码语言:javascript复制
  // Queue the root
  if (skip_dummy_node) {
    // 如果是单节点,则直接使用 CUDA queue
    InputBuffer input_buffer(roots.at(0).function->num_inputs());
    auto input = inputs.at(0);
    // 构建InputMetadata
    const auto input_stream = InputMetadata(input).stream();
    const auto opt_next_stream = roots.at(0).function->stream(c10::DeviceType::CUDA);
    input_buffer.add(roots.at(0).input_nr,
                      std::move(input),
                      input_stream,
                      opt_next_stream);

    execute_with_graph_task(graph_task, graph_root, std::move(input_buffer));
  } else {
    // 如果是多输入根节点,之前构建了虚拟根节点,后续就对应了 CPU queue
    execute_with_graph_task(graph_task, graph_root, InputBuffer(variable_list()));
  }

3.8 开始运行

接下来会调用 execute_with_graph_task。execute_with_graph_task具体做就是启动后续各种线程,关于 线程 具体我们后续详解。

这里就是知道,根据当前设备来走不同路径,具体逻辑如下:

  • 如果 worker_device == NO_DEVICE,说明这是主线程,则:
    • 获取到相关queue,具体是利用 input_buffer.device() 获得的,这就用到了上节的 InputBuffer。如果获取到的设备是 CPU,就获取 GraphTask.cpu_ready_queue_,如果是 GPU,就用到了对应的GPU设备对应的 queue,具体我们后文详述。
    • 在 queue 之中插入 NodeTask。
    • 调用 thread_main 运行 GraphTask。
  • 否则是可重入反向传播情况下的主线程,则:
    • 利用 graph_task->owner_ = worker_device 指定当前设备是哪个设备,GPU 或者 CPU。
      • 如果已经达到了最大递归深度,则采用add_thread_pool_task 启动 GPU线程 或者 CPU线程。
      • 否则运行 thread_main。

具体代码如下:

代码语言:javascript复制
std::shared_ptr<at::ivalue::Future> Engine::execute_with_graph_task(
    const std::shared_ptr<GraphTask>& graph_task,
    std::shared_ptr<Node> graph_root,
    InputBuffer&& input_buffer) {
  
  initialize_device_threads_pool(); // 启动设备工作线程
  
  // Lock mutex for GraphTask.
  std::unique_lock<std::mutex> lock(graph_task->mutex_);

  // 获取到相关queue,具体是利用 input_buffer.device() 获得的。
  auto queue = ready_queue(graph_task->cpu_ready_queue_, input_buffer.device());

  // worker_device == NO_DEVICE it's a CPU thread and it's trying to drive the
  // autograd engine with corresponding GraphTask, and its NOT a re-entrant call
  if (worker_device == NO_DEVICE) {
    // 如果到了这里,必然是没有活跃工作设备  
    
    // 主线程
      
    // We set the worker_device to CPU_DEVICE only if worker_device was previously
    // NO_DEVICE. Setting it to CPU afterwards allow us to detect whether this is
    // a re-entrant call or not.
    set_device(CPU_DEVICE);

    // set the graph_task owner to the current device
    graph_task->owner_ = worker_device; // 就是 CPU 设备

    // Now that all the non-thread safe fields of the graph_task have been populated,
    // we can enqueue it.
    // 给 queue 之上插入 NodeTask,这样就会唤醒对应线程开始工作
    queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));

    // The owning thread start to drive the engine execution for any CPU task that
    // was just pushed or will be added later from other worker threads
    lock.unlock();
    thread_main(graph_task); // 在主线程运行,这里会在queue之上阻塞
    
    TORCH_INTERNAL_ASSERT(graph_task->future_result_->completed());
    // reset the worker_device after the completion of the graph_task, this is so
    // that the initial state of the engine remains the same across every backward()
    // or grad() call, we don't need to reset local_ready_queue as we could possibly
    // reuse it for new backward calls.
    worker_device = NO_DEVICE;
    
    // 如果到了这里,必然是没有活跃工作设备,就是所有 GraphTask都结束了,如果没有结束,就是reentrant,必须走下面的case
    
    // 主线程  
  } else {
      
    // 重入后向传播状况下的主线程
      
    // If worker_device is any devices (i.e. CPU, CUDA): this is a re-entrant
    //    backward call from that device.
    graph_task->owner_ = worker_device;

    // Now that all the non-thread safe fields of the graph_task have been populated,
    // we can enqueue it.
    // 向 queue 插入第一个NodeTrask,就是 graph_root
    queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));

    if (current_depth >= max_recursion_depth_) {
      // See Note [Reentrant backwards]
      // If reached the max depth, switch to a different thread
      // 达到最大重入深度,这里会启动一个新的线程  
      add_thread_pool_task(graph_task); // 启动GPU或者CPU线程
    } else {
      // Total depth needs to be updated only in this codepath, since it is
      // not used in the block above (when we call add_thread_pool_task).
      // In the codepath above, GraphTask.reentrant_depth_ is used to
      // bootstrap total_depth in the other thread.
        total_depth;

      // Get back to work while we wait for our new graph_task to
      // complete!
        current_depth;
      lock.unlock();
      thread_main(graph_task); // 在主线程运行,这里会在queue之上阻塞
      --current_depth;
      --total_depth;

      // The graph task should have completed and the associated future should
      // be marked completed as well since 'thread_main' above is a call
      // blocking an autograd engine thread.
      TORCH_INTERNAL_ASSERT(graph_task->future_result_->completed());
    }
      
     // 重入后向传播状况下的主线程  
  }
  // graph_task_exec_post_processing is done when the Future is marked as
  // completed in mark_as_completed_and_run_post_processing.
  return graph_task->future_result_;
}

3.9 配置设备和ReadyQueue

上节代码之中,有如下代码进行配置设备。

代码语言:javascript复制
set_device(CPU_DEVICE);
3.9.1 CPU_DEVICE

可以看到,在 set_device 时候,如果不是 CPU_DEVICE,就设置 impl->setDevice

代码语言:javascript复制
void set_device(int device) {
  // NB: We MUST NOT construct the guard for device CPU,
  // as in some settings we compile with cuda, but
  // have lazy stubs for CUDA functionality (so actually
  // attempting to setup a guard(CPU_DEVICE) will cause an
  // error, because it will still query cudaGetDevice).
  //
  // Don't use DeviceGuard here because its destructor may be called before the
  // device is reset. This is fine because the device is thread local.
  if (device != CPU_DEVICE) {
    for (size_t i = 0; i < static_cast<size_t>(c10::DeviceType::COMPILE_TIME_MAX_DEVICE_TYPES); i  ) {
      auto* impl = c10::impl::device_guard_impl_registry[i].load();
      if (impl && device < impl->deviceCount()) {
        impl->setDevice(at::Device(static_cast<c10::DeviceType>(i), device));
      }
    }
  }
  worker_device = device;
}

除了初始化时候调用 set_device(CPU_DEVICE),在Engine::thread_init也会调用,就是启动设备线程时候用到的,设置了设备序列号这个序列号就可以和 ReadyQueue 对应

代码语言:javascript复制
auto Engine::start_device_threads() -> void {
  for (int i = 0; i < num_devices;   i) {
    std::thread t(&Engine::thread_init, this, i, device_ready_queues_[i], true);
    t.detach();
  }
}

void Engine::thread_init(int device, const std::shared_ptr<ReadyQueue>& ready_queue, bool should_increment) {
  ...
	set_device(device);
  ...
}
3.9.2 Ready Queue

上节代码之中,有如下代码获取 queue。

代码语言:javascript复制
// 获取到cpu相关queue,具体是利用 input_buffer.device() 获得的。
auto queue = ready_queue(graph_task->cpu_ready_queue_, input_buffer.device());

我们具体看看是如何获取当前queue的,这是根据本GraphTask的 CPU queue 和 配置的输入device 一起计算得出的,即:

  • 调用 InputBuffer::device() 从输入获取设定的设备,如果设定了,就使用这个设备,否则使用 CPU。
  • 如果是CPU,就使用 cpu_ready_queue,否则使用 device_ready_queues_。

进一步解析如下:

每个GraphTask都有自己的CPU queue,但是 GPU Queues 被所有GraphTask共享。

代码语言:javascript复制
// CPU ready queue is per GraphTask, but CUDA device ready queues are shared across all graph tasks
auto Engine::ready_queue(std::shared_ptr<ReadyQueue> cpu_ready_queue, at::Device device) -> std::shared_ptr<ReadyQueue>{
  if (device.type() == at::kCPU || device.type() == at::DeviceType::Meta) {
    // return the cpu ready queue passed in
    return cpu_ready_queue;
  } else {
    // See Note [Allocating GPUs to autograd threads]
    return device_ready_queues_.at(device.index());
  }
}

InputBuffer::device 是从输入参数中获取设备。这里如果有配置设备,就返回,否则返回 at::kCPU。

代码语言:javascript复制
auto InputBuffer::device() const -> at::Device {
  // Since we pick the first non-CPU tensor, this won't work with
  // mixed device-type operations (e.g., an op that is both CUDA
  // and XLA).  This is *incredibly* unlikely, so we don't worry
  // about it.
  for (auto& var : buffer) {
    if (var.defined()) {
      auto device = var.device();
      if (device.type() != at::kCPU) {
        return device;
      }
    }
  }
  // Only report to the CPU thread if there really were no tensors
  // from other devices.
  return at::kCPU;
}

3.10 主进程等待

在 Engine::execute 最后,通过如下代码,主进程进入了等待状态

代码语言:javascript复制
  // Avoid a refcount bump for the Future, since we check for refcount in
  // DistEngine (see TORCH_INTERNAL_ASSERT(futureGrads.use_count() == 1)
  // in dist_engine.cpp).
  // 主进程进行阻塞等待,等待 graph_task->future_result_。
  auto& fut = graph_task->future_result_;
  fut->wait();
  return fut->value().toTensorVector();

这里使用 std::shared_ptr<at::ivalue::Future> future_result_; 来进行线程间通信。

主线程用wait等待,工作线程用markCompleted来通知主进程结束。

0x04 启动线程

因为线程部分比较复杂,所以我们从启动部分提出来单独分析。

因为大型模型往往梯度数目太多,所以PyTorch 采用了多线程处理。为了应对各种情况,PyTorch 定义了一个线程变量 worker_device。引擎生成的线程都被分配一个 "worker_device",指定它们为哪个设备处理工作。此变量在以下位置初始化:

  • 在CUDA,XLA设备线程的创建时间初始化,因为他们正等待在自己的设备上工作。
  • 在CPU线程的图任务执行之前初始化,因为对于每个后向调用,我们使用调用线程(caller thread)来驱动引擎执行。
代码语言:javascript复制
static constexpr int NO_DEVICE = -2;
static constexpr int CPU_DEVICE = -1;

// Threads spawned by the engine are assigned a 'worker_device' specifying
// what device they process work for. This variable is initialized at:
// 1. thread creation time for CUDA, XLA device threads, as they are
//    spinning threads waiting for works on their device.
// 2. before the graph task execution for CPU threads, as for each
//    backward call we use the caller thread to drive engine execution.
// This is used when handling reentrant backwards calls;
// See Note [Reentrant backwards]
static thread_local int worker_device = NO_DEVICE;

4.1 启动设备线程

4.1.1 调用

execute_with_graph_task 方法之中,使用 initialize_device_threads_pool 启动 start_device_threads。

代码语言:javascript复制
std::shared_ptr<at::ivalue::Future> Engine::execute_with_graph_task(
    const std::shared_ptr<GraphTask>& graph_task,
    std::shared_ptr<Node> graph_root,
    InputBuffer&& input_buffer) {
  
  // 这里首先会启动工作线程
  initialize_device_threads_pool();

这里使用std::call_once来确保在整个进程周期之内,start_device_threads成员函数只被调用了一次,即设备线程只生成一次。

代码语言:javascript复制
void Engine::initialize_device_threads_pool() {
  track_bad_autograd_forks();
  std::call_once(start_device_threads_flag_, &Engine::start_device_threads, this);
}
4.1.2 线程数目

在引擎之中,工作线程的数目是依据设备数量决定的。如果有n个设备,就会启动n个设备工作线程。比如,如果有2个GPU,则启动2个设备工作线程。但是每一个GraphTask都有自己的CPU工作线程(我们接下来马上介绍)。

GPU工作线程对应的 ReadyTask 是 Engine 之中的 成员变量。

代码语言:javascript复制
std::vector<std::shared_ptr<ReadyQueue>> device_ready_queues_;

此时两个GPU工作线程对应的 ready queue index 分别是 0, 1。

device_ready_queues_ 定义在引擎之中,也说明 devices queues 是在所有 GraphTask 之间共享,而CPU queue是每个 GraphTask 独有。设备线程的启动是在 start_device_threads 函数,可以看到其调用了 std::thread 启动了线程,并且用 detach 让其独立运行。

4.1.3 启动设备线程

start_device_threads 用来启动设备线程,设备线程数目与设备数目相关,这样 NUM_DEVICES 个线程在后台一起处理 GraphTask 中的任务。

  • 使用deviceCount得到设备数量 num_devices。
  • 然后根据设备的数量来决定要启动的设备线程数量。
  • 创建多个ReadyQueue,ReadyQueue数目和工作线程数目一样。这些ReadyQueue 在 Engine 的 device_ready_queues_ 之上被管理。device_ready_queues_ 就是管理GPU。
  • 创建设备线程。每个线程都是std::thread,构建时候,把对应的ReadyQueue,就是device_ready_queues_[i] 和 Engine(整个进程生命周期只有一个Engine实例)都传递进去。这样Queue可以依靠Engine对于device_ready_queues_的共享来实现线程间工作对象传输。
  • 作为对比,GraphTask 专门有一个ReadyQueue(cpu_ready_queue_)是用来跑CPU相关工作线程。因为CPU工作线程专门用来处理反向传播的CPU工作。当 GraphTask 在某一个GPU之上的工作结束了,下一个 NodeTask 应该切换到 CPU 之上,所以GraphTask需要记住自己的cpu_ready_queue_ ,从而给 cpu_ready_queue_ 发送消息。 注意,cpu_ready_queue_ 这是 GraphTask 专有的 ReadyQueue。
代码语言:javascript复制
auto Engine::start_device_threads() -> void {
  // See Note [Allocating GPUs to autograd threads]
  // 使用deviceCount得到 设备数量 num_devices。  
  c10::DeviceIndex num_devices = 0;
  for (const auto& impl_atomic : c10::impl::device_guard_impl_registry) {
    auto* impl = impl_atomic.load();
    if (impl) {
      num_devices = std::max(num_devices, impl->deviceCount());
    }
  }

  // allocate one thread for every GPU device (but colocate GPUs of different
  // types), and pre-allocate the device_ready_queues_ to ensure safe reading on it.
  // 创建多个ReadyQueue,ReadyQueue数目和工作线程数目一样
  device_ready_queues_ = std::vector<std::shared_ptr<ReadyQueue>>(num_devices);
  for (auto& queue : device_ready_queues_)    {
    queue.reset(new ReadyQueue());
  }

  thread_pool_shared_ = std::make_shared<ThreadPoolShared>();
  // 创建设备线程
  for (int i = 0; i < num_devices;   i) {
    std::thread t(&Engine::thread_init, this, i, device_ready_queues_[i], true);
    t.detach(); // 让工作线程独立运行
  }
  // Wait for the threads to start
  {
    std::unique_lock<std::mutex> lk(non_reentrant_device_thread_mutex_);
    while(non_reentrant_device_thread_count_.load() != static_cast<uint32_t>(num_devices)) {
      non_reentrant_device_thread_condvar_.wait(lk);
    }
  }
}

设备线程启动之后,都使用 wait 阻塞在自己对应的 ReadyQueue 之中,主线程或者其他worker线程通过对 某一个设备线程的ReadyQueue 采用 push(NodeTask) 操作来唤醒该设备线程进行工作

4.2 线程初始化

线程会调用 thread_init 进行初始化,这里会:

  • 配置线程的设备。
  • 调用 init_local_ready_queue 来初始化局部queue。
  • 调用 thread_main 作为线程主体函数来执行。

初始化本地queue的代码如下:

代码语言:javascript复制
void Engine::init_local_ready_queue(std::shared_ptr<ReadyQueue> ready_queue) {
  if (ready_queue) {
    // if ready_queue provided in the caller, use the caller's ready_queue to initialize local_ready_queue
    local_ready_queue = std::move(ready_queue);
  } else if (!local_ready_queue){
    // otherwise if local_ready_queue not allocated, allocate a new ready_queue
    local_ready_queue = std::make_shared<ReadyQueue>();
  }
}

每个autograd工作线程都会与一个ready queue关联,这个queue就是这个线程的工作流。local_ready_queue 使用 std::shared_ptr 来作为 本地线程指针。

代码语言:javascript复制
// Every autograd worker thread is associated with a ready queue, which specifies
// the stream of work of this thread to do. This shared_ptr is a thread_local
// pointer to each thread's ready_queue, and it should be initialized via the
// Engine::init_local_ready_queue() call in each corresponding thread before execution.
//
// The CUDA, XLA threads are shared among all invocations of backwards via
// device_ready_queues_, while CPU threads are dedicated to processing CPU work for
// the backward they invoked. So any given graph task maintains its own cpu_ready_queue_
// where you should send work for it to be done
//
// For reentrant backward calls, if we spawn new thread from the current thread
// because we reached the maximum depth, the new thread will just reuse the same
// ReadyQueue with the parent thread for performance improvement.
// see Note [Reentrant backwards] for more details.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static thread_local std::shared_ptr<ReadyQueue> local_ready_queue = nullptr;

具体初始化的代码如下:

代码语言:javascript复制
void Engine::thread_init(int device, const std::shared_ptr<ReadyQueue>& ready_queue, bool should_increment) {
  if (should_increment) {
    increment_non_reentrant_thread_count();
  }

  at::init_num_threads();

  // Note [Allocating GPUs to autograd threads]
  // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  // What's our strategy here?  Originally, the autograd engine was written
  // with only CUDA in mind.  We allocate one thread to handle all CPU
  // operations, and a thread per CUDA device.
  //
  // But what if we have OTHER devices?  There are two plausible
  // strategies:
  //
  //  - We can allocate threads equal to max(num_cuda_devices, num_xla_devices,
  //    ...) and colocate cuda device 0 with xla device 0
  //  - We can allocate threads equal to sum(num_cuda_devices, num_xla_devices,
  //    ...) keeping everyone separate.
  //
  // We don't have any good reason to prefer one or the other, so we've
  // arbitrarily picked to colocate devices.  Maybe the other approach is
  // better.
  set_device(device);

  // initialize each device thread's thread local ready queue with the ready queue
  // that is created before the thread initialization
  init_local_ready_queue(ready_queue);

  std::shared_ptr<GraphTask> graph_task = nullptr;
  thread_main(graph_task);
  if (should_increment) {
    // Decrement the count during shutdown if we incremented earlier.
    decrement_non_reentrant_thread_count();
  }
}

目前逻辑如下,生成了一系列工作线程,也生成了device_ready_queues_:

代码语言:javascript复制
 ------------------------                                       ----------------------- 
| GraphTask              |                                     | Main Thread           |
|                        |       |-----------------|           |                       |
|     cpu_ready_queue_ ------->  | CPU ReadyQueue  | <-----------  local_ready_queue   |
|                        |        -----------------            |                       |
|                        |                                     |                       |
 ------------------------                                       ----------------------- 




 ------------------------ 
| Engine                 |     ----------------------           ----------------------- 
|                        |    | Device ReadyQueues   |         | Worker Thread 1       |
|                        |    |                      |         |                       |
|  device_ready_queues_  ---> |                      |         |                       |
|                        |    |     -------------    |         |                       |
|                        |    |    | ReadyQueue 1| <-------------  local_ready_queue   |
 ------------------------     |     -------------    |         |                       |
                              |                      |         |                       |
                              |                      |         |                       |
                              |                      |          ----------------------- 
                              |                      |
                              |          .           |
                              |          .           |
                              |          .           |          ----------------------- 
                              |                      |         | Worker Thread 2       |
                              |     -------------    |         |                       |
                              |    | ReadyQueue 2| <-------------  local_ready_queue   |
                              |     -------------    |         |                       |
                              |                      |         |                       |
                              |                      |          ----------------------- 
                              |     -------------    |
                              |    | ReadyQueue 3|   |
                              |     -------------    |
                              |                      |
                               ---------------------- 

然后会调用 thread_main 进行线程主体,我们下文会分析。

4.3 可重入反向传播

4.3.1 示例

从PyTorch 的测试代码之中可以看到,在反向传播之中也会调用反向传播。

在这种情况下,Engine 之内会存在多个 GraphTask。

代码语言:javascript复制
       class MyFunction(Function):
            @staticmethod
            def forward(ctx, x):
                return x

            @staticmethod
            def backward(ctx, x):
                order.append("MyFunction")
                return x

        class Reentrant(Function):
            @staticmethod
            def forward(ctx, x):
                with torch.enable_grad():
                    ctx.x = Variable(x.detach(), requires_grad=True)
                    ctx.x = ctx.x - 1
                return ctx.x.detach()

            @staticmethod
            def backward(ctx, x):
                order.append("Reentrant")
                if ctx.x < 0:
                    return x
                with torch.enable_grad():
                    Reentrant.apply(ctx.x).backward()
                return x

        a = MyFunction.apply(torch.tensor(6.0, requires_grad=True))
        b = Reentrant.apply(torch.tensor(9.0, requires_grad=True))
        v = a * b
        v.backward()
4.3.2 设计理念

以下是 PyTorch 的设计理念

为了理解可重入向后问题,我们必须注意autograd引擎目前的实现方式的两个方面:

    1. 当您调用 Engine::execute() 时,您希望阻塞直到微分完成,以便可以获得向后传递的最终结果变量。
    1. 引擎运行时,每个工作队列之上有一个工作线程来运行,每个工作队列固定到执行操作的特定设备上。

问题是,假设您在工作线程内部调用 backward()。

根据属性 (1),我们应该阻塞,直到嵌套任务完成。但是,根据属性(2),这个工作线程负责处理分配给它的任务;我们最好不要被阻塞。因为那种情况下,我们所有的反向执行(包括我们刚刚开始的那一次)都会死锁!

所以,我们维护一个等待分配工作的线程池。

当发生可重入向后调用时,当前线程将被阻塞,池中的一个线程将被唤醒,以完成阻塞任务和分配给该辅助线程的任何其他任务。如果没有可用的线程,将生成一个新线程。新线程将继续处理与父工作线程同一个ReadyQueue中的任务。当目前GraphTask完成后,将通知正在等待任务的父工作线程,并且当前线程将被返回给线程池。

4.3.3 实现

当发现是可重入后向传播时,而且超出最大递归深度,Engine::execute_with_graph_task 会调用如下代码向线程池加入一个新线程。

代码语言:javascript复制
    if (current_depth >= max_recursion_depth_) {
      // See Note [Reentrant backwards]
      // If reached the max depth, switch to a different thread
      add_thread_pool_task(graph_task);
    }

相关数据结构如下:

代码语言:javascript复制
  struct ThreadPoolShared {
    // Data structures used by the threads for executing reentrant backwards
    // tasks. See Note [Reentrant backwards]
    // Number of available threads for processing new GraphTasks.
    unsigned int num_workers_;
    // The threads will wait on work_ to be notified of GraphTasks
    std::condition_variable work_;
    // To protect reads and writes to graphtask_queue_ and num_workers_
    // and for synchronizing creating new threads when needed
    std::mutex mutex_;
    // Workers will process the GraphTasks added to this queue. A GraphTask is
    // allocated inside Engine::execute and lives for the duration of execute
    std::queue<std::weak_ptr<GraphTask>> graphtasks_queue_;

    ThreadPoolShared() : num_workers_(0) {}
 };

add_thread_pool_task 代码如下。

  • 这里判断是否 graphtask 队列达到最大值,如果没有达到,则建立一个新线程。
  • 把 graph_task 放入队列 graphtasks_queue_。
  • 新线程的执行函数是 reentrant_thread_init,其会等待在 thread_pool_shared_->work_ 之上。
  • 这里会 thread_pool_shared_->work_.notify_one() 让新线程运行。
代码语言:javascript复制
void Engine::add_thread_pool_task(const std::weak_ptr<GraphTask>& graph_task) {
  std::unique_lock<std::mutex> lck(thread_pool_shared_->mutex_);
  // There may already be some items on the graphtasks_queue_ added by other
  // threads but not enough workers to get to the new task that will be
  // added
  bool create_thread = (thread_pool_shared_->num_workers_ <= thread_pool_shared_->graphtasks_queue_.size());
  thread_pool_shared_->graphtasks_queue_.push(graph_task);
  // Don't need to be holding the lock while actually creating the thread
  lck.unlock();
  if (create_thread) {
    std::thread t(&Engine::reentrant_thread_init, this);
    t.detach();
  }
  // This works even if new thread is created because wait() will test the
  // predicate before waiting
  thread_pool_shared_->work_.notify_one();
}

新线程执行函数 reentrant_thread_init 如下:

  • 与graph_task's 原线程共享 cpu_ready_queue。
  • 其从 graphtasks_queue_ 获取 GraphTask,赋值给 graph_task。
  • 然后用 thread_main(graph_task) 来执行。
代码语言:javascript复制
// Reentrant call will re-use the graph_task's owner thread ready_queue for
// queueing tasks (NOTE: this is not true in the async_mode of the engine).
// While we can create separate ready queue for each new reentrant
// thread, but sharing the same cpu_ready_queue with parent thread is a
// performance improvement and cuda thread still have to do the same thing.
void Engine::reentrant_thread_init() {
  at::init_num_threads();
  auto tp_shared = thread_pool_shared_;
  while(true) {
    std::unique_lock<std::mutex> lk(tp_shared->mutex_);
      thread_pool_shared_->num_workers_;
    tp_shared->work_.wait(lk, [&tp_shared]{ return !tp_shared->graphtasks_queue_.empty();});
    --thread_pool_shared_->num_workers_;
    auto task = tp_shared->graphtasks_queue_.front();
    tp_shared->graphtasks_queue_.pop();
    lk.unlock();
    std::shared_ptr<GraphTask> graph_task;
    if (!(graph_task = task.lock())) {
      continue;
    }
    set_device(graph_task->owner_);
    // set the local_ready_queue to the ready queue on the graph_task->owner_ device
    local_ready_queue = ready_queue_by_index(graph_task->cpu_ready_queue_, graph_task->owner_);
    total_depth = graph_task->reentrant_depth_;
    thread_main(graph_task); // 这里调用了线程函数
  }
}

4.4 主线程

除了上述线程之外,引擎还有一个主线程。这里使用 NO_DEVICE 来标识。如前所示,也会用 CPU_DEVICE 来临时做重入判别,但是依然是主线程。

代码语言:javascript复制
static constexpr int CPU_DEVICE = -1;
static constexpr int NO_DEVICE = -2;

4.5 流程解析

我们通过 execute_with_graph_task 来解析一下线程之间的生成关系。

代码语言:javascript复制
std::shared_ptr<at::ivalue::Future> Engine::execute_with_graph_task(
    const std::shared_ptr<GraphTask>& graph_task,
    std::shared_ptr<Node> graph_root,
    InputBuffer&& input_buffer) {
    
  initialize_device_threads_pool(); // 这里生成了设备线程

  // 这里指定了后续究竟是GPU还是CPU上运行,因为 input_buffer.device() 里面指定了运行的设备,所以依据这个设备,获取到了对应的 queue
  auto queue = ready_queue(graph_task->cpu_ready_queue_, input_buffer.device());

  if (worker_device == NO_DEVICE) { // 判断是否已经运行了反向传播

    // 主线程  
      
    set_device(CPU_DEVICE);
    graph_task->owner_ = worker_device; // set the graph_task owner to the current device
    queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));
 
    thread_main(graph_task); // thread_main 依然是被主线程执行,内部通过 pop 阻塞等待

    // 主线程  
    worker_device = NO_DEVICE;
  } else {
    // 主线程,可重入的反向传播  

    // If worker_device is any devices (i.e. CPU, CUDA): this is a re-entrant
    //    backward call from that device.     
    graph_task->owner_ = worker_device; // 指定是哪个设备,是 GPU 或者 CPU
    queue->push(NodeTask(graph_task, std::move(graph_root), std::move(input_buffer)));

    if (current_depth >= max_recursion_depth_) {
      // 达到最大重入深度,这里会启动一个新的线程  
      add_thread_pool_task(graph_task); // add_thread_pool_task 里面是GPU线程 或者 CPU线程取决于 worker_device
    } else {
        total_depth;
        current_depth;
      thread_main(graph_task); // thread_main 依然是被主线程执行,内部通过 pop 阻塞等待
      --current_depth;
      --total_depth;
    }
     
    // 主线程,可重入的反向传播   
  }

  return graph_task->future_result_;
}

具体线程关系如下:

  1. 主线程使用 push(NodeTask) 往 GraphTask.cpu_ready_queue_ 插入 NodeTask 0。
  2. 主线程使用 thread_main 从 GraphTask.cpu_ready_queue_ 取出 NodeTask 0,假设这个 NodeTask 0 的设备index 是 1。
  3. 主线程使用 thread_main 往 device 1 对应的 ReadyQueue 插入 NodeTask 1。
  4. 设备线程 1 阻塞在 device 1 对应的 ReadyQueue 1,这时候被唤醒,取出 NodeTask 1。
  5. 设备线程 1 处理 NodeTask 1,得到其后续的边,如果这个边的设备是 device 2, 那么生成一个 NodeTask 2,这个NodeTask 2 设备就是 2。然后把 NodeTask 2 插入 ReadyQueue 2。
  6. 设备线程 2 阻塞在 device 2 对应的 ReadyQueue 2,这时候被唤醒,取出 NodeTask 2,继续处理。
代码语言:javascript复制
                                ------------------------- 
                               | GraphTask               |
                               |                         |
                               |        cpu_ready_queue_ |
                               |                         |
                               |            |            |
                                ------------------------- 
                                            |
 ------------------                         |
| Main Thread      |                        v
|                  |      1         -------- --------- 
|                  |push(NodeTask0)|                  |                  ------------------- 
|            --------------------->   CPU ReadyQueue  |                 | Worker Thread 1   |
|                  |               |                  |      4          |                   |
| local_ready_queue|                --- --------------  pop(NodeTask1)  |                   |
|                  |   2 pop()         |                    ------------------>             |
|            --------------------------                    |            |                   |
|           |      |                --------------------   |            |                   |
|           |      |               | Device ReadyQueues |  |            | local_ready_queue |
|           |      |               |                    |  |            |                   |
|           |      |    3          |                    |  |            |                   |
|           |      |push(NodeTask1)|    -------------   |  |            |                   |
|            ------------------------> | ReadyQueue 1 -----             |                   |
|                  |               |    -------------   |      5        |                   |
|                  |               |                    |push(NodeTask2)|                   |
 ------------------                |                    |      ---------------              |
                                   |                    |     |         |                   |
                                   |                    |     |          ------------------- 
                                   |                    |     |
                                   |               -----------           ------------------- 
  ---------------------------      |              |     |               | Worker Thread 2   |
 | Engine                    |     |              |     |               |                   |
 |                           |     |              v     |               |                   |
 |                           |     |    ---------- --   |               |                   |
 |   device_ready_queues_  ----->  |   | ReadyQueue 2 ------------------------>             |
 |                           |     |    -------------   |pop(NodeTask2) |                   |
 |                           |     |                    |    6          | local_ready_queue |
  ---------------------------      |                    |               |                   |
                                   |                    |               |                   |
                                   |    -------------   |               |                   |
                                   |   | ReadyQueue 3|  |               |                   |
                                   |    -------------   |               |                   |
                                   |                    |               |                   |
                                    --------------------                 ------------------- 

手机如下:

0xFF 参考

详解Pytorch中的网络构造

PyTorch的优化器

PyTorch的分布式

PyTorch的Tensor(下)

PyTorch的Tensor(中)

PyTorch的Tensor(上)

PyTorch的动态图(下)

PyTorch的动态图(上)

0 人点赞