Containerd容器管理机制

2023-08-19 09:33:04 浏览数 (1)

简介

containerd 是一个来自 Docker 的高级容器运行时,并实现了 CRI 规范。它是从 Docker 项目中分离出来,之后 containerd 被捐赠给云原生计算基金会(CNCF)为容器社区提供创建新容器解决方案的基础。

当前k8s在创建容器的时候,是通过 containerd 来创建一个容器,containerd 收到请求后,会创建一个叫做 containerd-shim 的进程,让这个进程去操作容器,

为什么需要这样一个进程呢?

容器进程是需要一个父进程来做状态收集、维持 stdin 等 fd 打开等工作的,假如这个父进程就是 containerd,那如果 containerd 挂掉的话,整个宿主机上所有的容器都得退出了,而引入 containerd-shim 这个垫片就可以来规避这个问题了。

然后创建容器需要做一些 namespaces 和 cgroups 的配置,以及挂载 root 文件系统等操作,这些操作其实已经有了标准的规范,那就是 OCI(开放容器标准), runc 就是它的一个参考实现。

所以真正启动容器是通过 containerd-shim 去调用 runc 来启动容器的, runc 启动完容器后本身会直接退出, containerd-shim 则会成为容器进程的父进程, 负责收集容器进程的状态, 上报给 containerd, 并在容器中 pid 为 1 的进程退出后接管容器中的子进程进行清理, 确保不会出现僵尸进程。

源码地址:https://github.com/containerd/containerd

初始化容器处理

创建容器的代码是放在这个方法里面的:

代码语言:javascript复制
// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
   ns, err := namespaces.NamespaceRequired(ctx)
   if err != nil {
      return nil, fmt.Errorf("create namespace: %w", err)
   }

   opts := &options.Options{}
   if r.Options.GetValue() != nil {
      v, err := typeurl.UnmarshalAny(r.Options)
      if err != nil {
         return nil, err
      }
      if v != nil {
         opts = v.(*options.Options)
      }
   }

   var pmounts []process.Mount
   for _, m := range r.Rootfs {
      pmounts = append(pmounts, process.Mount{
         Type: m.Type,
         Source: m.Source,
         Target: m.Target,
         Options: m.Options,
      })
   }

   rootfs := ""
   if len(pmounts) > 0 {
      rootfs = filepath.Join(r.Bundle, "rootfs")
      if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
         return nil, err
      }
   }

   config := &process.CreateConfig{
      ID: r.ID,
      Bundle: r.Bundle,
      Runtime: opts.BinaryName,
      Rootfs: pmounts,
      Terminal: r.Terminal,
      Stdin: r.Stdin,
      Stdout: r.Stdout,
      Stderr: r.Stderr,
      Checkpoint: r.Checkpoint,
      ParentCheckpoint: r.ParentCheckpoint,
      Options: r.Options,
   }

   if err := WriteOptions(r.Bundle, opts); err != nil {
      return nil, err
   }
   // For historical reason, we write opts.BinaryName as well as the entire opts
   if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil {
      return nil, err
   }

   var mounts []mount.Mount
   for _, pm := range pmounts {
      mounts = append(mounts, mount.Mount{
         Type: pm.Type,
         Source: pm.Source,
         Target: pm.Target,
         Options: pm.Options,
      })
   }
   defer func() {
      if retErr != nil {
         if err := mount.UnmountMounts(mounts, rootfs, 0); err != nil {
            logrus.WithError(err).Warn("failed to cleanup rootfs mount")
         }
      }
   }()
   if err := mount.All(mounts, rootfs); err != nil {
      return nil, fmt.Errorf("failed to mount rootfs component: %w", err)
   }

   p, err := newInit(
      ctx,
      r.Bundle,
      filepath.Join(r.Bundle, "work"),
      ns,
      platform,
      config,
      opts,
      rootfs,
   )
   if err != nil {
      return nil, errdefs.ToGRPC(err)
   }
   if err := p.Create(ctx, config); err != nil {
      return nil, errdefs.ToGRPC(err)
   }
   container := &Container{
      ID: r.ID,
      Bundle: r.Bundle,
      process: p,
      processes: make(map[string]process.Process),
      reservedProcess: make(map[string]struct{}),
   }
   pid := p.Pid()
   if pid > 0 {
      var cg interface{}
      if cgroups.Mode() == cgroups.Unified {
         g, err := cgroupsv2.PidGroupPath(pid)
         if err != nil {
            logrus.WithError(err).Errorf("loading cgroup2 for %d", pid)
            return container, nil
         }
         cg, err = cgroupsv2.Load(g)
         if err != nil {
            logrus.WithError(err).Errorf("loading cgroup2 for %d", pid)
         }
      } else {
         cg, err = cgroup1.Load(cgroup1.PidPath(pid))
         if err != nil {
            logrus.WithError(err).Errorf("loading cgroup for %d", pid)
         }
      }
      container.cgroup = cg
   }
   return container, nil
}

