Envoy:httpfilter相关代码阅读

2023-10-30 16:05:26 浏览数 (2)

本篇文章是envoy httpfilter相关代码阅读的整理和总结,笔者试图通过这篇文章将http filter在envoy内部的管控讲清楚,并且将request和response是如何使用这部分 http filter功能的流程介绍清楚。

httpfilter是netfilter中的一种filter,因为envoy对http支持的细粒度管控很全面,所以将httpfilter又做了一层只是针对http协议的filter chain的管控处理逻辑。

httpfilter 在envoy中采用的是生产者和消费者的处理模式,通过配置文件或者xds协议的配置数据将http filter相关的信息,存放到固定的列表中,在有消息request和response到来的时候,通过异步事件触发对应的响应函数,进而从这些列表中取出对应的filter,依次执行filter的功能,达到使用http filter的目的。

一、生产者部分的逻辑:

在envoy初始化的时候,或者更新httpfilter配置的时候,通过httpconnectionManagerconfig依次将httpfilter存放到filter_factories中。

逻辑代码如下所示:

代码语言:javascript复制
Network::FilterFactoryCb
HttpConnectionManagerFilterConfigFactory::createFilterFactoryFromProtoTyped()
---→
std::shared_ptr<HttpConnectionManagerConfig>  Utility::createConfig()
---→ 
std::make_shared<HttpConnectionManagerConfig>() { 
  ......
  // 操作的是http_filters
  const auto& filters = config.http_filters(); 
  DependencyManager dependency_manager;
  for (int32_t i = 0; i < filters.size(); i  ) {
     processFilter(filters[i], i, "http", "http", i == filters.size() - 1, filter_factories_,
           dependency_manager);
  }
  ......
}
----→ 
void HttpConnectionManagerConfig::processFilter() {
  ......
  auto* factory =
  Config::Utility::getAndCheckFactory<Server::Configuration::NamedHttpFilterConfigFactory>(
    proto_config, proto_config.is_optional());
  ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
     proto_config, context_.messageValidationVisitor(), *factory);
  Http::FilterFactoryCb callback =
  factory->createFilterFactoryFromProto(*message, stats_prefix_, context_);
  ......
  // 这里将filterfactorycb存放到filter_factories中
  filter_factories.push_back(std::move(filter_config_provider));
}

二、消费者部分的逻辑

事件响应函数,在触发onFileEvent之后,有一个环节会调用createFilterChain()去消费filter_factories中的filterfactorycb函数,并通过这些已经注册好的cb函数,将http filter添加到decoder_filter 或者encoder_filter中。

代码语言:javascript复制
ConnectionImpl::onFileEvent()-→
.......
------>
Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
......
  active_request.request_decoder_->decodeHeaders(std::move(headers), false);
......
}
----→ 
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers, bool end_stream) {
......
  const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
......
}
---→ 
bool FilterManager::createFilterChain() {
......
  if (upgrade != nullptr) {
    const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
    if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(),
        upgrade_map, *this)) {
        filter_manager_callbacks_.upgradeFilterChainCreated();
        return true;
    } else {
        upgrade_rejected = true;
        // Fall through to the default filter chain. The function calling this
        // will send a local reply indicating that the upgrade failed.
   }
 }
filter_chain_factory_.createFilterChain(*this);
......
}
----→ 
HttpConnectionManagerConfig::createFilterChain()
HttpConnectionManagerConfig::createUpgradeFilterChain() 
----→ 
HttpConnectionManagerConfig::createFilterChainForFactories() {
 ......
   for (const auto& filter_config_provider : filter_factories) {
      auto config = filter_config_provider->config();
      if (config.has_value()) {
       // 这里对应的是http_filter 创建的工厂调用函数里面的FilterFactoryCb函数
          config.value()(callbacks);
          continue;
       }
......
}
-----→
以BandwidthLimitFilterConfig 为例子,config.value()(callbacks) 对应的是:
[filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void { // 这里的callbacks对应的是fiter_manager
      callbacks.addStreamFilter(std::make_shared<BandwidthLimiter>(filter_config))
} //callback对应的是fiter_manager,所以这里调用的是下面的函数:
----→ 
void addStreamFilter(StreamFilterSharedPtr filter) override {
    addStreamDecoderFilterWorker(filter, nullptr, true);
    addStreamEncoderFilterWorker(filter, nullptr, true);
    StreamDecoderFilter* decoder_filter = filter.get();
    filters_.push_back(decoder_filter);
}
---→
LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_); 或者
LinkedList::moveIntoList(std::move(wrapper), encoder_filters_);或者同时添加进去。

三、encoder_filter和decoder_filter的消费逻辑

这里的入口都是通过libevent里面的消息响应事件,关联到读和写的相应函数,最终从这两个列表里面依次取出对应的http filter进而执行相应的filter里面的功能,达到使用filter chain的目的。

消费encoder_filters_ 的流程:

代码语言:javascript复制
ParserStatus ServerConnectionImpl::onMessageCompleteBase() 
ParserStatus ClientConnectionImpl::onMessageCompleteBase()
----→ 
void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& response_headers, bool end_stream) 
void encodeData(Buffer::Instance& data, bool end_stream) 
void encodeTrailers(ResponseTrailerMap& trailers)
----→ 
void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers, bool end_stream)  
void encodeData(Buffer::Instance& data, bool end_stream)
void encodeTrailers(ResponseTrailerMapPtr&& trailers)
----→ 会去遍历encoder_filters去依次执行对应的
encodeHeaders()encodeData()encodeTrailers()函数

