Envoy Request Flow Source Code Analysis (Part 3) | Request Parsing

2022-04-14 16:08:09

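Preface

Envoy is a high-performance network proxy designed for service meshes. It runs alongside the application and abstracts the network by providing common functionality in a platform-agnostic way. When all service traffic in an infrastructure flows through an Envoy mesh, consistent observability makes it easy to locate problem areas and tune overall performance.

Envoy is also one of the core components of Istio. It runs as a sidecar next to each service, intercepting and forwarding the service's traffic, and offers powerful features such as routing and traffic control. In this series we go beyond the official Istio and Envoy documentation and work from the source code, covering Envoy startup, traffic interception, and the HTTP request processing flow, to analyze the Envoy architecture in depth.

This is the third article in the Envoy request flow source code analysis series. It covers the second half of the outbound direction: receiving the request, sending the request, receiving the response, and returning the response. Note: the issues and PRs discussed here reflect their state as of December 2021.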

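Outbound direction

Receiving the request

  1. The client starts writing request data to the socket.
  2. After the event loop fires a read event, transport_socket_.doRead reads in a loop, appending to read_buffer_, until the read returns EAGAIN.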
   void ConnectionImpl::onReadReady() {
     ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this, dispatch_buffered_data_);
     const bool latched_dispatch_buffered_data = dispatch_buffered_data_;
     dispatch_buffered_data_ = false;
    
     ASSERT(!connecting_);
    
     // We get here while read disabled in two ways.
     // 1) There was a call to setTransportSocketIsReadable(), for example if a raw buffer socket ceded
     //    due to shouldDrainReadBuffer(). In this case we defer the event until the socket is read
     //    enabled.
     // 2) The consumer of connection data called readDisable(true), and instead of reading from the
     //    socket we simply need to dispatch already read data.
     if (read_disable_count_ != 0) {
       // Do not clear transport_wants_read_ when returning early; the early return skips the transport
       // socket doRead call.
       if (latched_dispatch_buffered_data && filterChainWantsData()) {
         onRead(read_buffer_->length());
       }
       return;
     }
    
     // Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
     // the transport socket read resumption happens as requested; onReadReady() returns early without
     // reading from the transport if the read buffer is above high watermark at the start of the
     // method.
     transport_wants_read_ = false;
     IoResult result = transport_socket_->doRead(*read_buffer_);
     uint64_t new_buffer_size = read_buffer_->length();
     updateReadBufferStats(result.bytes_processed_, new_buffer_size);
    
     // If this connection doesn't have half-close semantics, translate end_stream into
     // a connection close.
     if ((!enable_half_close_ && result.end_stream_read_)) {
       result.end_stream_read_ = false;
       result.action_ = PostIoAction::Close;
     }
    
     read_end_stream_ |= result.end_stream_read_;
     if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
         (latched_dispatch_buffered_data && read_buffer_->length() > 0)) {
       // Skip onRead if no bytes were processed unless we explicitly want to force onRead for
       // buffered data. For instance, skip onRead if the connection was closed without producing
       // more data.
       onRead(new_buffer_size);
     }
    
     // The read callback may have already closed the connection.
     if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
       ENVOY_CONN_LOG(debug, "remote close", *this);
       closeSocket(ConnectionEvent::RemoteClose);
     }
   }
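
As a rough illustration of the read-until-EAGAIN loop from step 2, here is a minimal sketch on a plain non-blocking POSIX descriptor. DrainResult, setNonBlocking and drainUntilEagain are hypothetical names chosen for this example, not Envoy APIs. Envoy's transport socket also deals with concerns this sketch ignores, such as TLS and read buffer watermarks.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>

#include <fcntl.h>
#include <unistd.h>

// Result of one drain pass, loosely modelled on the IoResult used above.
struct DrainResult {
  size_t bytes_read = 0;   // total bytes appended to the buffer
  bool end_stream = false; // true if the peer closed its write side (EOF)
  bool error = false;      // true on any error other than EAGAIN/EINTR
};

// Put a descriptor into non-blocking mode so read() fails with EAGAIN
// instead of blocking when no data is available.
inline bool setNonBlocking(int fd) {
  const int flags = fcntl(fd, F_GETFL, 0);
  return flags != -1 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}

// Read repeatedly from fd into read_buffer until the kernel has nothing left.
inline DrainResult drainUntilEagain(int fd, std::string& read_buffer) {
  DrainResult result;
  char chunk[4096];
  for (;;) {
    const ssize_t rc = read(fd, chunk, sizeof(chunk));
    if (rc > 0) {
      read_buffer.append(chunk, static_cast<size_t>(rc));
      result.bytes_read += static_cast<size_t>(rc);
    } else if (rc == 0) {
      result.end_stream = true; // EOF: the peer closed the connection
      return result;
    } else if (errno == EINTR) {
      continue; // interrupted by a signal, retry the read
    } else {
      // EAGAIN/EWOULDBLOCK means "drained for now"; anything else is an error.
      result.error = !(errno == EAGAIN || errno == EWOULDBLOCK);
      return result;
    }
  }
}
```

A caller would invoke drainUntilEagain from its read-event handler and then hand the accumulated buffer to the next layer, much as onReadReady calls onRead after doRead returns.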
  3. The buffer is passed to Envoy::Http::ConnectionManagerImpl::onData for HTTP request processing.
    Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
      if (!codec_) {
        // Http3 codec should have been instantiated by now.
        createCodec(data);
      }
     
      bool redispatch;
      do {
        redispatch = false;
     
        const Status status = codec_->dispatch(data);
     
        if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
          handleCodecError(status.message());
          return Network::FilterStatus::StopIteration;
        } else if (isCodecProtocolError(status)) {
          stats_.named_.downstream_cx_protocol_error_.inc();
          handleCodecError(status.message());
          return Network::FilterStatus::StopIteration;
        }
        ASSERT(status.ok());
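  4. When codec_type is AUTO (auto-detection covers HTTP/1 and HTTP/2; HTTP/3 is not yet supported here and is planned), Envoy checks whether the request begins with PRI * HTTP/2 to decide whether it is HTTP/2.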
    Http::ServerConnectionPtr
    HttpConnectionManagerConfig::createCodec(Network::Connection& connection,
                                             const Buffer::Instance& data,
                                             Http::ServerConnectionCallbacks& callbacks) {
      switch (codec_type_) {
      case CodecType::HTTP1: {
        return std::make_unique<Http::Http1::ServerConnectionImpl>(
            connection, Http::Http1::CodecStats::atomicGet(http1_codec_stats_, context_.scope()),
            callbacks, http1_settings_, maxRequestHeadersKb(), maxRequestHeadersCount(),
            headersWithUnderscoresAction());
      }
      case CodecType::HTTP2: {
        return std::make_unique<Http::Http2::ServerConnectionImpl>(
            connection, callbacks,
            Http::Http2::CodecStats::atomicGet(http2_codec_stats_, context_.scope()),
            context_.api().randomGenerator(), http2_options_, maxRequestHeadersKb(),
            maxRequestHeadersCount(), headersWithUnderscoresAction());
      }
      case CodecType::HTTP3:
    #ifdef ENVOY_ENABLE_QUIC
        return std::make_unique<Quic::QuicHttpServerConnectionImpl>(
            dynamic_cast<Quic::EnvoyQuicServerSession&>(connection), callbacks,
            Http::Http3::CodecStats::atomicGet(http3_codec_stats_, context_.scope()), http3_options_,
            maxRequestHeadersKb(), headersWithUnderscoresAction());
    #else
        // Should be blocked by configuration checking at an earlier point.
        NOT_REACHED_GCOVR_EXCL_LINE;
    #endif
      case CodecType::AUTO:
        return Http::ConnectionManagerUtility::autoCreateCodec(
            connection, data, callbacks, context_.scope(), context_.api().randomGenerator(),
            http1_codec_stats_, http2_codec_stats_, http1_settings_, http2_options_,
            maxRequestHeadersKb(), maxRequestHeadersCount(), headersWithUnderscoresAction());
      }
      NOT_REACHED_GCOVR_EXCL_LINE;
    }
     
     
    std::string ConnectionManagerUtility::determineNextProtocol(Network::Connection& connection,
                                                                const Buffer::Instance& data) {
      if (!connection.nextProtocol().empty()) {
        return connection.nextProtocol();
      }
     
      // See if the data we have so far shows the HTTP/2 prefix. We ignore the case where someone sends
      // us the first few bytes of the HTTP/2 prefix since in all public cases we use SSL/ALPN. For
      // internal cases this should practically never happen.
      if (data.startsWith(Http2::CLIENT_MAGIC_PREFIX)) {
        return Utility::AlpnNames::get().Http2;
      }
     
      return "";
    }
     
     
    const std::string CLIENT_MAGIC_PREFIX = "PRI * HTTP/2";
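
The detection order above (negotiated ALPN first, then the HTTP/2 client preface) can be sketched independently of Envoy's types. In this sketch, determineNextProtocolSketch and kClientMagicPrefix are hypothetical names, alpn_protocol stands in for connection.nextProtocol(), and "h2" is assumed to be the value behind Utility::AlpnNames::get().Http2 (it is the registered ALPN identifier for HTTP/2 over TLS).

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Same literal as the CLIENT_MAGIC_PREFIX shown above.
constexpr std::string_view kClientMagicPrefix = "PRI * HTTP/2";

// Returns the protocol name to use, or "" if it cannot be determined yet.
// An empty result leaves the caller free to fall back to HTTP/1.
inline std::string determineNextProtocolSketch(std::string_view alpn_protocol,
                                               std::string_view first_bytes) {
  // 1) A protocol negotiated during the TLS handshake always wins.
  if (!alpn_protocol.empty()) {
    return std::string(alpn_protocol);
  }
  // 2) Otherwise look for the HTTP/2 client preface at the start of the data.
  //    A partially received prefix is deliberately not treated as a match.
  if (first_bytes.substr(0, kClientMagicPrefix.size()) == kClientMagicPrefix) {
    return "h2";
  }
  return "";
}
```

Note that a plaintext client that has only sent the first few bytes of the preface is classified as "not HTTP/2", which mirrors the trade-off described in the source comment above.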
  5. http_parser drives HTTP parsing through callbacks; ConnectionImpl::settings_ is statically initialized with the callbacks for each parsing stage.
    http_parser_settings ConnectionImpl::settings_{
        [](http_parser* parser) -> int {
          static_cast<ConnectionImpl*>(parser->data)->onMessageBeginBase();
          return 0;
        },
        [](http_parser* parser, const char* at, size_t length) -> int {
          static_cast<ConnectionImpl*>(parser->data)->onUrl(at, length);
          return 0;
        },
        nullptr, // on_status
        [](http_parser* parser, const char* at, size_t length) -> int {
          static_cast<ConnectionImpl*>(parser->data)->onHeaderField(at, length);
          return 0;
        },
        [](http_parser* parser, const char* at, size_t length) -> int {
          static_cast<ConnectionImpl*>(parser->data)->onHeaderValue(at, length);
          return 0;
        },
        [](http_parser* parser) -> int {
          return static_cast<ConnectionImpl*>(parser->data)->onHeadersCompleteBase();
        },
        [](http_parser* parser, const char* at, size_t length) -> int {
          static_cast<ConnectionImpl*>(parser->data)->onBody(at, length);
          return 0;
        },
        [](http_parser* parser) -> int {
          static_cast<ConnectionImpl*>(parser->data)->onMessageCompleteBase();
          return 0;
        },
        nullptr, // on_chunk_header
        nullptr  // on_chunk_complete
    };
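
To make the callback-table pattern concrete, the following is a toy sketch of the same idea: a settings struct of plain function pointers, captureless lambdas that recover the owning object through an opaque data pointer, and a parser that fires the callbacks stage by stage. ToyParser, ToySettings, ToyConnection and toyExecute are all hypothetical and far simpler than http_parser; this is not a conforming HTTP parser.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

struct ToyParser {
  void* data = nullptr; // opaque pointer back to the owning connection object
};

// Callback table in the spirit of http_parser_settings: no captures allowed.
struct ToySettings {
  int (*on_message_begin)(ToyParser*);
  int (*on_url)(ToyParser*, const char* at, size_t length);
  int (*on_header_field)(ToyParser*, const char* at, size_t length);
  int (*on_header_value)(ToyParser*, const char* at, size_t length);
  int (*on_headers_complete)(ToyParser*);
};

// Parses one complete request head: "METHOD SP URL SP VERSION CRLF headers CRLF".
// Returns true on success; any non-zero callback result aborts parsing.
inline bool toyExecute(ToyParser& p, const ToySettings& s, std::string_view in) {
  if (s.on_message_begin(&p) != 0) return false;
  const size_t line_end = in.find("\r\n");
  if (line_end == std::string_view::npos) return false;
  const std::string_view request_line = in.substr(0, line_end);
  const size_t sp1 = request_line.find(' ');
  const size_t sp2 = request_line.rfind(' ');
  if (sp1 == std::string_view::npos || sp2 <= sp1) return false;
  const std::string_view url = request_line.substr(sp1 + 1, sp2 - sp1 - 1);
  if (s.on_url(&p, url.data(), url.size()) != 0) return false;

  size_t pos = line_end + 2;
  for (;;) {
    const size_t eol = in.find("\r\n", pos);
    if (eol == std::string_view::npos) return false;
    if (eol == pos) break; // blank line terminates the header block
    const std::string_view line = in.substr(pos, eol - pos);
    const size_t colon = line.find(':');
    if (colon == std::string_view::npos) return false;
    const std::string_view name = line.substr(0, colon);
    std::string_view value = line.substr(colon + 1);
    while (!value.empty() && value.front() == ' ') value.remove_prefix(1);
    if (s.on_header_field(&p, name.data(), name.size()) != 0) return false;
    if (s.on_header_value(&p, value.data(), value.size()) != 0) return false;
    pos = eol + 2;
  }
  return s.on_headers_complete(&p) == 0;
}

// The "connection" that owns the parsed state.
struct ToyConnection {
  std::string url;
  std::vector<std::pair<std::string, std::string>> headers;
  bool headers_complete = false;
};

// Statically initialized table; each lambda casts parser->data back to the owner.
const ToySettings kToySettings{
    [](ToyParser*) -> int { return 0; },
    [](ToyParser* p, const char* at, size_t n) -> int {
      static_cast<ToyConnection*>(p->data)->url.assign(at, n);
      return 0;
    },
    [](ToyParser* p, const char* at, size_t n) -> int {
      static_cast<ToyConnection*>(p->data)->headers.emplace_back(std::string(at, n), "");
      return 0;
    },
    [](ToyParser* p, const char* at, size_t n) -> int {
      static_cast<ToyConnection*>(p->data)->headers.back().second.assign(at, n);
      return 0;
    },
    [](ToyParser* p) -> int {
      static_cast<ToyConnection*>(p->data)->headers_complete = true;
      return 0;
    },
};
```

Because the table holds only function pointers, one static instance can be shared by every connection; per-connection state is reached solely through the data pointer, which is why the Envoy lambdas above all begin with a static_cast of parser->data.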

The Envoy community has discussed replacing the protocol parser http_parser with llhttp:

  • https://github.com/envoyproxy/envoy/issues/5155
  • https://github.com/envoyproxy/envoy/pull/15263/files refactors the HTTP parser behind a parser interface
  • https://github.com/envoyproxy/envoy/pull/15814 adds an llhttp parser implementation; not yet merged at the time of writing
    if (pos != absl::string_view::npos) {
          // Include \r or \n
          new_data = new_data.substr(0, pos + 1);
          ssize_t rc = http_parser_execute(&parser_, &settings_, new_data.data(), new_data.length());
          ENVOY_LOG(trace, "http inspector: http_parser parsed {} chars, error code: {}", rc,
                    HTTP_PARSER_ERRNO(&parser_));
     
          // Errors in parsing HTTP.
          if (HTTP_PARSER_ERRNO(&parser_) != HPE_OK && HTTP_PARSER_ERRNO(&parser_) != HPE_PAUSED) {
            return ParseState::Error;
          }
     
          if (parser_.http_major == 1 && parser_.http_minor == 1) {
            protocol_ = Http::Headers::get().ProtocolStrings.Http11String;
          } else {
            // Set other HTTP protocols to HTTP/1.0
            protocol_ = Http::Headers::get().ProtocolStrings.Http10String;
          }
          return ParseState::Done;
        } else {
          ssize_t rc = http_parser_execute(&parser_, &settings_, new_data.data(), new_data.length());
          ENVOY_LOG(trace, "http inspector: http_parser parsed {} chars, error code: {}", rc,
                    HTTP_PARSER_ERRNO(&parser_));
     
          // Errors in parsing HTTP.
          if (HTTP_PARSER_ERRNO(&parser_) != HPE_OK && HTTP_PARSER_ERRNO(&parser_) != HPE_PAUSED) {
            return ParseState::Error;
          } else {
            return ParseState::Continue;
          }
     
     
     
     
        return {http_parser_execute(&parser_, &settings_, slice, len), HTTP_PARSER_ERRNO(&parser_)};

onMessageBeginBase

     current_header_map_ = std::make_unique<HeaderMapImpl>();
      header_parsing_state_ = HeaderParsingState::Field;
     
     
     
    Status ConnectionImpl::onMessageBegin() {
      ENVOY_CONN_LOG(trace, "message begin", connection_);
      // Make sure that if HTTP/1.0 and HTTP/1.1 requests share a connection Envoy correctly sets
      // protocol for each request. Envoy defaults to 1.1 but sets the protocol to 1.0 where applicable
      // in onHeadersCompleteBase
      protocol_ = Protocol::Http11;
      processing_trailers_ = false;
      header_parsing_state_ = HeaderParsingState::Field;
      allocHeaders(statefulFormatterFromSettings(codec_settings_));
      return onMessageBeginBase();
    }
     
    Status ServerConnectionImpl::onMessageBeginBase() {
      if (!resetStreamCalled()) {
        ASSERT(!active_request_.has_value());
        active_request_.emplace(*this);
        auto& active_request = active_request_.value();
        if (resetStreamCalled()) {
          return codecClientError("cannot create new streams after calling reset");
        }
        active_request.request_decoder_ = &callbacks_.newStream(active_request.response_encoder_);
     
        // Check for pipelined request flood as we prepare to accept a new request.
        // Parse errors that happen prior to onMessageBegin result in stream termination, it is not
        // possible to overflow output buffers with early parse errors.
        RETURN_IF_ERROR(doFloodProtectionChecks());
      }
      return okStatus();
    }
    
  • Creates an ActiveStream, which holds the downstream information and the corresponding route information
  • For HTTPS, the SNI saved during the TLS handshake is written to ActiveStream.requested_server_name_
       void setRequestedServerName(absl::string_view requested_server_name) override {
           requested_server_name_ = std::string(requested_server_name);
         }
        
       void Filter::onServername(absl::string_view name) {
         if (!name.empty()) {
           config_->stats().sni_found_.inc();
           cb_->socket().setRequestedServerName(name);
           ENVOY_LOG(debug, "tls:onServerName(), requestedServerName: {}", name);
         } else {
           config_->stats().sni_not_found_.inc();
         }
         clienthello_success_ = true;
       }

onHeaderField and onHeaderValue are called repeatedly, adding headers to current_header_map_ one at a time.
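
A streaming parser may deliver a single header name or value in several fragments, so the field/value callbacks need a small state machine to know when one header ends and the next begins. The sketch below illustrates that idea with the HeaderParsingState name seen earlier. HeaderAccumulator and its members are hypothetical, and it assumes every field is followed by at least one value callback.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

enum class HeaderParsingState { Field, Value, Done };

class HeaderAccumulator {
public:
  // May be called several times for one header name when input is fragmented.
  void onHeaderField(const char* at, size_t length) {
    if (state_ == HeaderParsingState::Value) {
      // A new name starts, so the previous name/value pair is finished.
      completeLastHeader();
    }
    state_ = HeaderParsingState::Field;
    current_field_.append(at, length);
  }

  // May be called several times for one header value.
  void onHeaderValue(const char* at, size_t length) {
    state_ = HeaderParsingState::Value;
    current_value_.append(at, length);
  }

  // Flushes the final pending header once the header block ends.
  void onHeadersComplete() {
    if (state_ == HeaderParsingState::Value) {
      completeLastHeader();
    }
    state_ = HeaderParsingState::Done;
  }

  const std::vector<std::pair<std::string, std::string>>& headers() const { return headers_; }

private:
  void completeLastHeader() {
    headers_.emplace_back(std::move(current_field_), std::move(current_value_));
    // Moved-from strings are in an unspecified state; reset them explicitly.
    current_field_.clear();
    current_value_.clear();
  }

  HeaderParsingState state_ = HeaderParsingState::Field;
  std::string current_field_;
  std::string current_value_;
  std::vector<std::pair<std::string, std::string>> headers_;
};
```

For example, feeding "Ho" and "st" as two field fragments followed by "example" and ".com" as two value fragments yields a single Host: example.com entry.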

After the last request header has been parsed, onHeadersComplete runs and adds some fields of the request (method, path, host) to the headers.

const Http::HeaderValues& header_values = Http::Headers::get();
active_request.response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
                                                            header_values.MethodValues.Head);
active_request.response_encoder_.setIsResponseToConnectRequest(
    parser_->methodName() == header_values.MethodValues.Connect);
 
RETURN_IF_ERROR(handlePath(*headers, parser_->methodName()));
ASSERT(active_request.request_url_.empty());
 
headers->setMethod(parser_->methodName());
headers->setScheme("http"); 

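After the onHeadersComplete callback, onMessageComplete, onMessageCompleteBase and ServerConnectionImpl::onMessageComplete are invoked in turn.

  • The request decoder lives in the Envoy context and runs Envoy's core proxy logic: it traverses the HTTP filter chain and performs route selection
  • Request overload is checked here
  • The cluster is looked up in ThreadLocalClusterManager by the cluster name on the route and cached in cached_cluster_info_
  • The filter chain configured on the route is constructed (concrete filter implementations are registered through registerFactory and built by name in createFilterChain, for example the stats filter of istio-proxy)
  • If the corresponding HTTP connection manager has a tracing configuration: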
        if (connection_manager_.config_.tracingConfig()) {
          traceRequest();
        }
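  • If the request headers carry trace context, a child span is created and its sampled flag follows the parent span
  • If the headers carry no trace context, a root span is created and its sampled flag is set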
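
The two tracing cases can be summarized in a small sketch. TraceContext, Span and startSpan are hypothetical types invented for this illustration and do not correspond to Envoy's tracer interfaces. The local sampling decision is passed in as a plain bool because how it is derived is outside the scope of this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Trace context as it might be extracted from incoming request headers.
struct TraceContext {
  uint64_t trace_id;
  uint64_t span_id;
  bool sampled;
};

struct Span {
  uint64_t trace_id;
  uint64_t span_id;
  std::optional<uint64_t> parent_span_id; // empty for a root span
  bool sampled;
};

// new_id is a freshly generated identifier supplied by the caller.
inline Span startSpan(const std::optional<TraceContext>& incoming, uint64_t new_id,
                      bool local_sampling_decision) {
  if (incoming.has_value()) {
    // Child span: join the existing trace and inherit the parent's sampled flag.
    return Span{incoming->trace_id, new_id, incoming->span_id, incoming->sampled};
  }
  // Root span: start a new trace and apply the local sampling decision.
  return Span{new_id, new_id, std::nullopt, local_sampling_decision};
}
```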
        void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
          Router::RouteConstSharedPtr route;
          if (request_headers_ != nullptr) {
            if (connection_manager_.config_.isRoutable() &&
                connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
              // NOTE: re-select scope as well in case the scope key header has been changed by a filter.
              snapScopedRouteConfig();
            }
            if (snapped_route_config_ != nullptr) {
              route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(),
                                                   stream_id_);
            }
          }
         
          setRoute(route);
        }
         
        void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
                                                                bool end_stream) {
          ScopeTrackerScopeState scope(this,
                                       connection_manager_.read_callbacks_->connection().dispatcher());
          request_headers_ = std::move(headers);
          filter_manager_.requestHeadersInitialized();
          if (request_header_timer_ != nullptr) {
            request_header_timer_->disableTimer();
            request_header_timer_.reset();
          }
         
          Upstream::HostDescriptionConstSharedPtr upstream_host =
              connection_manager_.read_callbacks_->upstreamHost();
         
          if (upstream_host != nullptr) {
            Upstream::ClusterRequestResponseSizeStatsOptRef req_resp_stats =
                upstream_host->cluster().requestResponseSizeStats();
            if (req_resp_stats.has_value()) {
              req_resp_stats->get().upstream_rq_headers_size_.recordValue(request_headers_->byteSize());
            }
          }
         
          // Both saw_connection_close_ and is_head_request_ affect local replies: set
          // them as early as possible.
          const Protocol protocol = connection_manager_.codec_->protocol();
          state_.saw_connection_close_ = HeaderUtility::shouldCloseConnection(protocol, *request_headers_);
         
          // We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
          if (connection_manager_.config_.isRoutable()) {
            if (connection_manager_.config_.routeConfigProvider() != nullptr) {
              snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->config();
            } else if (connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
              snapped_scoped_routes_config_ =
                  connection_manager_.config_.scopedRouteConfigProvider()->config<Router::ScopedConfig>();
              snapScopedRouteConfig();
            }
          } else {
            snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->config();
          }
         
          ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
                           *request_headers_);
         
          // We end the decode here only if the request is header only. If we convert the request to a
          // header only, the stream will be marked as done once a subsequent decodeData/decodeTrailers is
          // called with end_stream=true.
          filter_manager_.maybeEndDecode(end_stream);
         
          // Drop new requests when overloaded as soon as we have decoded the headers.
          if (connection_manager_.random_generator_.bernoulli(
                  connection_manager_.overload_stop_accepting_requests_ref_.value())) {
            // In this one special case, do not create the filter chain. If there is a risk of memory
            // overload it is more important to avoid unnecessary allocation than to create the filters.
            filter_manager_.skipFilterChainCreation();
            connection_manager_.stats_.named_.downstream_rq_overload_close_.inc();
            sendLocalReply(Grpc::Common::isGrpcRequestHeaders(*request_headers_),
                           Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, absl::nullopt,
                           StreamInfo::ResponseCodeDetails::get().Overload);
            return;
          }
         
          if (!connection_manager_.config_.proxy100Continue() && request_headers_->Expect() &&
              request_headers_->Expect()->value() == Headers::get().ExpectValues._100Continue.c_str()) {
            // Note in the case Envoy is handling 100-Continue complexity, it skips the filter chain
            // and sends the 100-Continue directly to the encoder.
            chargeStats(continueHeader());
            response_encoder_->encode100ContinueHeaders(continueHeader());
            // Remove the Expect header so it won't be handled again upstream.
            request_headers_->removeExpect();
          }
         
          connection_manager_.user_agent_.initializeFromHeaders(*request_headers_,
                                                                connection_manager_.stats_.prefixStatName(),
                                                                connection_manager_.stats_.scope_);
         
          // Make sure we are getting a codec version we support.
          if (protocol == Protocol::Http10) {
            // Assume this is HTTP/1.0. This is fine for HTTP/0.9 but this code will also affect any
            // requests with non-standard version numbers (0.9, 1.3), basically anything which is not
            // HTTP/1.1.
            //
            // The protocol may have shifted in the HTTP/1.0 case so reset it.
            filter_manager_.streamInfo().protocol(protocol);
            if (!connection_manager_.config_.http1Settings().accept_http_10_) {
              // Send "Upgrade Required" if HTTP/1.0 support is not explicitly configured on.
              sendLocalReply(false, Code::UpgradeRequired, "", nullptr, absl::nullopt,
                             StreamInfo::ResponseCodeDetails::get().LowVersion);
              return;
            }
            if (!request_headers_->Host() &&
                !connection_manager_.config_.http1Settings().default_host_for_http_10_.empty()) {
              // Add a default host if configured to do so.
              request_headers_->setHost(
                  connection_manager_.config_.http1Settings().default_host_for_http_10_);
            }
          }
         
          if (!request_headers_->Host()) {
            // Require host header. For HTTP/1.1 Host has already been translated to :authority.
            sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::BadRequest, "",
                           nullptr, absl::nullopt, StreamInfo::ResponseCodeDetails::get().MissingHost);
            return;
          }
         
          // Verify header sanity checks which should have been performed by the codec.
          ASSERT(HeaderUtility::requestHeadersValid(*request_headers_).has_value() == false);
         
          // Check for the existence of the :path header for non-CONNECT requests, or present-but-empty
          // :path header for CONNECT requests. We expect the codec to have broken the path into pieces if
          // applicable. NOTE: Currently the HTTP/1.1 codec only does this when the allow_absolute_url flag
          // is enabled on the HCM.
          if ((!HeaderUtility::isConnect(*request_headers_) || request_headers_->Path()) &&
              request_headers_->getPathValue().empty()) {
            sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::NotFound, "", nullptr,
                           absl::nullopt, StreamInfo::ResponseCodeDetails::get().MissingPath);
            return;
          }
         
          // Currently we only support relative paths at the application layer.
          if (!request_headers_->getPathValue().empty() && request_headers_->getPathValue()[0] != '/') {
            connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc();
            sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::NotFound, "", nullptr,
                           absl::nullopt, StreamInfo::ResponseCodeDetails::get().AbsolutePath);
            return;
          }
         
          // Path sanitization should happen before any path access other than the above sanity check.
          const auto action =
              ConnectionManagerUtility::maybeNormalizePath(*request_headers_, connection_manager_.config_);
          // gRPC requests are rejected if Envoy is configured to redirect post-normalization. This is
          // because gRPC clients do not support redirect.
          if (action == ConnectionManagerUtility::NormalizePathAction::Reject ||
              (action == ConnectionManagerUtility::NormalizePathAction::Redirect &&
               Grpc::Common::hasGrpcContentType(*request_headers_))) {
            connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
            sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::BadRequest, "",
                           nullptr, absl::nullopt,
                           StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
            return;
          } else if (action == ConnectionManagerUtility::NormalizePathAction::Redirect) {
            connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
            sendLocalReply(
                false, Code::TemporaryRedirect, "",
                [new_path = request_headers_->Path()->value().getStringView()](
                    Http::ResponseHeaderMap& response_headers) -> void {
                  response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
                },
                absl::nullopt, StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
            return;
          }
         
          ASSERT(action == ConnectionManagerUtility::NormalizePathAction::Continue);
          ConnectionManagerUtility::maybeNormalizeHost(*request_headers_, connection_manager_.config_,
                                                       localPort());
         
          if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
            // Modify the downstream remote address depending on configuration and headers.
            filter_manager_.setDownstreamRemoteAddress(ConnectionManagerUtility::mutateRequestHeaders(
                *request_headers_, connection_manager_.read_callbacks_->connection(),
                connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_));
          }
          ASSERT(filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress() != nullptr);
         
          ASSERT(!cached_route_);
          refreshCachedRoute();
         
          if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
            filter_manager_.streamInfo().setTraceReason(
                ConnectionManagerUtility::mutateTracingRequestHeader(
                    *request_headers_, connection_manager_.runtime_, connection_manager_.config_,
                    cached_route_.value().get()));
          }
         
          filter_manager_.streamInfo().setRequestHeaders(*request_headers_);
         
          const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
         
          // TODO if there are no filters when starting a filter iteration, the connection manager
          // should return 404. The current returns no response if there is no router filter.
          if (hasCachedRoute()) {
            // Do not allow upgrades if the route does not support it.
            if (upgrade_rejected) {
              // While downstream servers should not send upgrade payload without the upgrade being
              // accepted, err on the side of caution and refuse to process any further requests on this
              // connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
              // contains a smuggled HTTP request.
              state_.saw_connection_close_ = true;
              connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
              sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::Forbidden, "",
                             nullptr, absl::nullopt, StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
              return;
            }
            // Allow non websocket requests to go through websocket enabled routes.
          }
         
          if (hasCachedRoute()) {
            const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
            if (route_entry != nullptr && route_entry->idleTimeout()) {
              // TODO(mattklein123): Technically if the cached route changes, we should also see if the
              // route idle timeout has changed and update the value.
              idle_timeout_ms_ = route_entry->idleTimeout().value();
              response_encoder_->getStream().setFlushTimeout(idle_timeout_ms_);
              if (idle_timeout_ms_.count()) {
                // If we have a route-level idle timeout but no global stream idle timeout, create a timer.
                if (stream_idle_timer_ == nullptr) {
                  stream_idle_timer_ =
                      connection_manager_.read_callbacks_->connection().dispatcher().createScaledTimer(
                          Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
                          [this]() -> void { onIdleTimeout(); });
                }
              } else if (stream_idle_timer_ != nullptr) {
                // If we had a global stream idle timeout but the route-level idle timeout is set to zero
                // (to override), we disable the idle timer.
                stream_idle_timer_->disableTimer();
                stream_idle_timer_ = nullptr;
              }
            }
          }
         
          // Check if tracing is enabled at all.
          if (connection_manager_.config_.tracingConfig()) {
            traceRequest();
          }
         
          filter_manager_.decodeHeaders(*request_headers_, end_stream);
         
          // Reset it here for both global and overridden cases.
          resetIdleTimer();
        }
         
         
        void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
                                          bool end_stream) {
          // Headers filter iteration should always start with the next filter if available.
          std::list<ActiveStreamDecoderFilterPtr>::iterator entry =
              commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext);
          std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end();
         
          for (; entry != decoder_filters_.end(); entry++) {
            (*entry)->maybeEvaluateMatchTreeWithNewData(
                [&](auto& matching_data) { matching_data.onRequestHeaders(headers); });
         
            if ((*entry)->skipFilter()) {
              continue;
            }

decodeHeaders is executed one filter at a time across the filters configured on the HTTP connection manager (envoy.cors, envoy.fault, envoy.router).

Here we mainly look at envoy.router.

envoy.router

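  • When RouteMatcher is constructed, it iterates over the domains under virtual_hosts and sorts them into four groups according to the position of the wildcard. The two wildcard groups are stored as map<domain_len, std::unordered_map<domain, virtualHost>, std::greater<int64_t>>, keyed by domain length:
    • default_virtual_host_: the domain is just the wildcard "*" (only one is allowed)
    • wildcard_virtual_host_suffixes_: the wildcard is at the start of the domain
    • wildcard_virtual_host_prefixes_: the wildcard is at the end of the domain
    • virtual_hosts_: the domain contains no wildcard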
                    RouteMatcher::RouteMatcher(const envoy::config::route::v3::RouteConfiguration& route_config,
                                               const ConfigImpl& global_route_config,
                                               Server::Configuration::ServerFactoryContext& factory_context,
                                               ProtobufMessage::ValidationVisitor& validator, bool validate_clusters)
                        : vhost_scope_(factory_context.scope().scopeFromStatName(
                              factory_context.routerContext().virtualClusterStatNames().vhost_)) {
                      absl::optional<Upstream::ClusterManager::ClusterInfoMaps> validation_clusters;
                      if (validate_clusters) {
                        validation_clusters = factory_context.clusterManager().clusters();
                      }
                      for (const auto& virtual_host_config : route_config.virtual_hosts()) {
                        VirtualHostSharedPtr virtual_host(new VirtualHostImpl(virtual_host_config, global_route_config,
                                                                              factory_context, *vhost_scope_, validator,
                                                                              validation_clusters));
                        for (const std::string& domain_name : virtual_host_config.domains()) {
                          const std::string domain = Http::LowerCaseString(domain_name).get();
                          bool duplicate_found = false;
                          if ("*" == domain) {
                            if (default_virtual_host_) {
                              throw EnvoyException(fmt::format("Only a single wildcard domain is permitted in route {}",
                                                               route_config.name()));
                            }
                            default_virtual_host_ = virtual_host;
                          } else if (!domain.empty() && '*' == domain[0]) {
                            duplicate_found = !wildcard_virtual_host_suffixes_[domain.size() - 1]
                                                   .emplace(domain.substr(1), virtual_host)
                                                   .second;
                          } else if (!domain.empty() && '*' == domain[domain.size() - 1]) {
                            duplicate_found = !wildcard_virtual_host_prefixes_[domain.size() - 1]
                                                   .emplace(domain.substr(0, domain.size() - 1), virtual_host)
                                                   .second;
                          } else {
                            duplicate_found = !virtual_hosts_.emplace(domain, virtual_host).second;
                          }
                          if (duplicate_found) {
                            throw EnvoyException(fmt::format("Only unique values for domains are permitted. Duplicate "
                                                             "entry of domain {} in route {}",
                                                             domain, route_config.name()));
                          }
                        }
                      }
                    }
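  • Lookup proceeds in the order virtual_hosts_ => wildcard_virtual_host_suffixes_ => wildcard_virtual_host_prefixes_ => default_virtual_host_.

Within each wildcard map, entries are visited in the map's iteration order (descending domain length), and the first virtual host whose domain, with the wildcard stripped, matches the Host header wins. If nothing matches, a 404 is returned directly.
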
                const VirtualHostImpl* RouteMatcher::findVirtualHost(const Http::RequestHeaderMap& headers) const {
                  // Fast path the case where we only have a default virtual host.
                  if (virtual_hosts_.empty() && wildcard_virtual_host_suffixes_.empty() &&
                      wildcard_virtual_host_prefixes_.empty()) {
                    return default_virtual_host_.get();
                  }
                 
                  // There may be no authority in early reply paths in the HTTP connection manager.
                  if (headers.Host() == nullptr) {
                    return nullptr;
                  }
                 
                  // TODO (@rshriram) Match Origin header in WebSocket
                  // request with VHost, using wildcard match
                  // Lower-case the value of the host header, as hostnames are case insensitive.
                  const std::string host = absl::AsciiStrToLower(headers.getHostValue());
                  const auto& iter = virtual_hosts_.find(host);
                  if (iter != virtual_hosts_.end()) {
                    return iter->second.get();
                  }
                  if (!wildcard_virtual_host_suffixes_.empty()) {
                    const VirtualHostImpl* vhost = findWildcardVirtualHost(
                        host, wildcard_virtual_host_suffixes_,
                        [](const std::string& h, int l) -> std::string { return h.substr(h.size() - l); });
                    if (vhost != nullptr) {
                      return vhost;
                    }
                  }
                  if (!wildcard_virtual_host_prefixes_.empty()) {
                    const VirtualHostImpl* vhost = findWildcardVirtualHost(
                        host, wildcard_virtual_host_prefixes_,
                        [](const std::string& h, int l) -> std::string { return h.substr(0, l); });
                    if (vhost != nullptr) {
                      return vhost;
                    }
                  }
                  return default_virtual_host_.get();
                }
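
The lookup order above can be illustrated with a small standalone sketch. It is not Envoy's code: `Matcher`, `VHost`, and `findWildcard` are hypothetical names, virtual hosts are reduced to plain strings, and host lower-casing is left out. It assumes, as in the constructor shown earlier, that wildcard domains are grouped by their length without the `*` and visited longest first.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for a virtual host: only a name.
using VHost = std::string;

// Outer key: length of the domain without '*', iterated longest first.
using WildcardMap = std::map<std::size_t, std::unordered_map<std::string, VHost>,
                             std::greater<std::size_t>>;

struct Matcher {
  std::unordered_map<std::string, VHost> exact;
  WildcardMap suffixes;      // "*.example.com" is stored as ".example.com"
  WildcardMap prefixes;      // "www.example.*" is stored as "www.example."
  std::string default_vhost; // empty string means "no default virtual host"

  void add(const std::string& domain, const VHost& vhost) {
    if (domain.empty()) {
      return;
    }
    if (domain == "*") {
      default_vhost = vhost;
    } else if (domain.front() == '*') {
      suffixes[domain.size() - 1].emplace(domain.substr(1), vhost);
    } else if (domain.back() == '*') {
      prefixes[domain.size() - 1].emplace(domain.substr(0, domain.size() - 1), vhost);
    } else {
      exact.emplace(domain, vhost);
    }
  }

  static const VHost* findWildcard(const std::string& host, const WildcardMap& map,
                                   bool match_suffix) {
    for (const auto& entry : map) {
      const std::size_t len = entry.first;
      if (len >= host.size()) {
        continue; // the wildcard has to stand for at least one character
      }
      const std::string key =
          match_suffix ? host.substr(host.size() - len) : host.substr(0, len);
      const auto it = entry.second.find(key);
      if (it != entry.second.end()) {
        return &it->second;
      }
    }
    return nullptr;
  }

  // Returns the matched virtual host name, or "" when nothing matches.
  std::string find(const std::string& host) const {
    const auto it = exact.find(host);
    if (it != exact.end()) {
      return it->second;
    }
    if (const VHost* v = findWildcard(host, suffixes, true)) {
      return *v;
    }
    if (const VHost* v = findWildcard(host, prefixes, false)) {
      return *v;
    }
    return default_vhost;
  }
};
```

With domains `*.example.com` and `*.api.example.com` both registered, a request for `v1.api.example.com` lands on the longer suffix because the longer key is tried first.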
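  • Find the matching route and cluster on a virtual host
    • Once a virtual host has been matched by domain, the route and cluster are looked up on that virtual host; if nothing matches, a 404 is returned directly
    • Depending on configuration, match uses one of three route types: prefix, regex, or path
    • If weighted_clusters is present, requests are distributed by stream_id and the clusters' weights. stream_id is generated randomly and independently for each request, so the weighted distribution can be treated as random
  • No route matches the request: return 404 (no cluster match for URL)
  • A directResponseEntry is configured: return it directly
  • The cluster name on the route cannot be found in the cluster manager: return the configured clusterNotFoundResponseCode
  • The cluster is currently in maintenanceMode (driven by the runtime key upstream.maintenance_mode.<cluster name>): return 503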
                // See if we are supposed to immediately kill some percentage of this cluster's traffic.
                  if (cluster_->maintenanceMode()) {
                    callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
                    chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
                    callbacks_->sendLocalReply(
                        Http::Code::ServiceUnavailable, "maintenance mode",
                        [modify_headers, this](Http::ResponseHeaderMap& headers) {
                          if (!config_.suppress_envoy_headers_) {
                            headers.addReference(Http::Headers::get().EnvoyOverloaded,
                                                 Http::Headers::get().EnvoyOverloadedValues.True);
                          }
                          // Note: append_cluster_info does not respect suppress_envoy_headers.
                          modify_headers(headers);
                        },
                        absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode);
                    cluster_->stats().upstream_rq_maintenance_mode_.inc();
                    return Http::FilterHeadersStatus::StopIteration;
                  }
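
The weighted_clusters distribution mentioned in the route-matching bullets can be sketched as follows. This is a minimal illustration under assumptions, not the router's implementation: `WeightedCluster` and `pickWeightedCluster` are hypothetical names, and `random_value` stands in for the per-request random stream_id.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified view of one weighted_clusters entry.
struct WeightedCluster {
  std::string name;
  uint64_t weight;
};

// Maps random_value into [0, total_weight) and walks the cumulative weight
// ranges. Returns "" when the total weight is zero.
inline std::string pickWeightedCluster(const std::vector<WeightedCluster>& clusters,
                                       uint64_t random_value) {
  uint64_t total = 0;
  for (const auto& c : clusters) {
    total += c.weight;
  }
  if (total == 0) {
    return "";
  }
  const uint64_t selected = random_value % total;
  uint64_t begin = 0;
  for (const auto& c : clusters) {
    const uint64_t end = begin + c.weight;
    if (selected >= begin && selected < end) {
      return c.name;
    }
    begin = end;
  }
  return "";
}
```

Because the input value is random per request, a 80/20 split sends roughly 80% of requests to the first cluster, but consecutive requests carry no ordering guarantee.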
  • Call createConnPool to obtain the upstream conn pool
                    std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
                     
                    if (!generic_conn_pool) {
                      sendNoHealthyUpstreamResponse();
                      return Http::FilterHeadersStatus::StopIteration;
                    }
  • The features configured on the cluster, together with USE_DOWNSTREAM_PROTOCOL, determine whether the request is sent upstream over HTTP/1 or HTTP/2
                    std::vector<Http::Protocol>
                    ClusterInfoImpl::upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const {
                      if (downstream_protocol.has_value() &&
                          features_ & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) {
                        return {downstream_protocol.value()};
                      } else if (features_ & Upstream::ClusterInfo::Features::USE_ALPN) {
                        ASSERT(!(features_ & Upstream::ClusterInfo::Features::HTTP3));
                        return {Http::Protocol::Http2, Http::Protocol::Http11};
                      } else {
                        if (features_ & Upstream::ClusterInfo::Features::HTTP3) {
                          return {Http::Protocol::Http3};
                        }
                        return {(features_ & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2
                                                                                     : Http::Protocol::Http11};
                      }
                    }
  • Look up the cluster by cluster name on the ThreadLocalClusterManager
                    Http::ConnectionPool::Instance*
                    ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
                        ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
                        LoadBalancerContext* context, bool peek) {
                      HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context));
                      if (!host) {
                        if (!peek) {
                          ENVOY_LOG(debug, "no healthy host for HTTP connection pool");
                          cluster_info_->stats().upstream_cx_none_healthy_.inc();
                        }
                        return nullptr;
                      }
  • Pick a host with the load balancer algorithm (for some algorithms, load balancing is independent on each worker; with round robin, for example, the order is strict only within a single worker)
                    HostConstSharedPtr LoadBalancerBase::chooseHost(LoadBalancerContext* context) {
                      HostConstSharedPtr host;
                      const size_t max_attempts = context ? context->hostSelectionRetryCount() + 1 : 1;
                      for (size_t i = 0; i < max_attempts; ++i) {
                        host = chooseHostOnce(context);
                     
                        // If host selection failed or the host is accepted by the filter, return.
                        // Otherwise, try again.
                        // Note: in the future we might want to allow retrying when chooseHostOnce returns nullptr.
                        if (!host || !context || !context->shouldSelectAnotherHost(*host)) {
                          return host;
                        }
                      }
                     
                      // If we didn't find anything, return the last host.
                      return host;
                    }
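
The remark about round robin being strict only within one worker can be shown with a toy picker. This sketch assumes each worker owns its own load balancer instance with a private index. `RoundRobinPicker` is a hypothetical class; it ignores host weights and health and does not reflect Envoy's actual scheduler.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-worker round robin picker. Each worker would hold its own
// instance, so the rotation state is never shared between workers.
class RoundRobinPicker {
public:
  explicit RoundRobinPicker(std::vector<std::string> hosts) : hosts_(std::move(hosts)) {}

  // Returns the next host in rotation, or "" when there are no hosts.
  std::string next() {
    if (hosts_.empty()) {
      return "";
    }
    const std::string host = hosts_[index_ % hosts_.size()];
    ++index_;
    return host;
  }

private:
  std::vector<std::string> hosts_;
  std::size_t index_ = 0;
};
```

If two workers each own a picker over the same three hosts and requests alternate between them, the upstream sees an interleaving such as h1, h2, h1, h3, h2: each worker rotates correctly, but the combined sequence is not a strict rotation.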
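  • Get the connection pool for the chosen host and protocol (connection pools are managed by the ThreadLocalClusterManager and are not shared between workers)
  • If no pool is obtained, respond with 503 directly and stop the filter chain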
                     std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
                      
                     if (!generic_conn_pool) {
                       sendNoHealthyUpstreamResponse();
                       return Http::FilterHeadersStatus::StopIteration;
                     }
  • Determine the timeout for this request from the configuration (timeout, perTryTimeout)
                timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_,
                                                         grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
                                                         config_.respect_expected_rq_timeout_);
                 
                TimeoutData timeout;
                  if (!route.usingNewTimeouts()) {
                    if (grpc_request && route.maxGrpcTimeout()) {
                      const std::chrono::milliseconds max_grpc_timeout = route.maxGrpcTimeout().value();
                      auto header_timeout = Grpc::Common::getGrpcTimeout(request_headers);
                      std::chrono::milliseconds grpc_timeout =
                          header_timeout ? header_timeout.value() : std::chrono::milliseconds(0);
                      if (route.grpcTimeoutOffset()) {
                        // We only apply the offset if it won't result in grpc_timeout hitting 0 or below, as
                        // setting it to 0 means infinity and a negative timeout makes no sense.
                        const auto offset = *route.grpcTimeoutOffset();
                        if (offset < grpc_timeout) {
                          grpc_timeout -= offset;
                        }
                      }
                 
                      // Cap gRPC timeout to the configured maximum considering that 0 means infinity.
                      if (max_grpc_timeout != std::chrono::milliseconds(0) &&
                          (grpc_timeout == std::chrono::milliseconds(0) || grpc_timeout > max_grpc_timeout)) {
                        grpc_timeout = max_grpc_timeout;
                      }
                      timeout.global_timeout_ = grpc_timeout;
                    } else {
                      timeout.global_timeout_ = route.timeout();
                    }
                  }
                  timeout.per_try_timeout_ = route.retryPolicy().perTryTimeout();
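
The gRPC branch of the excerpt above can be isolated into a small function to make the "0 means infinity" rules easier to follow. This is a sketch that mirrors only the arithmetic shown; `effectiveGrpcTimeout` and its parameters are hypothetical names, and `std::optional` replaces `absl::optional`.

```cpp
#include <chrono>
#include <optional>

using Ms = std::chrono::milliseconds;

// header_timeout: value parsed from the grpc-timeout header, if present.
// offset: the route's grpcTimeoutOffset, if configured.
// max_grpc_timeout: the route's maxGrpcTimeout; 0 means "no cap".
inline Ms effectiveGrpcTimeout(std::optional<Ms> header_timeout, std::optional<Ms> offset,
                               Ms max_grpc_timeout) {
  Ms timeout = header_timeout.value_or(Ms(0));
  // Apply the offset only if the result stays strictly positive.
  if (offset.has_value() && *offset < timeout) {
    timeout -= *offset;
  }
  // Cap to the configured maximum, treating a timeout of 0 as infinity.
  if (max_grpc_timeout != Ms(0) && (timeout == Ms(0) || timeout > max_grpc_timeout)) {
    timeout = max_grpc_timeout;
  }
  return timeout;
}
```

For example, a header timeout of 5000 ms with a 1000 ms offset and no cap yields 4000 ms, while a missing header with a 3000 ms cap yields 3000 ms rather than an infinite timeout.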

  • Write the previously generated trace into the request headers
  • Apply the final modifications to the request: headers_to_remove, headers_to_add, host_rewrite, rewritePathHeader (from the route configuration)
                 route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(),
                                                      !config_.suppress_envoy_headers_);
  • Construct the retry and shadowing objects
                 retry_state_ = createRetryState(
                       route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, config_.runtime_,
                       config_.random_, callbacks_->dispatcher(), config_.timeSource(), route_entry_->priority());
                  
                   // Determine which shadow policies to use. It's possible that we don't do any shadowing due to
                   // runtime keys.
                   for (const auto& shadow_policy : route_entry_->shadowPolicies()) {
                     const auto& policy_ref = *shadow_policy;
                     if (FilterUtility::shouldShadow(policy_ref, config_.runtime_, callbacks_->streamId())) {
                       active_shadow_policies_.push_back(std::cref(policy_ref));
                     }
                   }

Sending the request

Sending the request is also part of the envoy.router logic.

  1. Check whether the current conn pool has an idle client
        if (!ready_clients_.empty()) {
           ActiveClient& client = *ready_clients_.front();
           ENVOY_CONN_LOG(debug, "using existing connection", client);
           attachStreamToClient(client, context);
           // Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
           tryCreateNewConnections();

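If an idle connection exists:

  • Build the request buffer for upstream from the downstream request and settings such as tracing
  • Move the buffer into write_buffer_ in one go, which immediately triggers a write event
  • ConnectionImpl::onWriteReady is then invoked
  • The contents of write_buffer_ are written to the socket and sent

If no idle connection exists:
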
            if (host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
                ConnectionPool::Cancellable* pending = newPendingStream(context);
                ENVOY_LOG(debug, "trying to create new connection");
                ENVOY_LOG(trace, fmt::format("{}", *this));
             
                auto old_capacity = connecting_stream_capacity_;
                // This must come after newPendingStream() because this function uses the
                // length of pending_streams_ to determine if a new connection is needed.
                const ConnectionResult result = tryCreateNewConnections();
                // If there is not enough connecting capacity, the only reason to not
                // increase capacity is if the connection limits are exceeded.
                ENVOY_BUG(pending_streams_.size() <= connecting_stream_capacity_ ||
                              connecting_stream_capacity_ > old_capacity ||
                              result == ConnectionResult::NoConnectionRateLimited,
                          fmt::format("Failed to create expected connection: {}", *this));
                return pending;
              } else {
                ENVOY_LOG(debug, "max pending streams overflow");
                onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
                              context);
                host_->cluster().stats().upstream_rq_pending_overflow_.inc();
                return nullptr;
              }
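  • max_pending_requests and max_connections decide whether a new connection may be created (these counters are shared across workers). Each worker thread, however, opens at least one connection to the upstream, so in the extreme case the effective limit depends on the number of worker threads
  • Set the socket options of the new connection from configuration, create the upstream connection with dispatcher.createClientConnection, and bind it to the event loop
  • Create a new PendingRequest and add it to the head of pending_requests_
  • When the connection is established, ConnectionImpl::onFileEvent is triggered
  • In the onConnected callback:
    • Stop connect_timer_
    • Reuse the idle-connection logic to send the request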
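
The two branches (idle connection available, or not) can be summarized in a toy pool. This is only a sketch of the control flow described above: `PoolSketch`, `StreamResult`, and `addIdleClient` are hypothetical, connection limits are not modeled, and it assumes that pending requests are pushed to the head and served oldest first once a connection is ready.

```cpp
#include <cstddef>
#include <deque>
#include <string>

enum class StreamResult { Attached, Pending, Overflow };

// Hypothetical, single-threaded model of one worker's connection pool.
class PoolSketch {
public:
  explicit PoolSketch(std::size_t max_pending) : max_pending_(max_pending) {}

  // Simulates a connection that is already established and idle.
  void addIdleClient() { ++idle_clients_; }

  StreamResult newStream(const std::string& request) {
    if (idle_clients_ > 0) {
      --idle_clients_; // attach the stream to an existing connection
      return StreamResult::Attached;
    }
    if (pending_.size() >= max_pending_) {
      return StreamResult::Overflow; // corresponds to the pending overflow branch
    }
    pending_.push_front(request); // newest pending request goes to the head
    return StreamResult::Pending;
  }

  // Called when a new connection finishes connecting. Returns the request
  // that gets attached to it, or "" if the connection simply becomes idle.
  std::string onConnected() {
    if (pending_.empty()) {
      ++idle_clients_;
      return "";
    }
    const std::string request = pending_.back(); // oldest pending request
    pending_.pop_back();
    return request;
  }

private:
  std::size_t max_pending_;
  std::size_t idle_clients_ = 0;
  std::deque<std::string> pending_;
};
```

With `max_pending` set to 1, a first request is queued, a second one overflows, and the queued request is attached as soon as `onConnected` fires.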
  2. onRequestComplete calls maybeDoShadowing to mirror the traffic
        ASSERT(!request->headers().getHostValue().empty());
          // Switch authority to add a shadow postfix. This allows upstream logging to make more sense.
          auto parts = StringUtil::splitToken(request->headers().getHostValue(), ":");
          ASSERT(!parts.empty() && parts.size() <= 2);
          request->headers().setHost(parts.size() == 2
                                         ? absl::StrJoin(parts, "-shadow:")
                                         : absl::StrCat(request->headers().getHostValue(), "-shadow"));
          // This is basically fire and forget. We don't handle cancelling.
          thread_local_cluster->httpAsyncClient().send(std::move(request), *this, options);
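  • Shadowed traffic never returns an error
  • Shadowed traffic is sent through the async client, so it does not block the downstream; its timeout is also global_timeout_
  • Shadowing rewrites the host / authority in the request headers by appending a -shadow suffix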
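
The host rewrite performed for shadowed requests can be reproduced with plain string handling. This sketch assumes the host has at most one colon, as the ASSERT in the excerpt does, so it does not handle IPv6 literals; `shadowHost` is a hypothetical helper name.

```cpp
#include <cstddef>
#include <string>

// Appends "-shadow" to the host part, keeping an optional ":port" intact.
inline std::string shadowHost(const std::string& host) {
  const std::size_t colon = host.find(':');
  if (colon == std::string::npos) {
    return host + "-shadow";
  }
  return host.substr(0, colon) + "-shadow" + host.substr(colon);
}
```

A mirrored request for `reviews.default:9080` therefore reaches the shadow cluster with the authority `reviews.default-shadow:9080`, which makes mirrored traffic easy to tell apart in upstream logs.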

A response-timeout timer is started based on global_timeout_.

Receiving the response

  1. The event loop fires the read-ready event in onFileEvent on ClientConnectionImpl (ConnectionImpl)
  2. After http_parser execute runs, onHeadersComplete fires and execution reaches UpstreamRequest::decodeHeaders
  3. upstream_request_->upstream_host_->outlierDetector().putHttpResponseCode records the status code and updates the outlier detection state
        external_origin_sr_monitor_.incTotalReqCounter();
          if (Http::CodeUtility::is5xx(response_code)) {
            std::shared_ptr<DetectorImpl> detector = detector_.lock();
            if (!detector) {
              // It's possible for the cluster/detector to go away while we still have a host in use.
              return;
            }
            if (Http::CodeUtility::isGatewayError(response_code)) {
              if (++consecutive_gateway_failure_ ==
                  detector->runtime().snapshot().getInteger(
                      ConsecutiveGatewayFailureRuntime, detector->config().consecutiveGatewayFailure())) {
                detector->onConsecutiveGatewayFailure(host_.lock());
              }
            } else {
              consecutive_gateway_failure_ = 0;
            }
         
            if (++consecutive_5xx_ == detector->runtime().snapshot().getInteger(
                                          Consecutive5xxRuntime, detector->config().consecutive5xx())) {
              detector->onConsecutive5xx(host_.lock());
            }
          } else {
            external_origin_sr_monitor_.incSuccessReqCounter();
            consecutive_5xx_ = 0;
            consecutive_gateway_failure_ = 0;
          }

The call site passes either the HTTP status code or the HTTP status mapped from the gRPC status:
        if (grpc_status.has_value()) {
          upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(grpc_to_http_status);
        } else {
          upstream_request.upstreamHost()->outlierDetector().putHttpResponseCode(response_code);
        }
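
The consecutive-5xx bookkeeping in the first excerpt boils down to a counter that resets on any non-5xx response. The sketch below models only that counter; `Consecutive5xxMonitor` is a hypothetical class, the threshold is passed in directly instead of being read from runtime, and gateway errors and success-rate tracking are left out.

```cpp
#include <cstdint>

// Hypothetical, reduced model of a per-host consecutive 5xx counter.
class Consecutive5xxMonitor {
public:
  explicit Consecutive5xxMonitor(uint32_t threshold) : threshold_(threshold) {}

  // Returns true only for the response that makes the counter hit the
  // threshold exactly, mirroring the `++counter == threshold` comparison.
  bool putHttpResponseCode(uint64_t code) {
    if (code >= 500 && code < 600) {
      return ++consecutive_5xx_ == threshold_;
    }
    consecutive_5xx_ = 0; // any non-5xx response resets the streak
    return false;
  }

private:
  uint32_t threshold_;
  uint32_t consecutive_5xx_ = 0;
};
```

Because the comparison is an equality, the event fires once when the streak reaches the threshold and not again on every later 5xx in the same streak.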

Whether to retry is decided from the response, the configuration, and retries_remaining_.

  • The internal_redirect_action configuration and the response determine whether to redirect to a new host
            InternalRedirectPolicyImpl RouteEntryImplBase::buildInternalRedirectPolicy(
                const envoy::config::route::v3::RouteAction& route_config,
                ProtobufMessage::ValidationVisitor& validator, absl::string_view current_route_name) const {
              if (route_config.has_internal_redirect_policy()) {
                return InternalRedirectPolicyImpl(route_config.internal_redirect_policy(), validator,
                                                  current_route_name);
              }
              envoy::config::route::v3::InternalRedirectPolicy policy_config;
              switch (route_config.internal_redirect_action()) {
              case envoy::config::route::v3::RouteAction::HANDLE_INTERNAL_REDIRECT:
                break;
              case envoy::config::route::v3::RouteAction::PASS_THROUGH_INTERNAL_REDIRECT:
                FALLTHRU;
              default:
                return InternalRedirectPolicyImpl();
              }
              if (route_config.has_max_internal_redirects()) {
                *policy_config.mutable_max_internal_redirects() = route_config.max_internal_redirects();
              }
              return InternalRedirectPolicyImpl(policy_config, validator, current_route_name);
            }
             
              if (num_internal_redirect.value() >= policy.maxInternalRedirects()) {
                config_.stats_.passthrough_internal_redirect_too_many_redirects_.inc();
                return false;
              }

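Returning the response

  1. Stop request_timer and reset idle_timer
  2. Send the response to downstream with the same logic used to send the request to upstream

Takeaways from reading the source

  1. Envoy makes heavy use of inheritance, templates, and composition; when a subclass is initialized, check what the parent class constructor does
  2. Use the request log as a guide: follow the order of the log lines through the code to trace the overall flow
  3. Make good use of debugging tools such as packet capture, gdb, and enabling extra metrics. In the author's experience, about 90% of problems can be solved with logs + packet capture + reading part of the source
Appendix:

RFC specification on duplicate headers:

https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2

Header casing:

https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/header_casing

Changes to header append behavior:

https://www.envoyproxy.io/docs/envoy/latest/version_history/v1.15.1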
