Envoy请求流程源码解析(二)|请求解析

2022-03-03 10:46:24 浏览数 (1)

前言

Envoy 是一款面向 Service Mesh 的高性能网络代理服务。它与应用程序并行运行,通过以平台无关的方式提供通用功能来抽象网络。当基础架构中的所有服务流量都通过 Envoy 网格时,通过一致的可观测性,很容易地查看问题区域,调整整体性能。

Envoy也是istio的核心组件之一,以 sidecar 的方式与服务运行在一起,对服务的流量进行拦截转发,具有路由,流量控制等等强大特性。本系列文章,我们将不局限于istio,envoy的官方文档,从源码级别切入,分享Envoy启动、流量劫持、http 请求处理流程的进阶应用实例,深度分析Envoy架构。

本篇是Envoy请求流程源码解析的第二篇,主要分享Envoy的outbound方向上篇,包含启动监听和建立连接。注:本文中所讨论的issue和pr基于21年12月。

envoy当中基于libevent进行封装了各种文件,定时器事件等操作,以及dispatch对象的分发,和延迟析构,worker启动,worker listener绑定等部分不在这里作解读,后续有空可以单独再进行分析。跳过envoy当中的事件循环模型,这里以请求触发开始。

outbound方向

filter解析

启动监听

  1. 通过xDS或者静态配置,获得Envoy代理的监听器信息
  2. 如果监听器bind_to_port,则直接调用libevent的接口,绑定监听,回调函数设置为ListenerImpl::listenCallback
代码语言:javascript复制
    void ListenerManagerImpl::addListenerToWorker(Worker& worker,
                                                  absl::optional<uint64_t> overridden_listener,
                                                  ListenerImpl& listener,
                                                  ListenerCompletionCallback completion_callback) {
      if (overridden_listener.has_value()) {
        ENVOY_LOG(debug, "replacing existing listener {}", overridden_listener.value());
        worker.addListener(overridden_listener, listener, [this, completion_callback](bool) -> void {
          server_.dispatcher().post([this, completion_callback]() -> void {
            stats_.listener_create_success_.inc();
            if (completion_callback) {
              completion_callback();
            }
          });
        });
        return;
      }
      worker.addListener(
          overridden_listener, listener, [this, &listener, completion_callback](bool success) -> void {
            // The add listener completion runs on the worker thread. Post back to the main thread to
            // avoid locking.
            server_.dispatcher().post([this, success, &listener, completion_callback]() -> void {
     
     
     
     
    void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options,
                                       bool bind_to_port) {
      setListenSocketOptions(options);
     
      if (bind_to_port) {
        bind(address_provider_->localAddress());
      }
    }
     
     
    ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
                                         Network::ListenerConfig& config)
        : ActiveTcpListener(
              parent,
              parent.dispatcher().createListener(config.listenSocketFactory().getListenSocket(), *this,
                                                 config.bindToPort(), config.tcpBacklogSize()),
              config) {}
     
     
    class ActiveTcpListener : public Network::TcpListenerCallbacks,
                              public ActiveListenerImplBase,
                              public Network::BalancedConnectionHandler,
                              Logger::Loggable<Logger::Id::conn_handler> {
    public:
      ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config);
      ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener,
                        Network::ListenerConfig& config);
      ~ActiveTcpListener() override;
      bool listenerConnectionLimitReached() const {
        // TODO(tonya11en): Delegate enforcement of per-listener connection limits to overload
        // manager.
        return !config_->openConnections().canCreate();
      }
     
      void decNumConnections() {
        ASSERT(num_listener_connections_ > 0);
        --num_listener_connections_;
        config_->openConnections().dec();
      }
     
      // Network::TcpListenerCallbacks
      void onAccept(Network::ConnectionSocketPtr&& socket) override;
      void onReject(RejectCause) override;
     
    listener_.reset(
          // libevent的base                      当前对象方法                   套接字的文件描述符
          evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
      
      if (!listener_) {
        throw CreateListenerException(
            fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
      }
      
      if (!Network::Socket::applyOptions(socket.options(), socket,
                                         envoy::api::v3::core::SocketOption::STATE_LISTENING)) {
        throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}",
                                                  socket.localAddress()->asString()));
      }
      
      evconnlistener_set_error_cb(listener_.get(), errorCallback);
    
关于reuseport
  1. https://github.com/envoyproxy/envoy/issues/4602#issuecomment-544704931
  2. https://github.com/envoyproxy/envoy/issues/8794
  3. https://lwn.net/Articles/542629/
  4. https://tech.flipkart.com/linux-tcp-so-reuseport-usage-and-implementation-6bfbf642885a

多个 server socket 监听相同的端口。每个 server socket 对应一个监听线程。内核 TCP 栈接收到客户端建立连接请求(SYN)时,按 TCP 4 元组(srcIP,srcPort,destIP,destPort) hash 算法,选择一个监听线程,唤醒之。新连接绑定到被唤醒的线程。所以相对于非SO_REUSEPORT, 连接更为平均地分布到线程中(hash 算法不是绝对平均)

envoy当中是支持在listener去设置开启这个特性,但是热重启场景时,对内核版本有一定要求(4.19-rc1)

https://www.envoyproxy.io/docs/envoy/v1.18.3/api-v3/config/listener/v3/listener.proto

验证观察

默认未开启,通过envoyfilter进行开启后,可见15001的端口被开启

代码语言:javascript复制
apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: reuseport
  namespace: testhl
spec:
  workloadSelector:
    labels:
      app: asm-0
  configPatches:
    - applyTo: LISTENER
      match:
        context: SIDECAR_OUTBOUND
        listener:
          portNumber: 15001
          name: "virtualOutbound"
      patch:
        operation: MERGE
        value:
          reuse_port: true

需要重启 POD

而对于没有应用reuseport

大致的平均

关于绝对的链接平衡, 可以试试 Listener 的配置connection_balance_config:exact_balance,不过由于有锁,对高频新连接应该有一定的性能损耗。目前只适用于 TCP 监听器

代码语言:javascript复制
Network::BalancedConnectionHandlerOptRef new_listener;
 
  if (hand_off_restored_destination_connections_ &&
      socket_->addressProvider().localAddressRestored()) {
    // Find a listener associated with the original destination address.
    new_listener =
        listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress());
  }
 
 
