Preface
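Envoy is a high-performance network proxy designed for service meshes. It runs alongside the application and abstracts the network by providing common functionality in a platform-agnostic way. When all service traffic in the infrastructure flows through the Envoy mesh, consistent observability makes it easy to spot problem areas and tune overall performance.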
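Envoy is also one of Istio's core components. It runs next to each service as a sidecar, intercepts and forwards the service's traffic, and provides powerful features such as routing and traffic control. In this series we go beyond the official Istio and Envoy documentation and start from the source code. We share advanced walkthroughs of Envoy's startup, traffic hijacking, and HTTP request processing, and analyze Envoy's architecture in depth.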
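This is the second article in the Envoy request-flow source analysis series. It covers the first part of the outbound direction: starting the listener and establishing connections. Note: the issues and PRs discussed here reflect their state as of December 2021.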
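Envoy wraps libevent for file events, timers, and similar operations. The following topics are not covered here and may be analyzed separately later:
- dispatching work through the dispatcher
- deferred deletion
- worker startup
- binding listeners to workers

We skip Envoy's event-loop model and start from the point where a request comes in.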
Outbound direction
Filter parsing
Starting the listener
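- Envoy obtains its listener configuration through xDS or static configuration.
- If the listener has bind_to_port set, Envoy calls libevent's API directly to bind and listen. The callback is set to ListenerImpl::listenCallback.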
void ListenerManagerImpl::addListenerToWorker(Worker& worker,
                                              absl::optional<uint64_t> overridden_listener,
                                              ListenerImpl& listener,
                                              ListenerCompletionCallback completion_callback) {
  if (overridden_listener.has_value()) {
    ENVOY_LOG(debug, "replacing existing listener {}", overridden_listener.value());
    worker.addListener(overridden_listener, listener, [this, completion_callback](bool) -> void {
      server_.dispatcher().post([this, completion_callback]() -> void {
        stats_.listener_create_success_.inc();
        if (completion_callback) {
          completion_callback();
        }
      });
    });
    return;
  }
  worker.addListener(
      overridden_listener, listener, [this, &listener, completion_callback](bool success) -> void {
        // The add listener completion runs on the worker thread. Post back to the main thread to
        // avoid locking.
        server_.dispatcher().post([this, success, &listener, completion_callback]() -> void {
          // ... (excerpt truncated)

void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options,
                                   bool bind_to_port) {
  setListenSocketOptions(options);
  if (bind_to_port) {
    bind(address_provider_->localAddress());
  }
}

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
                                     Network::ListenerConfig& config)
    : ActiveTcpListener(
          parent,
          parent.dispatcher().createListener(config.listenSocketFactory().getListenSocket(), *this,
                                             config.bindToPort(), config.tcpBacklogSize()),
          config) {}

class ActiveTcpListener : public Network::TcpListenerCallbacks,
                          public ActiveListenerImplBase,
                          public Network::BalancedConnectionHandler,
                          Logger::Loggable<Logger::Id::conn_handler> {
public:
  ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config);
  ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener,
                    Network::ListenerConfig& config);
  ~ActiveTcpListener() override;
  bool listenerConnectionLimitReached() const {
    // TODO(tonya11en): Delegate enforcement of per-listener connection limits to overload
    // manager.
    return !config_->openConnections().canCreate();
  }
  void decNumConnections() {
    ASSERT(num_listener_connections_ > 0);
    --num_listener_connections_;
    config_->openConnections().dec();
  }
  // Network::TcpListenerCallbacks
  void onAccept(Network::ConnectionSocketPtr&& socket) override;
  void onReject(RejectCause) override;
  // ... (excerpt truncated)

// ... (excerpt from ListenerImpl)
  listener_.reset(
      // libevent base; the callback (a method of this object); the socket's file descriptor
      evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
  if (!listener_) {
    throw CreateListenerException(
        fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
  }
  if (!Network::Socket::applyOptions(socket.options(), socket,
                                     envoy::api::v3::core::SocketOption::STATE_LISTENING)) {
    throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}",
                                              socket.localAddress()->asString()));
  }
  evconnlistener_set_error_cb(listener_.get(), errorCallback);
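The bind_to_port branch can be shown outside Envoy with a minimal POSIX sketch. `setup_listen_socket` is a hypothetical helper, not an Envoy API. It binds to 127.0.0.1 on a kernel-chosen port only so the example runs anywhere. It also calls listen() itself. In the code above, listen() is instead left to evconnlistener_new: Envoy passes a backlog of -1, which libevent documents as "use a reasonable default".

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cstdint>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper (not an Envoy API) shaped like ListenSocketImpl::setupSocket:
// apply options first, then bind (and here also listen) only if bind_to_port.
// Returns the socket fd, or -1 on failure; *bound_port receives the chosen port.
int setup_listen_socket(bool bind_to_port, uint16_t* bound_port) {
  int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
  if (fd < 0) return -1;

  // Pre-bind option, analogous to setListenSocketOptions(options).
  int one = 1;
  ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (!bind_to_port) {
    // Never bound and never listening: such a listener can only receive
    // connections handed off by another listener (typically 15001 in an
    // Istio sidecar).
    return fd;
  }

  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = htons(0);  // 0 = let the kernel choose a free port
  socklen_t len = sizeof(addr);
  if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 ||
      ::listen(fd, 128) != 0 ||
      ::getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) != 0) {
    ::close(fd);
    return -1;
  }
  if (bound_port != nullptr) *bound_port = ntohs(addr.sin_port);
  return fd;
}
```

Calling `setup_listen_socket(true, &port)` yields a listening fd and a non-zero port. `setup_listen_socket(false, nullptr)` yields a socket that owns no port at all.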
About reuseport
- https://github.com/envoyproxy/envoy/issues/4602#issuecomment-544704931
- https://github.com/envoyproxy/envoy/issues/8794
- https://lwn.net/Articles/542629/
- https://tech.flipkart.com/linux-tcp-so-reuseport-usage-and-implementation-6bfbf642885a
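Multiple server sockets listen on the same port, and each server socket is served by its own listening thread. When the kernel TCP stack receives a client's connection request (SYN), it does the following:
- It hashes the TCP 4-tuple (srcIP, srcPort, destIP, destPort) to select one listening thread and wakes that thread.
- It binds the new connection to the woken thread.

Compared with not using SO_REUSEPORT, connections are therefore spread more evenly across threads, though the hash is not perfectly even.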
Envoy supports enabling this feature per listener. For hot restart, however, it requires Linux kernel 4.19-rc1 or later.
https://www.envoyproxy.io/docs/envoy/v1.18.3/api-v3/config/listener/v3/listener.proto
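The SO_REUSEPORT selection rule described above can be illustrated with a toy model. This is not the kernel's algorithm. `FourTuple` and `pick_listener` are hypothetical names, and the splitmix64 mixing function stands in for the kernel's own hash. The sketch only shows the two properties the text relies on:
- The same 4-tuple always maps to the same listening socket.
- Many different tuples spread approximately, but not exactly, evenly.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical connection key; the kernel hashes the real SYN 4-tuple.
struct FourTuple {
  uint32_t src_ip;
  uint16_t src_port;
  uint32_t dst_ip;
  uint16_t dst_port;
};

// splitmix64 finalizer: a cheap, well-mixed 64-bit hash step
// (a stand-in, NOT the kernel's reuseport hash).
uint64_t mix64(uint64_t z) {
  z = (z ^ (z >> 30)) * 0xbf58476d1ce4e5b9ULL;
  z = (z ^ (z >> 27)) * 0x94d049bb133111ebULL;
  return z ^ (z >> 31);
}

// Deterministic: equal tuples always select the same socket index.
std::size_t pick_listener(const FourTuple& t, std::size_t num_listeners) {
  uint64_t h = mix64(t.src_ip);
  h = mix64(h ^ t.dst_ip);
  h = mix64(h ^ ((static_cast<uint64_t>(t.src_port) << 16) | t.dst_port));
  return static_cast<std::size_t>(h % num_listeners);
}
```

The test drives 10,000 connections that differ only in source port into four sockets. It checks only that each socket stays within a loose band around 2,500, because a hash gives no exact-balance guarantee.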
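Verification
The feature is disabled by default. After enabling it with the following EnvoyFilter, you can see that it takes effect on port 15001: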
apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: reuseport
  namespace: testhl
spec:
  workloadSelector:
    labels:
      app: asm-0
  configPatches:
  - applyTo: LISTENER
    match:
      context: SIDECAR_OUTBOUND
      listener:
        portNumber: 15001
        name: "virtualOutbound"
    patch:
      operation: MERGE
      value:
        reuse_port: true
The Pod must be restarted for the change to take effect.
For comparison, on a Pod without reuseport applied, the distribution is roughly even.
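For strictly even connection balancing, you can try the listener setting connection_balance_config: exact_balance. It takes a lock, so it likely costs some performance when new connections arrive at a high rate. It currently applies only to TCP listeners.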
  Network::BalancedConnectionHandlerOptRef new_listener;
  if (hand_off_restored_destination_connections_ &&
      socket_->addressProvider().localAddressRestored()) {
    // Find a listener associated with the original destination address.
    new_listener =
        listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress());
  }
  // ...

  if (!rebalanced) {
    Network::BalancedConnectionHandler& target_handler =
        config_->connectionBalancer().pickTargetHandler(*this);
    if (&target_handler != this) {
      target_handler.post(std::move(socket));
      return;
    }
  }
  auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
                                                         hand_off_restored_destination_connections);
  // Create and run the filters
  config_->filterChainFactory().createListenerFilterChain(*active_socket);
  active_socket->continueFilterChain(true);
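The trade-off behind exact_balance can be sketched under one assumption: each worker exposes a connection counter. The balancer picks the least-loaded worker and bumps its count while holding a single shared lock. `Handler` and `ExactBalancerSketch` are hypothetical names; in Envoy the handlers are Network::BalancedConnectionHandler instances. As the pickTargetHandler / post() code above shows, when the chosen handler belongs to another worker, the socket is posted to that worker.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a per-worker handler.
struct Handler {
  std::size_t num_connections = 0;
};

// Sketch of exact balancing: under one lock, choose the handler with the fewest
// connections and count the new connection against it immediately, so two
// concurrent accepts cannot both pick the same "least loaded" worker.
class ExactBalancerSketch {
 public:
  void registerHandler(Handler& h) {
    std::lock_guard<std::mutex> lock(mu_);
    handlers_.push_back(&h);
  }

  // Assumes at least one handler has been registered.
  Handler& pickTargetHandler() {
    std::lock_guard<std::mutex> lock(mu_);  // the lock the text warns about
    Handler* best = nullptr;
    for (Handler* h : handlers_) {
      if (best == nullptr || h->num_connections < best->num_connections) best = h;
    }
    ++best->num_connections;
    return *best;
  }

 private:
  std::mutex mu_;
  std::vector<Handler*> handlers_;
};
```

Every accept, on every worker, serializes on `mu_`. That is why the cost grows with the rate of new connections.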
Network::BalancedConnectionHandlerOptRef
ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) {
  // This is a linear operation, may need to add a map<address, listener> to improve performance.
  // However, linear performance might be adequate since the number of listeners is small.
  // We do not return stopped listeners.
  auto listener_it =
      std::find_if(listeners_.begin(), listeners_.end(),
                   [&address](std::pair<Network::Address::InstanceConstSharedPtr,
                                        ConnectionHandlerImpl::ActiveListenerDetails>& p) {
                     return p.second.tcpListener().has_value() &&
                            p.second.listener_->listener() != nullptr &&
                            p.first->type() == Network::Address::Type::Ip && *(p.first) == address;
                   });
  // If there is exact address match, return the corresponding listener.
  if (listener_it != listeners_.end()) {
    return Network::BalancedConnectionHandlerOptRef(
        listener_it->second.tcpListener().value().get());
  }
  // Otherwise, we need to look for the wild card match, i.e., 0.0.0.0:[address_port].
  // We do not return stopped listeners.
  // TODO(wattli): consolidate with previous search for more efficiency.
  if (Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.listener_wildcard_match_ip_family")) {
    listener_it =
        std::find_if(listeners_.begin(), listeners_.end(),
                     [&address](const std::pair<Network::Address::InstanceConstSharedPtr,
                                                ConnectionHandlerImpl::ActiveListenerDetails>& p) {
                       // ... (excerpt truncated)
Establishing the connection
- DispatcherImpl receives the request through libevent and calls ListenerImpl::listenCallback.
- When a client opens a connection to Envoy, the Envoy worker receives the event-loop callback. This triggers Envoy::Network::ListenerImpl::listenCallback (port: 15001).
- The 15001 listener has "useOriginalDst": true, so its accept_filters_ include OriginalDstFilter.
- In OriginalDstFilter::onAccept, os_syscalls.getsockopt(fd, SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len) retrieves the destination IP as it was before iptables rewrote it.
Network::Address::InstanceConstSharedPtr OriginalDstFilter::getOriginalDst(Network::Socket& sock) {
  return Network::Utility::getOriginalDst(sock);
}

// ... (excerpt from Network::Utility::getOriginalDst)
  sockaddr_storage orig_addr;
  memset(&orig_addr, 0, sizeof(orig_addr));
  socklen_t addr_len = sizeof(sockaddr_storage);
  int status;
  if (*ipVersion == Address::IpVersion::v4) {
    status = sock.getSocketOption(SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len).rc_;
  } else {
    status = sock.getSocketOption(SOL_IPV6, IP6T_SO_ORIGINAL_DST, &orig_addr, &addr_len).rc_;
  }
  if (status != 0) {
    return nullptr;
  }
  return Address::addressFromSockAddr(orig_addr, 0, true /* default for v6 constructor */);
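The same lookup can be reproduced outside Envoy with a raw getsockopt call. This IPv4-only sketch makes several assumptions:
- It assumes Linux with netfilter.
- `original_dst_v4` and `format_v4` are hypothetical helpers.
- SOL_IP is spelled as the equal constant IPPROTO_IP.
- SO_ORIGINAL_DST is defined locally with its value from `<linux/netfilter_ipv4.h>`, in case that header is not included.

A socket that iptables did not redirect has no conntrack record. The call then fails and the helper returns std::nullopt, mirroring the `return nullptr` branch above.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netinet/in.h>
#include <optional>
#include <string>
#include <sys/socket.h>

#ifndef SO_ORIGINAL_DST
#define SO_ORIGINAL_DST 80  // value from <linux/netfilter_ipv4.h>
#endif

// Format an IPv4 socket address as "ip:port".
std::string format_v4(const sockaddr_in& sa) {
  char ip[INET_ADDRSTRLEN] = {0};
  ::inet_ntop(AF_INET, &sa.sin_addr, ip, sizeof(ip));
  return std::string(ip) + ":" + std::to_string(ntohs(sa.sin_port));
}

// IPv4-only sketch of the SO_ORIGINAL_DST query: ask netfilter for the
// destination the client originally dialed, before iptables REDIRECT sent the
// connection to 15001. Any failure (no conntrack entry, bad fd, module not
// loaded) yields std::nullopt.
std::optional<std::string> original_dst_v4(int fd) {
  sockaddr_in orig{};
  socklen_t len = sizeof(orig);
  // SOL_IP and IPPROTO_IP are both 0 on Linux.
  if (::getsockopt(fd, IPPROTO_IP, SO_ORIGINAL_DST, &orig, &len) != 0) {
    return std::nullopt;
  }
  return format_v4(orig);
}
```

Inside a sidecar-redirected connection, `original_dst_v4(accepted_fd)` would return the service address the application dialed, not 127.0.0.1:15001.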
- In newConnection(), getBalancedHandlerByAddress is also used to find the actual virtual listener.
void ActiveTcpSocket::newConnection() {
  connected_ = true;
  // Check if the socket may need to be redirected to another listener.
  Network::BalancedConnectionHandlerOptRef new_listener;
  if (hand_off_restored_destination_connections_ &&
      socket_->addressProvider().localAddressRestored()) {
    // Find a listener associated with the original destination address.
    new_listener =
        listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress());
  }
  // ... (excerpt truncated)
- ConnectionHandlerImpl::getBalancedHandlerByAddress then finds the listener that corresponds to addr:
Network::BalancedConnectionHandlerOptRef
ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) {
  // This is a linear operation, may need to add a map<address, listener> to improve performance.
  // However, linear performance might be adequate since the number of listeners is small.
  // We do not return stopped listeners.
  auto listener_it =
      std::find_if(listeners_.begin(), listeners_.end(),
                   [&address](std::pair<Network::Address::InstanceConstSharedPtr,
                                        ConnectionHandlerImpl::ActiveListenerDetails>& p) {
                     return p.second.tcpListener().has_value() &&
                            p.second.listener_->listener() != nullptr &&
                            p.first->type() == Network::Address::Type::Ip && *(p.first) == address;
                   });
  // If there is exact address match, return the corresponding listener.
  if (listener_it != listeners_.end()) {
    return Network::BalancedConnectionHandlerOptRef(
        listener_it->second.tcpListener().value().get());
  }
  // ...
- First, it looks for a listener where Listener.IP == addr.ip && Listener.Port == addr.port.
- Then, it looks for a listener where Listener.IP == 0.0.0.0 && Listener.Port == addr.port. For TCP services the listener IP is set; for HTTP services it is 0.0.0.0.
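The two-pass lookup can be modeled with plain strings. `ListenerEntry` and `find_listener` are hypothetical simplifications. Envoy compares Network::Address::Instance objects, and, as the code above shows, it also skips stopped listeners and has runtime-flag-dependent details in the wildcard pass. The sketch leaves all of that out. The listener names in the test follow Istio's ip_port naming only as sample data.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified listener record.
struct ListenerEntry {
  std::string ip;  // "0.0.0.0" for wildcard listeners
  uint16_t port;
  std::string name;
};

// Pass 1: exact ip:port match. Pass 2: wildcard 0.0.0.0 on the same port.
// nullptr means no match; in newConnection() the connection then stays on the
// listener that accepted it (e.g. 15001).
const ListenerEntry* find_listener(const std::vector<ListenerEntry>& listeners,
                                   const std::string& ip, uint16_t port) {
  for (const auto& l : listeners) {
    if (l.ip == ip && l.port == port) return &l;
  }
  for (const auto& l : listeners) {
    if (l.ip == "0.0.0.0" && l.port == port) return &l;
  }
  return nullptr;
}
```

The test uses a wildcard listener on 9080 and a specific 10.96.0.20:3306 listener. A connection to 10.96.0.99:9080 falls through to the wildcard, while an unknown port finds nothing.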
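- dispatcher.createServerConnection takes the accepted fd and creates the server connection object (ConnectionImpl). It then registers onFileEvent with the event loop to wait for read and write events.
- The accepted socket is non-blocking. On Linux, accept() does not inherit O_NONBLOCK from the listening socket. However, libevent's evconnlistener, created above with flags 0, marks accepted sockets non-blocking unless LEV_OPT_LEAVE_SOCKETS_BLOCKING is set.
- On Linux the event is registered as edge-triggered (epoll).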
auto server_conn_ptr = parent_.dispatcher().createServerConnection(
    std::move(socket), std::move(transport_socket), *stream_info);

Network::ServerConnectionPtr
DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
                                       Network::TransportSocketPtr&& transport_socket,
                                       StreamInfo::StreamInfo& stream_info) {
  ASSERT(isThreadSafe());
  return std::make_unique<Network::ServerConnectionImpl>(
      *this, std::move(socket), std::move(transport_socket), stream_info, true);
}

class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection {
public:
  ServerConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
                       TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info,
                       bool connected);
  // ServerConnection impl
  void setTransportSocketConnectTimeout(std::chrono::milliseconds timeout) override;
  void raiseEvent(ConnectionEvent event) override;
private:
  void onTransportSocketConnectTimeout();
  bool transport_connect_pending_{true};
  // Implements a timeout for the transport socket signaling connection. The timer is enabled by a
  // call to setTransportSocketConnectTimeout and is reset when the connection is established.
  Event::TimerPtr transport_socket_connect_timer_;
};

// ... (in the ConnectionImpl constructor)
  Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;
  // We never ask for both early close and read at the same time. If we are reading, we want to
  // consume all available data.
  socket_->ioHandle().initializeFileEvent(
      dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
      Event::FileReadyType::Read | Event::FileReadyType::Write);
  transport_socket_->setTransportSocketCallbacks(*this);

constexpr FileTriggerType determinePlatformPreferredEventType() {
#if defined(WIN32) || defined(FORCE_LEVEL_EVENTS)
  return FileTriggerType::EmulatedEdge;
#else
  return FileTriggerType::Edge;
#endif
}
static constexpr FileTriggerType PlatformDefaultTriggerType = determinePlatformPreferredEventType();
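Whether an accepted socket is non-blocking is easy to check directly. On Linux, accept(2) does not copy O_NONBLOCK from the listening socket to the new one, so the flag must be requested per accepted socket, for example with accept4(..., SOCK_NONBLOCK). This matters for an edge-triggered loop like the one configured above, because each readiness edge must be drained with non-blocking reads. The sketch assumes Linux and working loopback networking. `loopback_listener`, `connect_loopback`, `accept_one` and `is_nonblocking` are hypothetical helpers.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cstdint>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// True if O_NONBLOCK is set on fd.
bool is_nonblocking(int fd) {
  int flags = ::fcntl(fd, F_GETFL, 0);
  return flags >= 0 && (flags & O_NONBLOCK) != 0;
}

// Accept one pending connection; non-blocking mode is requested per socket.
int accept_one(int listen_fd, bool nonblocking) {
  return ::accept4(listen_fd, nullptr, nullptr, nonblocking ? SOCK_NONBLOCK : 0);
}

// Test scaffolding: a loopback listener on a kernel-chosen port.
int loopback_listener(uint16_t* port) {
  int lfd = ::socket(AF_INET, SOCK_STREAM, 0);
  if (lfd < 0) return -1;
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  socklen_t len = sizeof(addr);
  if (::bind(lfd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 ||
      ::listen(lfd, 16) != 0 ||
      ::getsockname(lfd, reinterpret_cast<sockaddr*>(&addr), &len) != 0) {
    ::close(lfd);
    return -1;
  }
  *port = ntohs(addr.sin_port);
  return lfd;
}

// Test scaffolding: a blocking client connection to 127.0.0.1:port.
int connect_loopback(uint16_t port) {
  int fd = ::socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) return -1;
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = htons(port);
  if (::connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0) {
    ::close(fd);
    return -1;
  }
  return fd;
}
```

Two connections accepted from the same listener end up with different blocking modes, depending only on the flag passed to accept4.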
- For an HTTP listener, the network filter is envoy.http_connection_manager. buildFilterChain adds Http::ConnectionManagerImpl to upstream_filters_ (a list). When request data arrives, the http_connection_manager's onData method can then be invoked.
void FilterManagerImpl::addReadFilter(ReadFilterSharedPtr filter) {
  ASSERT(connection_.state() == Connection::State::Open);
  ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter});
  filter->initializeReadFilterCallbacks(*new_filter);
  LinkedList::moveIntoListBack(std::move(new_filter), upstream_filters_);
}
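Appending filters to upstream_filters_ can be pictured as building an ordered list that is walked on every read. The sketch uses hypothetical simplifications:
- `FilterStatus`, `ReadFilter` and `FilterManagerSketch` stand in for Network::FilterStatus, Network::ReadFilter and FilterManagerImpl.
- `RecordingFilter` is a test double that records the order in which filters run.

The real read path also handles filter initialization and resumption. The sketch assumes the terminal filter, modeled on http_connection_manager, consumes the data and returns StopIteration.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

enum class FilterStatus { Continue, StopIteration };

// Hypothetical minimal read-filter interface.
struct ReadFilter {
  virtual ~ReadFilter() = default;
  virtual FilterStatus onData(std::string& data) = 0;
};

// Filters are appended in order (like moveIntoListBack into upstream_filters_),
// and each read walks them until one returns StopIteration.
class FilterManagerSketch {
 public:
  void addReadFilter(std::shared_ptr<ReadFilter> f) { filters_.push_back(std::move(f)); }
  void onRead(std::string& data) {
    for (auto& f : filters_) {
      if (f->onData(data) == FilterStatus::StopIteration) return;
    }
  }

 private:
  std::list<std::shared_ptr<ReadFilter>> filters_;
};

// Test double that records which filters ran, in order.
struct RecordingFilter : ReadFilter {
  RecordingFilter(std::vector<std::string>& log, std::string name, FilterStatus s)
      : log_(log), name_(std::move(name)), status_(s) {}
  FilterStatus onData(std::string&) override {
    log_.push_back(name_);
    return status_;
  }
  std::vector<std::string>& log_;
  std::string name_;
  FilterStatus status_;
};
```

With filters "a" (Continue), "hcm" (StopIteration) and "c", a single read runs "a" and then "hcm", and "c" never sees the data.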
CodecClient::CodecClient(Type type, Network::ClientConnectionPtr&& connection,
                         Upstream::HostDescriptionConstSharedPtr host,
                         Event::Dispatcher& dispatcher)
    : type_(type), host_(host), connection_(std::move(connection)),
      idle_timeout_(host_->cluster().idleTimeout()) {
  if (type_ != Type::HTTP3) {
    // Make sure upstream connections process data and then the FIN, rather than processing
    // TCP disconnects immediately. (see https://github.com/envoyproxy/envoy/issues/1679 for
    // details)
    connection_->detectEarlyCloseWhenReadDisabled(false);
  }
  connection_->addConnectionCallbacks(*this);
  connection_->addReadFilter(Network::ReadFilterSharedPtr{new CodecReadFilter(*this)});
  connection_->noDelay(true);
  // ... (excerpt truncated)
- When the connection is first added to the event loop, a Write event fires immediately. However, write_buffer_ is empty at that point, so nothing is written.
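The immediate Write event comes from the kernel, not from Envoy, and can be observed with a bare epoll loop. This Linux-only sketch uses a socket pair instead of a real TCP connection, an assumption made so it runs without networking. `poll_once`, `EdgePollResult` and `first_two_polls` are hypothetical helpers. The freshly registered socket reports EPOLLOUT on the first wait because its send buffer has room. With EPOLLET it reports nothing on the second wait, because no new edge has occurred.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// One non-blocking epoll_wait; returns the reported event mask, or 0.
uint32_t poll_once(int epfd) {
  epoll_event ev{};
  int n = ::epoll_wait(epfd, &ev, 1, 0);
  return n == 1 ? ev.events : 0;
}

struct EdgePollResult {
  uint32_t first;
  uint32_t second;
};

// Register one end of a fresh socket pair for read+write, edge-triggered,
// write nothing (an "empty write buffer"), and record two consecutive polls.
EdgePollResult first_two_polls() {
  EdgePollResult r{0, 0};
  int sv[2];
  if (::socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sv) != 0) return r;
  int epfd = ::epoll_create1(0);
  if (epfd >= 0) {
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
    ev.data.fd = sv[0];
    if (::epoll_ctl(epfd, EPOLL_CTL_ADD, sv[0], &ev) == 0) {
      r.first = poll_once(epfd);   // writable at once: the send buffer has room
      r.second = poll_once(epfd);  // edge-triggered: no new edge, no event
    }
    ::close(epfd);
  }
  ::close(sv[0]);
  ::close(sv[1]);
  return r;
}
```

An event loop that handles the first EPOLLOUT and finds nothing queued to write simply returns. That is the no-op write described above.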
void CodecClient::onEvent(Network::ConnectionEvent event) {
  if (event == Network::ConnectionEvent::Connected) {
    ENVOY_CONN_LOG(debug, "connected", *connection_);
    connection_->streamInfo().setDownstreamSslConnection(connection_->ssl());
    connected_ = true;
  }
  if (event == Network::ConnectionEvent::RemoteClose) {
    remote_closed_ = true;
  }
  // HTTP/1 can signal end of response by disconnecting. We need to handle that case.
  if (type_ == Type::HTTP1 && event == Network::ConnectionEvent::RemoteClose &&
      !active_requests_.empty()) {
    Buffer::OwnedImpl empty;
    onData(empty);
  }
  if (event == Network::ConnectionEvent::RemoteClose ||
      event == Network::ConnectionEvent::LocalClose) {
    ENVOY_CONN_LOG(debug, "disconnect. resetting {} pending requests", *connection_,
                   active_requests_.size());
    disableIdleTimer();
    idle_timer_.reset();
    StreamResetReason reason = StreamResetReason::ConnectionFailure;
    if (connected_) {
      reason = StreamResetReason::ConnectionTermination;
      if (protocol_error_) {
        if (Runtime::runtimeFeatureEnabled(
                "envoy.reloadable_features.return_502_for_upstream_protocol_errors")) {
          reason = StreamResetReason::ProtocolError;
          connection_->streamInfo().setResponseFlag(
              StreamInfo::ResponseFlag::UpstreamProtocolError);
        }
      }
    }
    while (!active_requests_.empty()) {
      // Fake resetting all active streams so that reset() callbacks get invoked.
      active_requests_.front()->encoder_->getStream().resetStream(reason);
    }
  }
}
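The reset-reason branch in onEvent reduces to a small decision table. `reset_reason` is a hypothetical pure-function restatement of that branch. The boolean `return_502_flag` stands in for the runtime feature envoy.reloadable_features.return_502_for_upstream_protocol_errors. The side effect of setting the UpstreamProtocolError response flag is omitted.

```cpp
#include <cassert>

enum class StreamResetReason { ConnectionFailure, ConnectionTermination, ProtocolError };

// Mirrors the close-event branch of CodecClient::onEvent:
// never connected -> ConnectionFailure;
// connected       -> ConnectionTermination, upgraded to ProtocolError only when
//                    a protocol error was seen AND the runtime feature is on.
StreamResetReason reset_reason(bool connected, bool protocol_error, bool return_502_flag) {
  if (!connected) return StreamResetReason::ConnectionFailure;
  if (protocol_error && return_502_flag) return StreamResetReason::ProtocolError;
  return StreamResetReason::ConnectionTermination;
}
```

A connection that never reached Connected always yields ConnectionFailure, whatever the other inputs are.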
Related reading
Envoy Request Flow Source Code Analysis (Part 1) | Traffic Hijacking