Istio源码解析4-Istio中pilot代理的启动

2022-11-28 16:19:34 浏览数 (1)

上一篇中我们介绍了EnvoyXdsServer的结构以及EnvoyXdsServer的启动流程、怎么与envoy客户端建立连接,当Istio CRD配置、K8s服务事件变化后,怎么监控到事件并把相关配置传到EnvoyXdsServer的channel中,如何进行防抖及推送,最后把事件传到每个客户端的connection中。这里我们介绍下envoy客户端的启动过程以及envoy如何与istiod建立连接。

作者:李运田, 中国移动云能力中心软件开发工程师,专注于云原生、Istio、微服务、Spring Cloud 等领域。

01

Istio-init初始化容器

服务在注入Sidecar的时候,会注入istio-init和istio-proxy两个容器。istio-init用于给Sidecar容器即Envoy代理做初始化,设置iptables端口转发,在Istio 1.1版本时还是使用isito-iptables.sh脚本修改iptables规则,现在是使用命令行来操作iptables,具体命令行可参考tools/istio-iptables/pkg/cmd/root.go代码。

在pilot-agent的代码pilot/cmd/pilot-agent/main.go里会调用tools中的istio-iptables进行iptables规则的设置

代码语言:javascript复制
rootCmd.AddCommand(iptables.GetCommand())
rootCmd.AddCommand(cleaniptables.GetCommand())

我们查看注入Sidecar的服务

代码语言:javascript复制
Containers:
  //istio-proxy镜像,作为服务的代理
  image: xxxx:8086/paas-pcb2/service-mesh/istio/proxyv2:debug
  imagePullPolicy: Always
  name: istio-proxy
  //初始化容器,进行pod中iptables规则的配置
  //与istio-proxy使用同样的镜像
initContainers:
//iptables命令的参数
- args:
  - istio-iptables
  - -p
  - "15001"
  - -z
  - "15006"
  - -u
  - "1337"
  - -m
  - REDIRECT
  - -i
  - '*'
  - -x
  - ""
  - -b
  - '*'
  - -d
  - 15090,15021,15020
  image: xxxx:8086/paas-pcb2/service-mesh/istio/proxyv2:debug
  imagePullPolicy: Always
  name: istio-init

初始化容器处理完iptables规则后即退出,该容器存在的意义就是让Envoy代理可以拦截所有的进出Pod的流量,即将入站流量重定向到Sidecar,再拦截应用容器的出站流量经过Sidecar处理后再出站。通过下面命令我们可以查看下istio-init做了哪些iptables规则的操作。

代码语言:javascript复制
iptables -t nat -S
-P PREROUTING ACCEPT
-P INPUT ACCEPT
-P OUTPUT ACCEPT
-P POSTROUTING ACCEPT
-N ISTIO_INBOUND
-N ISTIO_IN_REDIRECT
-N ISTIO_OUTPUT
-N ISTIO_REDIRECT
-A PREROUTING -p tcp -j ISTIO_INBOUND
-A OUTPUT -p tcp -j ISTIO_OUTPUT
-A ISTIO_INBOUND -p tcp -m tcp --dport 15008 -j RETURN
-A ISTIO_INBOUND -p tcp -m tcp --dport 15090 -j RETURN
-A ISTIO_INBOUND -p tcp -m tcp --dport 15021 -j RETURN
-A ISTIO_INBOUND -p tcp -m tcp --dport 15020 -j RETURN
-A ISTIO_INBOUND -p tcp -j ISTIO_IN_REDIRECT
-A ISTIO_IN_REDIRECT -p tcp -j REDIRECT --to-ports 15006
-A ISTIO_OUTPUT -s 127.0.0.6/32 -o lo -j RETURN
-A ISTIO_OUTPUT ! -d 127.0.0.1/32 -o lo -m owner --uid-owner 1337 -j ISTIO_IN_REDIRECT
-A ISTIO_OUTPUT -o lo -m owner ! --uid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -m owner --uid-owner 1337 -j RETURN
-A ISTIO_OUTPUT ! -d 127.0.0.1/32 -o lo -m owner --gid-owner 1337 -j ISTIO_IN_REDIRECT
-A ISTIO_OUTPUT -o lo -m owner ! --gid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -m owner --gid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -d 127.0.0.1/32 -j RETURN
-A ISTIO_OUTPUT -j ISTIO_REDIRECT
-A ISTIO_REDIRECT -p tcp -j REDIRECT --to-ports 15001

