Istio源码解析3-Istio中配置与服务下发

2022-11-28 15:56:58 浏览数 (1)

上一篇中我们介绍了Istio中服务发现与配置处理,无论是Istio访问外部服务的配置(serviceentry、workloadentry) 、Istio流量规则(virtualservices、destinationrule等)还是Kubernetes原生的服务,在Istio中都是使用informer进行事件的监听,并使用handler进行相关事件的处理,在各个handler处理结束基本都是使用XDSServer.ConfigUpdate把处理好的配置与服务进行XDS的处理,本篇我们详细介绍下Istio是如何与数据面进行交互并进行配置的分发。

作者:李运田, 中国移动云能力中心软件开发工程师,专注于云原生、Istio、微服务、Spring Cloud 等领域。

01

DiscoveryServer启动

从上面我们看出基本所有的下发都是使用XDSServer,首先我们看下XDSServer的初始化结构

代码语言:javascript复制
// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
type DiscoveryServer struct {
  // Pilot 环境所需的 API 集合
  Env *model.Environment
  // xDS 数据的生成器接口
  ConfigGenerator core.ConfigGenerator
  // 统一接收其他组件发来的 PushRequest 的 channel
  pushChannel chan *model.PushRequest
//WorkloadEntry控制器,serviceEntryController在Server中定义
WorkloadEntryController *workloadentry.Controller
  // pushQueue 主要是在真正 xDS 推送前做防抖缓存
  pushQueue *PushQueue
  // 保存了所有生效的 gRPC 连接
  adsClients      map[string]*Connection
  //防抖参数
  debounceOptions debounceOptions
}

前面的文章中我们也大概介绍了下pilot-discovery的启动,在这里我们通过下图看下pilot-discovery的启动流程以及服务、配置下发过程。

pilot-discovery启动方法Start()中会启动两个协程handleUpdates和sendPushes,这俩协程的启动及作用在上图中也大概有标明,handleUpdates主要用来处理pushChannel中收到的推送请求以及防抖。sendPushes则负责具体的推送。

代码语言:javascript复制
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
  go s.WorkloadEntryController.Run(stopCh)
  go s.handleUpdates(stopCh)
  go s.periodicRefreshMetrics(stopCh)
  go s.sendPushes(stopCh)
}

02

客户端envoy连接及配置处理

在envoy/service/discovery/v3/ads.pb.go中定义了RPC接口

代码语言:javascript复制
var _AggregatedDiscoveryService_serviceDesc = grpc.ServiceDesc{
  Streams: []grpc.StreamDesc{
    {
      StreamName:    "StreamAggregatedResources",
      Handler:       _AggregatedDiscoveryService_StreamAggregatedResources_Handler,
      ServerStreams: true,
      ClientStreams: true,
    },
    {
      StreamName:    "DeltaAggregatedResources",
      Handler:       _AggregatedDiscoveryService_DeltaAggregatedResources_Handler,
      ServerStreams: true,
      ClientStreams: true,
    },
  },
}

StreamAggregatedResources接收DiscoveryRequest返回DiscoveryResponse流,包含全量的xDS数据

DeltaAggregatedResources接收DeltaDiscoveryRequest,返回DeltaDiscoveryResponse流,包含增量的 xDS 数据

这里我们以处理全量的xDS数据为例

代码语言:javascript复制
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
  return s.Stream(stream)
}
func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
  //安全认证
  ids, err := s.authenticate(ctx)
  //建立connection连接
  con := newConnection(peerAddr, stream)
  //每个connection开一个协程接收放入connection channel中的req
  go s.receive(con, ids)
  for {
  //循环处理channel中的req
    select {
    case req, ok := <-con.reqChan: // 这里是envoy->istiod
      if ok {
        if err := s.processRequest(req, con); err != nil {
          return err
        }
      } else {
        // Remote side closed connection or error processing the request.
        return <-con.errorChan
      }
    case pushEv := <-con.pushChannel: // 这里是istiod->envoy
      err := s.pushConnection(con, pushEv)
      pushEv.done()
      if err != nil {
        return err
      }
    }
  }
}

EnvoyXdsServer启动完后,开始接收来自configController、serviceController的配置变化以及K8s原生服务事件,当服务数据的变化和配置数据的变化时,都会创建PushRequest发送至EnvoyXdsServer的pushChannel中。

放入pushChannel中的PushRequest后续会通过handleUpdates进一步的进行处理,handleUpdates最重要的功能就是防抖,避免因过快的推送带来的问题和压力。

防抖处理的主要函数

代码语言:javascript复制
// The debounce helper function is implemented to enable mocking