上面代码逻辑大致如下:

  • 创建文件系统根目录
  • 挂载磁盘映射
  • 根据指定的runc工具路径初始化runc对象(默认是/run/containerd/runc
  • 初始化容器管理处理器(关联 runc对象,后面对容器的所有操作都是在这个处理器中完成)
  • 通过runc命令在宿主机上创建容器
  • 初始化容器对象(持有容器处理器对象)

创建容器

具体的创建容器方法如下:

代码语言:javascript复制
// Create the process with the provided config
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
   var (
      err error
      socket *runc.Socket
      pio *processIO
      pidFile = newPidFile(p.Bundle)
   )

   if r.Terminal {
      if socket, err = runc.NewTempConsoleSocket(); err != nil {
         return fmt.Errorf("failed to create OCI runtime console socket: %w", err)
      }
      defer socket.Close()
   } else {
      if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
         return fmt.Errorf("failed to create init process I/O: %w", err)
      }
      p.io = pio
   }
   if r.Checkpoint != "" {
      return p.createCheckpointedState(r, pidFile)
   }
   opts := &runc.CreateOpts{
      PidFile: pidFile.Path(),
      NoPivot: p.NoPivotRoot,
      NoNewKeyring: p.NoNewKeyring,
   }
   if p.io != nil {
      opts.IO = p.io.IO()
   }
   if socket != nil {
      opts.ConsoleSocket = socket
   }
   if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
      return p.runtimeError(err, "OCI runtime create failed")
   }
   if r.Stdin != "" {
      if err := p.openStdin(r.Stdin); err != nil {
         return err
      }
   }
   ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
   defer cancel()
   if socket != nil {
      console, err := socket.ReceiveMaster()
      if err != nil {
         return fmt.Errorf("failed to retrieve console master: %w", err)
      }
      console, err = p.Platform.CopyConsole(ctx, console, p.id, r.Stdin, r.Stdout, r.Stderr, &p.wg)
      if err != nil {
         return fmt.Errorf("failed to start console copy: %w", err)
      }
      p.console = console
   } else {
      if err := pio.Copy(ctx, &p.wg); err != nil {
         return fmt.Errorf("failed to start io pipe copy: %w", err)
      }
   }
   pid, err := pidFile.Read()
   if err != nil {
      return fmt.Errorf("failed to retrieve OCI runtime container pid: %w", err)
   }
   p.pid = pid
   return nil
}

上面的代码逻辑大致如下:

  • 创建IO或者socket通信
  • 通过runc工具命令创建容器
  • 获取容器的pid并绑定到容器处理器中

启动容器

通过上面初始化容器我们知道,容器的所有操作管理都是在容器对象里面处理的,比如启动容器:

代码语言:javascript复制
// Start a container process
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Process, error) {
   p, err := c.Process(r.ExecID)
   if err != nil {
      return nil, err
   }
   if err := p.Start(ctx); err != nil {
      return nil, err
   }
   if c.Cgroup() == nil && p.Pid() > 0 {
      var cg interface{}
      if cgroups.Mode() == cgroups.Unified {
         g, err := cgroupsv2.PidGroupPath(p.Pid())
         if err != nil {
            logrus.WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
         }
         cg, err = cgroupsv2.Load(g)
         if err != nil {
            logrus.WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
         }
      } else {
         cg, err = cgroup1.Load(cgroup1.PidPath(p.Pid()))
         if err != nil {
            logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
         }
      }
      c.cgroup = cg
   }
   return p, nil
}

上面的代码逻辑大致如下:

  • 获取容器处理器(处理器是一个接口,容器不同的状态下绑定的实现对象不一样)
  • 通过处理器执行runc命令启动容器

代码语言:javascript复制
func (s *createdState) Start(ctx context.Context) error
{
   if err := s.p.start(ctx); err != nil {
      return err
   }
   return s.transition("running")
}

func (p *Init) start(ctx context.Context) error {
  err := p.runtime.Start(ctx, p.id)
  return p.runtimeError(err, "OCI runtime start failed")
}

// Start will start an already created container
func (r *Runc) Start(context context.Context, id string) error {
  return r.runOrError(r.command(context, "start", id))
}

https://www.qikqiak.com/post/containerd-usage/

https://colstuwjx.github.io/2021/08/源码解读从代码实现层面思考-kubernetes-为什么会弃用对-docker-的支持/

0 人点赞