02

Istio-proxy代理启动

通过kubelet看下pilot-agent中的启动命令

代码语言:javascript复制
[root@vm-lyt istio]# kubectl exec -it helloworld-v1-5845f97d6b-qjsm9 -c istio-proxy -n foo -- sh
$ ps -aux
USER PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
istio-p  1  0.0  0.3 756148 26128 ?        Ssl  07:20   0:01 /usr/local/bin/pilot-agent proxy sidecar --domain foo.svc.cluster.local --proxyLogLevel=warning --proxyComponentLogLevel=misc:error --log_output_level=default:info --concurrency 2 --controlPlaneAuthPolicy NONE --dnsRefreshRate 300s --statusPort 15020 --trust-domain=cluster.local --controlPlaneBootstrap=false 
istio-p  18  0.1  0.6 174328 49344 ?        Sl   07:20   0:06 /usr/local/bin/envoy -c etc/istio/proxy/envoy-rev0.json --restart-epoch 0 --drain-time-s 45 --drain-strategy immediate --parent-shutdown-time-s 60 --local-address-ip-version v4 --log-format [Envoy (Epoch 0)] [%Y-%m-%d %T.%e][%t][%l][%n] %v -l warning --component-log-level misc:error --concurrency 2

从上述可看出pilot-agent中包含两个进程pilot-agent和envoy,envoy是真正实现Sidecar机制的进程,实现服务治理策略、路由转发等功能。pilot-agent主要是负责启动istio-proxy,除了启动istio-proxy外,还具有如下功能:生成envoy的Bootstrap配置文件、进行envoy的健康检查、监视证书的变化,通知envoy进程热重启,实现证书的热加载、提供envoy守护功能,当envoy异常退出的时候重启envoy。下面我们看下istio-proxy的启动过程。

首先看下agent的配置信息

代码语言:javascript复制
type Agent struct {
  // 配置envoy中的信息,包括envoy运行文件、envoy启动参数等,具体可通过kubectl get cm istio -n istio-system -oyaml查看istio中的全局配置信息
  proxyConfig *mesh.ProxyConfig
//envoy运行时需要的一些配置参数
  envoyOpts envoy.ProxyConfig
//envoy agent实例
  envoyAgent  *envoy.Agent
//envoy管道,进行错误处理
  envoyWaitCh chan error  
// SDS服务器,用于工作负载的证书申请,envoy向pilot-agent申请证书和私钥,pilot-agent生成私钥和证书后向istiod发送证书签发请求,istiod根据请求中的服务信息为pilot-agent签发证书,将证书返回给pilot-agent,pilot-agent再将证书和私钥返回给envoy用于后面的envoy间的通信认证
  sdsServer *sds.Server
  // 用于SDS证书签证,可以通过文件的形式进行签证
  // 默认使用istiod对工作负载进行签证
  secretCache *cache.SecretManagerClient
  //xdsproxy用于istiod与envoy之间通信的渠道,istiod生成配置后通过conn连接传送给xdsproxy,xdsproxy接收istiod传来的数据后进行判断转发给envoy,envoy对配置信息进行处理
  xdsProxy *XdsProxy
  // 证书监听器,监听证书更新事件然后触发证书更新策略
  // 主要是获取证书然后重新进行签证生成envoy配置下发
  caFileWatcher filewatcher.FileWatcher
}