if (!rebalanced) {
    Network::BalancedConnectionHandler& target_handler =
        config_->connectionBalancer().pickTargetHandler(*this);
    if (&target_handler != this) {
      target_handler.post(std::move(socket));
      return;
    }
  }
 
  auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
                                                         hand_off_restored_destination_connections);
 
  // Create and run the filters
  config_->filterChainFactory().createListenerFilterChain(*active_socket);
  active_socket->continueFilterChain(true);
 
 
Network::BalancedConnectionHandlerOptRef
ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) {
  // This is a linear operation, may need to add a map<address, listener> to improve performance.
  // However, linear performance might be adequate since the number of listeners is small.
  // We do not return stopped listeners.
  auto listener_it =
      std::find_if(listeners_.begin(), listeners_.end(),
                   [&address](std::pair<Network::Address::InstanceConstSharedPtr,
                                        ConnectionHandlerImpl::ActiveListenerDetails>& p) {
                     return p.second.tcpListener().has_value() &&
                            p.second.listener_->listener() != nullptr &&
                            p.first->type() == Network::Address::Type::Ip && *(p.first) == address;
                   });
 
  // If there is exact address match, return the corresponding listener.
  if (listener_it != listeners_.end()) {
    return Network::BalancedConnectionHandlerOptRef(
        listener_it->second.tcpListener().value().get());
  }
 
  // Otherwise, we need to look for the wild card match, i.e., 0.0.0.0:[address_port].
  // We do not return stopped listeners.
  // TODO(wattli): consolidate with previous search for more efficiency.
  if (Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.listener_wildcard_match_ip_family")) {
    listener_it =
        std::find_if(listeners_.begin(), listeners_.end(),
                     [&address](const std::pair<Network::Address::InstanceConstSharedPtr,
                                                ConnectionHandlerImpl::ActiveListenerDetails>& p) {

建立连接

  1. DispatcherImpl通过libevent,接收到请求,调用ListenerImpl::listenCallback
  2. client向envoy发起连接,envoy的worker接收eventloop的callback, 触发 Envoy::Network::ListenerImpl::listenCallback(port: 15001)
  3. 15001的useOriginalDst": true,accept_filters_中会带有OriginalDstFilter
  4. OriginalDstFilter.OnAccept中用os_syscalls.getsockopt(fd, SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len)获取在iptables修改之前dst ip iptables与getsockopt
代码语言:javascript复制
  Network::Address::InstanceConstSharedPtr OriginalDstFilter::getOriginalDst(Network::Socket& sock) {
    return Network::Utility::getOriginalDst(sock);
  }
   
     
   
  sockaddr_storage orig_addr;
    memset(&orig_addr, 0, sizeof(orig_addr));
    socklen_t addr_len = sizeof(sockaddr_storage);
    int status;
   
    if (*ipVersion == Address::IpVersion::v4) {
      status = sock.getSocketOption(SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len).rc_;
    } else {
      status = sock.getSocketOption(SOL_IPV6, IP6T_SO_ORIGINAL_DST, &orig_addr, &addr_len).rc_;
    }
   
    if (status != 0) {
      return nullptr;
    }
   
    return Address::addressFromSockAddr(orig_addr, 0, true /* default for v6 constructor */);
  1. 在newconnection当中,还会通过 getBalancedHandlerByAddress寻找到实际的虚拟listener
代码语言:javascript复制
    void ActiveTcpSocket::newConnection() {
      connected_ = true;
     
      // Check if the socket may need to be redirected to another listener.
      Network::BalancedConnectionHandlerOptRef new_listener;
     
      if (hand_off_restored_destination_connections_ &&
          socket_->addressProvider().localAddressRestored()) {
        // Find a listener associated with the original destination address.
        new_listener =
            listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress());
      }
    
  1. 通过ConnectionHandlerImpl::findActiveListenerByTag
代码语言:javascript复制
  Network::BalancedConnectionHandlerOptRef
  ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) {
    // This is a linear operation, may need to add a map<address, listener> to improve performance.
    // However, linear performance might be adequate since the number of listeners is small.
    // We do not return stopped listeners.
    auto listener_it =
        std::find_if(listeners_.begin(), listeners_.end(),
                     [&address](std::pair<Network::Address::InstanceConstSharedPtr,
                                          ConnectionHandlerImpl::ActiveListenerDetails>& p) {
                       return p.second.tcpListener().has_value() &&
                              p.second.listener_->listener() != nullptr &&
                              p.first->type() == Network::Address::Type::Ip && *(p.first) == address;
                     });
   
    // If there is exact address match, return the corresponding listener.
    if (listener_it != listeners_.end()) {
      return Network::BalancedConnectionHandlerOptRef(
          listener_it->second.tcpListener().value().get());
    }
    

查到addr对应的Listener

  • 先查找Listener.IP==addr.ip && Listener.Port==addr.port的Listener
  • 再查找Listener.IP==0.0.0.0 && Listener.Port==addr.port的Listener (对于tcp服务,ip会有值,对于http服务,ip为4个0)
  1. dispatcher.createServerConnection传入accept到的fd 创建Server连接对象ConnectionImpl, 并把onFileEvent注册到eventloop,等待读写事件的到来,因为socket是由一个non-blocking listening socket创建而来,所以也是non-blocking
  2. 且注册的触发方式为epoll的边缘触发
代码语言:javascript复制
    auto server_conn_ptr = parent_.dispatcher().createServerConnection(
          std::move(socket), std::move(transport_socket), *stream_info);
     
    Network::ServerConnectionPtr
    DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
                                           Network::TransportSocketPtr&& transport_socket,
                                           StreamInfo::StreamInfo& stream_info) {
      ASSERT(isThreadSafe());
      return std::make_unique<Network::ServerConnectionImpl>(
          *this, std::move(socket), std::move(transport_socket), stream_info, true);
    }
     
    class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection {
    public:
      ServerConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
                           TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info,
                           bool connected);
     
      // ServerConnection impl
      void setTransportSocketConnectTimeout(std::chrono::milliseconds timeout) override;
      void raiseEvent(ConnectionEvent event) override;
     
    private:
      void onTransportSocketConnectTimeout();
     
      bool transport_connect_pending_{true};
      // Implements a timeout for the transport socket signaling connection. The timer is enabled by a
      // call to setTransportSocketConnectTimeout and is reset when the connection is established.
      Event::TimerPtr transport_socket_connect_timer_;
    };
     
     
    Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;
     
      // We never ask for both early close and read at the same time. If we are reading, we want to
      // consume all available data.
      socket_->ioHandle().initializeFileEvent(
          dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
          Event::FileReadyType::Read | Event::FileReadyType::Write);
     
      transport_socket_->setTransportSocketCallbacks(*this);
     
     
    constexpr FileTriggerType determinePlatformPreferredEventType() {
    #if defined(WIN32) || defined(FORCE_LEVEL_EVENTS)
      return FileTriggerType::EmulatedEdge;
    #else
      return FileTriggerType::Edge;
    #endif
    }
     
    static constexpr FileTriggerType PlatformDefaultTriggerType = determinePlatformPreferredEventType();
  1. http的listener里filters为envoy.http_connection_managerbuildFilterChain里会把HTTP::ConnectionManagerImpl加入到upstream_filters_(list)中,这样在请求数据到达的时候,就可以使用http_connection_manager的on_read方法
代码语言:javascript复制
     void FilterManagerImpl::addReadFilter(ReadFilterSharedPtr filter) {
       ASSERT(connection_.state() == Connection::State::Open);
       ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter});
       filter->initializeReadFilterCallbacks(*new_filter);
       LinkedList::moveIntoListBack(std::move(new_filter), upstream_filters_);
     }
      
      
     CodecClient::CodecClient(Type type, Network::ClientConnectionPtr&& connection,
                              Upstream::HostDescriptionConstSharedPtr host,
                              Event::Dispatcher& dispatcher)
         : type_(type), host_(host), connection_(std::move(connection)),
           idle_timeout_(host_->cluster().idleTimeout()) {
       if (type_ != Type::HTTP3) {
         // Make sure upstream connections process data and then the FIN, rather than processing
         // TCP disconnects immediately. (see https://github.com/envoyproxy/envoy/issues/1679 for
         // details)
         connection_->detectEarlyCloseWhenReadDisabled(false);
       }
       connection_->addConnectionCallbacks(*this);
       connection_->addReadFilter(Network::ReadFilterSharedPtr{new CodecReadFilter(*this)});
      
      
      connection_->noDelay(true);
     
  1. 当连接刚刚加入eventloop的时候, Write Event会被立即触发,但因为write_buffer_没有数据,所以不会写入任何数据
代码语言:javascript复制
   void CodecClient::onEvent(Network::ConnectionEvent event) {
     if (event == Network::ConnectionEvent::Connected) {
       ENVOY_CONN_LOG(debug, "connected", *connection_);
       connection_->streamInfo().setDownstreamSslConnection(connection_->ssl());
       connected_ = true;
     }
    
     if (event == Network::ConnectionEvent::RemoteClose) {
       remote_closed_ = true;
     }
    
     // HTTP/1 can signal end of response by disconnecting. We need to handle that case.
     if (type_ == Type::HTTP1 && event == Network::ConnectionEvent::RemoteClose &&
         !active_requests_.empty()) {
       Buffer::OwnedImpl empty;
       onData(empty);
     }
    
     if (event == Network::ConnectionEvent::RemoteClose ||
         event == Network::ConnectionEvent::LocalClose) {
       ENVOY_CONN_LOG(debug, "disconnect. resetting {} pending requests", *connection_,
                      active_requests_.size());
       disableIdleTimer();
       idle_timer_.reset();
       StreamResetReason reason = StreamResetReason::ConnectionFailure;
       if (connected_) {
         reason = StreamResetReason::ConnectionTermination;
         if (protocol_error_) {
           if (Runtime::runtimeFeatureEnabled(
                   "envoy.reloadable_features.return_502_for_upstream_protocol_errors")) {
             reason = StreamResetReason::ProtocolError;
             connection_->streamInfo().setResponseFlag(
                 StreamInfo::ResponseFlag::UpstreamProtocolError);
           }
         }
       }
       while (!active_requests_.empty()) {
         // Fake resetting all active streams so that reset() callbacks get invoked.
         active_requests_.front()->encoder_->getStream().resetStream(reason);
       }
     }
   }

相关阅读

Envoy请求流程源码解析(一)|流量劫持

0 人点赞