func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {

push := func(req *model.PushRequest, debouncedEvents int) {

pushFn(req)

updateSent.Add(int64(debouncedEvents))

freeCh <- struct{}{}

}

pushWorker := func() {

eventDelay := time.Since(startDebounce)

quietTime := time.Since(lastConfigUpdateTime)

// it has been too long or quiet enough

//当事件的延迟时间大于等于最大延迟时间或静默时间大于等于最小静默时间,才会执行 push() 方法
if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
      if req != nil {
        pushCounter  
        free = false
        go push(req, debouncedEvents)
        req = nil
        debouncedEvents = 0
      }
    } else {
      timeChan = time.After(opts.debounceAfter - quietTime)
    }
  }
  
  for {
    select {
    case <-freeCh:
      free = true
      pushWorker()
    case r := <-ch:
      // If reason is not set, record it as an unknown reason
      if len(r.Reason) == 0 {
        r.Reason = []model.TriggerReason{model.UnknownTrigger}
      }
      if !opts.enableEDSDebounce && !r.Full {
        // trigger push now, just for EDS
        go func(req *model.PushRequest) {
          pushFn(req)
          updateSent.Inc()
        }(r)
        continue
      }
      lastConfigUpdateTime = time.Now()
      if debouncedEvents == 0 {
        timeChan = time.After(opts.debounceAfter)
        startDebounce = lastConfigUpdateTime
      }
      debouncedEvents  
      //合并连续发生的多个PushRequest
      req = req.Merge(r)
    case <-timeChan:
      if free {
        pushWorker()
      }
    case <-stopCh:
      return
    }
  }
}
经过防抖处理后会调用s.Push进行req的推送
func (s *DiscoveryServer) startPush(req *model.PushRequest) {
  // Push config changes, iterating over connected envoys
  req.Start = time.Now()
  //对连接的所有envoy客户端connection进行req推送
  for _, p := range s.AllClients() {
    s.pushQueue.Enqueue(p, req)
  }
}
s.pushQueue中保存了所有客户端envoy的connection信息以及PushRequest信息,具体的PushQueue结构如下
type PushQueue struct {
  cond *sync.Cond
  // pending stores all connections in the queue. If the same connection is enqueued again,
  // the PushRequest will be merged.
  //pending保存了所有代理gRPC连接的PushRequest ,如果相同连接的PushRequest再次入队,将会被合并
  pending map[*Connection]*model.PushRequest
  // 所有的envoy连接connection
  queue []*Connection
  // processing stores all connections that have been Dequeue(), but not MarkDone().
  // The value stored will be initially be nil, but may be populated if the connection is Enqueue().
  // If model.PushRequest is not nil, it will be Enqueued again once MarkDone has been called.
  processing map[*Connection]*model.PushRequest
  shuttingDown bool
}

到这里就把集群中监听到的Istio CRD配置事件以及K8s的服务事件都入队到PushQueue结构中,后面会调用doSendPushes从PushQueue结构中获取每个connection及req信息进行配置和服务的推送。

03

配置与服务的推送

通过doSendPushes从pushQueue中通过Dequeue()方法获取每个客户端connection和对应的PushRequest,再根据PushRequest生成pushEv传入客户端connection的pushChannel中。

代码语言:javascript复制
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
  for {
    select {
    case <-stopCh:
      return
    default:
      //从pushQueue中通过Dequeue()方法获取每个客户端connection和对应的PushRequest
      client, push, shuttingdown := queue.Dequeue()
      if shuttingdown {
        return
      }
      recordPushTriggers(push.Reason...)
      // Signals that a push is done by reading from the semaphore, allowing another send on it.
      doneFunc := func() {
        queue.MarkDone(client)
        <-semaphore
      }
      proxiesQueueTime.Record(time.Since(push.Start).Seconds())
      var closed <-chan struct{}
      if client.stream != nil {
        closed = client.stream.Context().Done()
      } else {
        closed = client.deltaStream.Context().Done()
      }
      go func() {
      //生成pushEv
        pushEv := &Event{
          pushRequest: push,
          done:        doneFunc,
        }
        select {
        //把pushEv传入每个客户端的channel中
        case client.pushChannel <- pushEv:
          return
        case <-closed: // grpc stream was closed
          doneFunc()
          log.Infof("Client closed connection %v", client.conID)
        }
      }()
    }
  }
}

在前面介绍pilot/pkg/xds/ads.go中StreamAggregatedResources接受每个客户端连接请求时,有如下代码

代码语言:javascript复制
case pushEv := <-con.pushChannel: // 这里是istiod->envoy
      err := s.pushConnection(con, pushEv)
      pushEv.done()
      if err != nil {
        return err
      }

这样把pushEv传入每个客户端的channel中后,会调用此处的pushEv := <-con.pushChannel获取到pushEv 并使用s.pushConnection(con, pushEv)进行配置的推送

代码语言:javascript复制
// Compute and send the new configuration for a connection.
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
  pushRequest := pushEv.pushRequest
  //直接进行全量推送
  if pushRequest.Full {
    // Update Proxy with current information.
    s.updateProxy(con.proxy, pushRequest)
  }
  if !s.ProxyNeedsPush(con.proxy, pushRequest) {
    log.Debugf("Skipping push to %v, no updates required", con.conID)
    if pushRequest.Full {
      // Only report for full versions, incremental pushes do not have a new version.
      reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, nil)
    }
    return nil
  }
  // Send pushes to all generators
  // Each Generator is responsible for determining if the push event requires a push
  wrl, ignoreEvents := con.pushDetails()
  for _, w := range wrl {
    if err := s.pushXds(con, w, pushRequest); err != nil {
      return err
    }
  }
  if pushRequest.Full {
    // Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
    reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, ignoreEvents)
  }
  proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
  return nil
}

04

总结

文中介绍了EnvoyXdsServer的结构以及EnvoyXdsServer的启动流程、怎么与envoy客户端建立连接,当Istio CRD配置、K8s服务事件变化后,怎么监控到事件并把相关配置传到EnvoyXdsServer的channel中,如何进行防抖及推送,最后把事件传到每个客户端的connection中。


CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux Foundation,是非营利性组织。

CNCF(云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。请长按以下二维码进行关注。

0 人点赞