后面通过wait, err := agent.Run(ctx)进行pilot-agent的启动

代码语言:javascript复制
func (a *Agent) Run(ctx context.Context) (func(), error) {
  if socketExists {
    log.Info("SDS socket found. Istio SDS Server won't be started")
  } else {
    log.Info("SDS socket not found. Starting Istio SDS Server")
    //创建SDS服务器用于envoy证书的申请
    err = a.initSdsServer()
    if err != nil {
      return nil, fmt.Errorf("failed to start SDS server: %v", err)
    }
  }
  //进行一些proxy参数的赋值,包括istiod的ip、pod的ip等
  //核心组件,用于envoy服务发现以及与istiod之间的通信
  a.xdsProxy, err = initXdsProxy(a)
  //获取CA根证书
  rootCAForXDS, err := a.FindRootCAForXDS()
  if err != nil {
    return nil, fmt.Errorf("failed to find root XDS CA: %v", err)
  }
  //添加CA证书的监听机制,进行证书的动态更新
  go a.caFileWatcherHandler(ctx, rootCAForXDS)
  if !a.EnvoyDisabled() {
    //初始化envoy相关配置,包括envoy启动的配置路径、端口、并发数,基本都是和proxyConfig一致
    err = a.initializeEnvoyAgent(ctx)
    go func() {
      defer a.wg.Done()
      if a.cfg.EnableDynamicBootstrap {
        start := time.Now()
        var err error
        select {
        case err = <-a.envoyWaitCh:
        case <-ctx.Done():
          // Early cancellation before envoy started.
          return
        }
        if err != nil {
          log.Errorf("failed to write updated envoy bootstrap: %v", err)
          return
        }
        log.Infof("received server-side bootstrap in %v", time.Since(start))
      }
      //启动envoy
      a.envoyAgent.Run(ctx)
    }()
  } else if a.WaitForSigterm() {
    // wait for SIGTERM and perform graceful shutdown
    a.wg.Add(1)
    go func() {
      defer a.wg.Done()
      <-ctx.Done()
    }()
  }
  return a.wg.Wait, nil
}

在这里介绍下istiod中的安全机制如下图所示

0、在istiod初始化的时候会通过dicovery的maybeCreateCA方法创建istiod的CA根证书,该CA服务器负责为网格中的各个服务签发证书

1、envoy向pilot-agent发起SDS请求,要求获取自己的证书和私钥

2、pilot-agent生成私钥和CSR,向istiod发送证书签发请求

3、istiod根据请求中服务的sa进行身份认证,认证通过后,为其签发证书,将证书返回给pilot-agent

4、pilot-agent将证书和私钥通过SDS接口返回给envoy

5、istiod通过apiserver把自己的CA根证书通过configmap挂载到每个pod中

6、这样当两个envoy通信的时候,可以通过envoy中的私钥和挂载到pilot-agent中istiod的CA根证书进行双向认证

下面介绍下envoy与istio之间进行配置请求与信息相应