消费decoder_filters_的流程:

代码语言:javascript复制
ParserStatus ServerConnectionImpl::onMessageCompleteBase() 
ParserStatus ClientConnectionImpl::onMessageCompleteBase()
----→  
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
      bool end_stream)
void decodeData(Buffer::Instance& data, bool end_stream)
void decodeTrailers(RequestTrailerMapPtr&& trailers)
---→ 
void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
bool end_stream) 
void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data,
bool end_stream,
FilterIterationStartState filter_iteration_start_state)
void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTrailerMap& trailers)
--→ 会去遍历decoder_filters去依次执行对应的
decodeHeaders()decodeData()decodeTrailers()函数

补充代码信息:下面是onMessageCompleteBase函数在ServerConnectionImpl和ClientConnectionImpl中的详细代码信息

代码语言:javascript复制
ParserStatus ServerConnectionImpl::onMessageCompleteBase() {
  ASSERT(!handling_upgrade_);
  if (active_request_.has_value()) {
    auto& active_request = active_request_.value();
    if (active_request.request_decoder_) {
      active_request.response_encoder_.readDisable(true);
    }
    active_request.remote_complete_ = true;
    if (deferred_end_stream_headers_) {
      active_request.request_decoder_->decodeHeaders(
          std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true);
      deferred_end_stream_headers_ = false;
    } else if (processing_trailers_) {
      active_request.request_decoder_->decodeTrailers(
          std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
    } else {
      Buffer::OwnedImpl buffer;
      active_request.request_decoder_->decodeData(buffer, true);
    }
    // Reset to ensure no information from one requests persists to the next.
    headers_or_trailers_.emplace<RequestHeaderMapPtr>(nullptr);
  }
  // Always pause the parser so that the calling code can process 1 request at a time and apply
  // back pressure. However this means that the calling code needs to detect if there is more data
  // in the buffer and dispatch it again.
  return parser_->pause();
}
  
ParserStatus ClientConnectionImpl::onMessageCompleteBase() {
  ENVOY_CONN_LOG(trace, "message complete", connection_);
  if (ignore_message_complete_for_1xx_) {
    ignore_message_complete_for_1xx_ = false;
    return ParserStatus::Success;
  }
  if (pending_response_.has_value()) {
    ASSERT(!pending_response_done_);
    // After calling decodeData() with end stream set to true, we should no longer be able to reset.
    PendingResponse& response = pending_response_.value();
    // Encoder is used as part of decode* calls later in this function so pending_response_ can not
    // be reset just yet. Preserve the state in pending_response_done_ instead.
    pending_response_done_ = true;
  
    if (deferred_end_stream_headers_) {
      response.decoder_->decodeHeaders(
          std::move(absl::get<ResponseHeaderMapPtr>(headers_or_trailers_)), true);
      deferred_end_stream_headers_ = false;
    } else if (processing_trailers_) {
      response.decoder_->decodeTrailers(
          std::move(absl::get<ResponseTrailerMapPtr>(headers_or_trailers_)));
    } else {
      Buffer::OwnedImpl buffer;
      response.decoder_->decodeData(buffer, true);
    }
  
    // Reset to ensure no information from one requests persists to the next.
    pending_response_.reset();
    headers_or_trailers_.emplace<ResponseHeaderMapPtr>(nullptr);
  }
  // Pause the parser after a response is complete. Any remaining data indicates an error.
  return parser_->pause();
}

参考文档:https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/http/http_filters#arch-overview-http-filters

0 人点赞