代码语言:javascript复制
func _AggregatedDiscoveryService_StreamAggregatedResources_Handler(srv interface{}, stream grpc.ServerStream) error {
  return srv.(AggregatedDiscoveryServiceServer).StreamAggregatedResources(&aggregatedDiscoveryServiceStreamAggregatedResourcesServer{stream})
}
// Every time envoy makes a fresh connection to the agent, we reestablish a new connection to the upstream xds
// This ensures that a new connection between istiod and agent doesn't end up consuming pending messages from envoy
// as the new connection may not go to the same istiod. Vice versa case also applies.
func (p *XdsProxy) StreamAggregatedResources(downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
  proxyLog.Debugf("accepted XDS connection from Envoy, forwarding to upstream XDS server")
  return p.handleStream(downstream)
}
//处理来自envoy的请求
func (p *XdsProxy) handleStream(downstream adsStream) error {
  con := &ProxyConnection{
    conID:           connectionNumber.Inc(),
    upstreamError:   make(chan error, 2), // can be produced by recv and send
    downstreamError: make(chan error, 2), // can be produced by recv and send
    requestsChan:    make(chan *discovery.DiscoveryRequest, 10),
    responsesChan:   make(chan *discovery.DiscoveryResponse, 10),
    stopChan:        make(chan struct{}),
    downstream:      downstream,
  }
  //赋值给xdsproxy
  p.RegisterStream(con)
  defer p.UnregisterStream(con)
  ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
  defer cancel()
  //创建与istiod的连接
  upstreamConn, err := p.buildUpstreamConn(ctx)
  //创建与istiod通信的xds客户端
  xds := discovery.NewAggregatedDiscoveryServiceClient(upstreamConn)
  ctx = metadata.AppendToOutgoingContext(context.Background(), "ClusterID", p.clusterID)
  for k, v := range p.xdsHeaders {
    ctx = metadata.AppendToOutgoingContext(ctx, k, v)
  }
  // We must propagate upstream termination to Envoy. This ensures that we resume the full XDS sequence on new connection
  return p.HandleUpstream(ctx, con, xds)
}
//envoy与istiod之间的通信处理
func (p *XdsProxy) HandleUpstream(ctx context.Context, con *ProxyConnection, xds discovery.AggregatedDiscoveryServiceClient) error {
  upstream, err := xds.StreamAggregatedResources(ctx,
    grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
  
  //处理envoy到istiod的请求信息
  go p.handleUpstreamRequest(con)
  //处理istiod到envoy的返回值
  go p.handleUpstreamResponse(con)
}

最终在envoy中启动了两个协程处理envoy与istiod之间的请求与相应

代码语言:javascript复制
func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
  initialRequestsSent := atomic.NewBool(false)
  go func() {
    for {
      // 接受envoy的数据
      req, err := con.downstream.Recv()
      if err != nil {
        select {
        case con.downstreamError <- err:
        case <-con.stopChan:
        }
        return
      }
      // 发送给istiod
      con.sendRequest(req)
    }
  }()
}
func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
  for {
    select {
    //接受istiod传来的数据
    case resp := <-con.responsesChan:
      // TODO: separate upstream response handling from requests sending, which are both time costly
      proxyLog.Debugf("response for type url %s", resp.TypeUrl)
      metrics.XdsProxyResponses.Increment()
      //根据请求信息,进行相关转发处理
      if h, f := p.handlers[resp.TypeUrl]; f {
        if len(resp.Resources) == 0 {
          // Empty response, nothing to do
          // This assumes internal types are always singleton
          break
        }
        err := h(resp.Resources[0])
        var errorResp *google_rpc.Status
        if err != nil {
          errorResp = &google_rpc.Status{
            Code:    int32(codes.Internal),
            Message: err.Error(),
          }
        }
        // Send ACK/NACK
        con.sendRequest(&discovery.DiscoveryRequest{
          VersionInfo:   resp.VersionInfo,
          TypeUrl:       resp.TypeUrl,
          ResponseNonce: resp.Nonce,
          ErrorDetail:   errorResp,
        })
        continue
      }
      switch resp.TypeUrl {
      case v3.ExtensionConfigurationType:
        if features.WasmRemoteLoadConversion {
          // If Wasm remote load conversion feature is enabled, rewrite and send.
          go p.rewriteAndForward(con, resp)
        } else {
          //把数据发送给envoy
          forwardToEnvoy(con, resp)
        }
      }
    case <-con.stopChan:
      return
    }
  }
}

03

总结

文中介绍了istio-init初始化容器,其作用是对当前pod设置了iptables相关规则,以拦截所有的进出Pod的流量。也介绍了istio-proxy,了解了envoy启动过程以及envoy如何与istiod建立连接。

0 人点赞