0%

关于Envoy的知识

开发环境搭建

基于 Ubuntu 18.04 进行开发,一下的所有篇幅的Envoy版本基于 1.14.1

基础工具下载

安装 Bazel

1
2
sudo wget -O /usr/local/bin/bazel https://github.com/bazelbuild/bazelisk/releases/latest/download/bazelisk-linux-amd64
sudo chmod +x /usr/local/bin/bazel

安装基础依赖

1
sudo apt-get install libtool cmake automake autoconf make ninja-build curl unzip virtualenv

Clang 构建环境 (可选)

llvm 下载安装,9.0 版本比较兼容

1
2
bazel/setup_clang.sh <PATH_TO_EXTRACTED_CLANG_LLVM>
echo "build --config=clang" >> user.bazelrc

使用 bazel构建DEBUG版本

1
bazel build -c dbg --spawn_strategy=standalone  //source/exe:envoy-static

集成 Clion

将 Bazel 转化为 Cmake

1
2
3
4
git clone https://github.com/lizan/bazel-cmakelists.git <PATH>
<bazel-cmakelists dir>/bazel-cmakelists --targets //source/exe:envoy-static //test/...
// 如果不需要构建可以
<bazel-cmakelists dir>/bazel-cmakelists --targets //source/exe:envoy-static //test/... --skip_build
  • 打开Clion导入Cmake项目即可,Clion的 Bazel插件 不能够针对Envoy良好的工作。
  • 设置Debug启动即可。

开发环境搭建参考

Envoy 基础

Libevent

1
Envoy is an L7 proxy and communication bus designed for large modern service oriented architectures. The project was born out of the belief.

如同在官网的介绍一样,Envoy是一个高性能的代理服务软件,支持 L4 和 L7 的代理能力。但是Envoy也并非是一个完全重复造轮子的产品,Envoy的底层与操作系统交互网络部分采用的是 libevent

libevent 是一个轻量级的网络事件库,提供了对于底层 Socket 的读写等事件的回调处理能力。

从上文中我们自然可以很明白的看懂,我们处理 lister/read/write/signal 这四个事件进行了监听处理。

Libevent in Envoy

续上文,libevent在Envoy中的定位也是偏底层的,通过不同事件获得的数据包会真实的扭转至 Envoy 的系统内部进行处理,我们举一个常见的读取的例子。但是在讲这个之前,我们先来看看 Envoy 的代码的入口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int main(int argc, char** argv) {
std::unique_ptr<Envoy::MainCommon> main_common;
try {
main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
} catch (const Envoy::NoServingException& e) {
return EXIT_SUCCESS;
} catch (const Envoy::MalformedArgvException& e) {
std::cerr << e.what() << std::endl;
return EXIT_FAILURE;
} catch (const Envoy::EnvoyException& e) {
std::cerr << e.what() << std::endl;
return EXIT_FAILURE;
}
return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}

抛开那些兼容系统的代码,我们可以发现和绝大多数的系统设计一样,我们有一个最底层的设计称之为MainCommon,通过一顿类型跳转大法,我们可以定位到真正的启动位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void InstanceImpl::run() {
// RunHelper exists primarily to facilitate how we respond to early shutdown during
// startup (see RunHelperTest in server_test.cc).
const auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
access_log_manager_, init_manager_, overloadManager(), [this] {
notifyCallbacksForStage(Stage::PostInit);
startWorkers(); // ➁
});

// Run the main dispatch loop waiting to exit.
ENVOY_LOG(info, "starting main dispatch loop");
auto watchdog =
guard_dog_->createWatchDog(api_->threadFactory().currentThreadId(), "main_thread");
watchdog->startWatchdog(*dispatcher_);
dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
dispatcher_->run(Event::Dispatcher::RunType::Block); ➀
ENVOY_LOG(info, "main dispatch loop exited");
guard_dog_->stopWatching(watchdog);
watchdog.reset();

terminate();
}

在 ➀ 处四周的代码暂且不管,实际上我们发现我们最终的主循环是在此处。在一顿跳转之后,我们终于拨开了层层外衣,看到了 Libevent 的部分。在 ➁ 处参考 线程模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void LibeventScheduler::run(Dispatcher::RunType mode) {
int flag = 0;
switch (mode) {
case Dispatcher::RunType::NonBlock:
flag = EVLOOP_NONBLOCK;
break;
case Dispatcher::RunType::Block:
// The default flags have 'block' behavior. See
// http://www.wangafu.net/~nickm/libevent-book/Ref3_eventloop.html
break;
case Dispatcher::RunType::RunUntilExit:
flag = EVLOOP_NO_EXIT_ON_EMPTY;
break;
}
event_base_loop(libevent_.get(), flag); ➀
}

➀ 处理刚好即是我们最终构建的 Libevent 的Loop循环,

1
int event_base_loop(struct event_base *, int);

当 event_base 内部不存在任何注册事件之后,才会停止工作。那下一步,我们的目标是找到,我们何时何地的为这个 event_base 创建了各类事件的?

不难发现,经过我们的细心勘察,我们可以在 ListenerImpl::setupServerSocket 中发现监听端口的逻辑:

1
2
3
4
void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
listener_.reset(
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
}

这一点和 Netty 等网络框架不一样,Libevent 是一个全局性的事件监听库,他可以监听不同的端口,回忆一下,Socket 是不是区分为 Server SocketClient Socket ,一点没错,这里做的事情,是创建一个 Server Socket 并且监听他的 listenCallback 也就创建新连接时的事件。

ListenCallback 函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg) {
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);

// Create the IoSocketHandleImpl for the fd here.
IoHandlePtr io_handle = std::make_unique<IoSocketHandleImpl>(fd);

// Get the local address from the new socket if the listener is listening on IP ANY
// (e.g., 0.0.0.0 for IPv4) (local_address_ is nullptr in this case).
const Address::InstanceConstSharedPtr& local_address =
listener->local_address_ ? listener->local_address_
: listener->getLocalAddress(io_handle->fd());
listener->cb_.onAccept(
std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address));
}

在最后一行就可以清晰的发现,当我们的 ServerSocket 获得一个新的客户端连接的时候就会将这个文件句柄 fd

编者注: 这里的 FD 一切皆文件的思想,并且这里并没有使用 Libevent 的 Read 回调,所以这里在建立的时候,这个FD已经转交给 Envoy 的系统。

当我们继续看下去,我们在真实的接受网络请求的时候此处的回调函数执行逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void ConnectionHandlerImpl::ActiveTcpListener::onAcceptWorker(
Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections, bool rebalanced) {
auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
hand_off_restored_destination_connections);

// Create and run the filters
config_.filterChainFactory().createListenerFilterChain(*active_socket); ➀
active_socket->continueFilterChain(true);

// Move active_socket to the sockets_ list if filter iteration needs to continue later.
// Otherwise we let active_socket be destructed when it goes out of scope.
if (active_socket->iter_ != active_socket->accept_filters_.end()) {
active_socket->startTimer();
active_socket->moveIntoListBack(std::move(active_socket), sockets_); ➁
}
}

➀ 在这里为 Socket 创建了 FilterChain。
➁ 将 socket 压入待处理的队列,从这里我们可以大胆的假设:对于已经和服务器建立起连接的Socket,Envoy会采用和Netty类型的 Reactor 模式类似的方式进行线程管理。也就是接受线程就是 Listener,真正的Worker会从这里的List中获取工作的Socket。

额外的资料 :

又在一波骚操作之下,我们发来到了创建新连接的地方(其实更像是分配新连接) ConnectionHandlerImpl::ActiveTcpListener::newConnection。但是在这里我们依然不能发现Read的逻辑,献出我们的人肉Debug大法,我们在读取数据的 ConnectionImpl::onRead 放下断点这一大杀器。

JgPwQS.png

我们依然发现起始点仍然是 event_base_loop,那我们的疑问自然就变成了,究竟是在哪里进行事件注册的,因为我们知道注册进来的是 event_base 对象,我们通过 usage 的方式进行查找,找到了一个可疑点 FileEventImpl::assignEvents

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
ASSERT(base != nullptr);
event_assign(
&raw_event_, base, fd_,
EV_PERSIST | (trigger_ == FileTriggerType::Level ? 0 : EV_ET) |
(events & FileReadyType::Read ? EV_READ : 0) |
(events & FileReadyType::Write ? EV_WRITE : 0) |
(events & FileReadyType::Closed ? EV_CLOSED : 0),
[](evutil_socket_t, short what, void* arg) -> void {
auto* event = static_cast<FileEventImpl*>(arg);
uint32_t events = 0;
// ... skip something
},
this);
}

看起来,在这里进行了事件的分配,我们打上断点,再走一遍。从调用的栈中,我们可以清晰的发现:

JgiIc8.png

OnAccpet -> newConnection -> AssignEvents 这样的一个逻辑,只是因为整个过程的栈太深,我们不能轻易的发现,事件的注册是在创建客户端的 socket 的时候创建的。剩下来的注册逻辑是 libevent 部分,不做深入。

线程模式

首先可以阅读下 Envoy threading model

EnvoyLibevent 充当了一个更底层的lib依赖,如同我们写 SpringWeb 时候的 Tomcat,对于 Socket 的读写等事件是委托给 Libevnet 的。第一步是为我们的 Event_base 创建了 Listen 的监听事件。之后,在建立起连接的时候,Envoy增加了对于 Socket 的Read/Write 事件的监听。

其实如果在 AssginEvents 处设置断点再重新启动,第一个注册的事件是对于配置文件的读事件的事件(为的是热更新配置),之后是 DNS 的事件注册,之后我们启动了Envoy服务,再此之后,我们要初始化配置文件中的各种 Listener,但是值得注意的是,LibeventScheduler::run 的是在每一个 Worker 中都存在的。

Netty 的处理不同,这里的 Listener 也是分布在所有的 Thread 上的。也就是在接受到一个新的请求的时候,是随机在 Worker 中选择一个进行处理的。

Chain in Envoy

在上文中,我们已经知道,在 ClientSocket 建立的 Listen 事件中,我们就将此 SocketRead/Write 事件注册到 Event_Base,我们在注册的事件中,我们轻而易举的定位到 ConnectionImpl::onRead 此处是读取逻辑处。

请求处理链

1
2
3
4
5
6
7
8
9
10
11
12
void ConnectionImpl::onRead(uint64_t read_buffer_size) {
if (!read_enabled_ || inDelayedClose()) {
return;
}
ASSERT(ioHandle().isOpen());

if (read_buffer_size == 0 && !read_end_stream_) {
return;
}

filter_manager_.onRead();
}

在最后一行,我们就进入了整个流程处理的开始 FilterManager 开始处理的逻辑。在之后的几层跳转中,我们发现了处理逻辑的核心所在:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter,

std::list<ActiveReadFilterPtr>::iterator entry;
if (!filter) {
connection_.streamInfo().addBytesReceived(buffer_source.getReadBuffer().buffer.length());
entry = upstream_filters_.begin();
} else {
entry = std::next(filter->entry());
}

for (; entry != upstream_filters_.end(); entry++) {
if (!(*entry)->initialized_) {
(*entry)->initialized_ = true;
FilterStatus status = (*entry)->filter_->onNewConnection(); ➀
if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
return;
}
}

StreamBuffer read_buffer = buffer_source.getReadBuffer();
if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream); ➁
if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
return;
}
}
}
}

➀ 对于 Envoy 来说,也规定了一系列 Fitler 的抽象限定比如 Network::ReadFilter 就定义了 ReadFilter 一系列虚函数,我们可以看到在第一个Filter处,我们调用了 onNewConnection 这里必然是一个生命周期的回调函数,至于具体的内容我们需要去查看具体的实现类进行判断。

➁ 从这里我们发现真实的读取了数据的逻辑。从这里因为本文重点是在 Http协议的处理,我们这里的 ReadFilter 也会查看 Http::ConnectionManagerImpl 这个类的实现机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
etwork::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {


bool redispatch;
do {
redispatch = false;

try {
codec_->dispatch(data); //将此数据分发出去
}
if (codec_->protocol() < Protocol::Http2) {
if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
data.length() > 0 && streams_.empty()) {
redispatch = true;
}

if (!streams_.empty() && streams_.front()->state_.remote_complete_) {
read_callbacks_->connection().readDisable(true);
}
}
} while (redispatch);

if (!read_callbacks_->connection().streamInfo().protocol()) {
read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
}

return Network::FilterStatus::StopIteration; //返回
}

在这里的实现过程中,我们至少发现了envoy 的 Filter 不是如同Java 的Fitler 是层层调用的,每一层只会做自己的事情,因为从代码内,我们可以发现这些因为如果正常处理的过程,我们应该会走完所有的 Fitler, 那基于这样的逻辑,我们可以判断出来,在这个 codec_ 的逻辑里面,我们应该只是将 HTTP 进行解码,然而在实际的断点之中,其实在这个 codec 之中有一个优化是当我们 Pase 完成HTTP请求的时候,此处就会直接进行逻辑处理,具体的会跳转至如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void ServerConnectionImpl::onMessageComplete() {
if (active_request_.has_value()) {
auto& active_request = active_request_.value();
active_request.remote_complete_ = true;
if (deferred_end_stream_headers_) {
active_request.request_decoder_->decodeHeaders(
std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true); ➀
deferred_end_stream_headers_ = false;
} else if (processing_trailers_) {
active_request.request_decoder_->decodeTrailers(
std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
} else {
Buffer::OwnedImpl buffer;
active_request.request_decoder_->decodeData(buffer, true);
}

// Reset to ensure no information from one requests persists to the next.
headers_or_trailers_.emplace<RequestHeaderMapPtr>(nullptr);
}

http_parser_pause(&parser_, 1);
}

在 ➀ 处会进入 Route 阶段(其实笔者觉得这里的设计是不太清晰,导致在阅读的过程中,不得不借助于Debug工具),在这里最终进入 ConfigImpl::route 逻辑。

1
2
3
4
5
6
7
8
9
10
RouteConstSharedPtr RouteMatcher::route(const Http::RequestHeaderMap& headers,
const StreamInfo::StreamInfo& stream_info,
uint64_t random_value) const {
const VirtualHostImpl* virtual_host = findVirtualHost(headers);
if (virtual_host) {
return virtual_host->getRouteFromEntries(headers, stream_info, random_value);
} else {
return nullptr;
}
}

通过是否返回命中的 Route 来判断下一步的处理逻辑。在之后的逻辑中,我们随着断点来到了

1
2
3
4
5
6
7
8
9
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers, bool end_stream) {

// 略....

decodeHeaders(nullptr, *request_headers_, end_stream);

// Reset it here for both global and overridden cases.
resetIdleTimer();
}

ConnectionManagerImpl::ActiveStream::decodeHeaders 代码中有大量的处理 httpheader的逻辑,这里不做展开,我们进入,最重要的 decodeHeaders 的函数继续我们的探索,在这里的逻辑中,我们可以发现这里一个非常复杂的逻辑处理,我们慢慢的看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {

// 判断是否拥有 Route,如果没有就直接Return了
route_ = callbacks_->route();
if (!route_) {
config_.stats_.no_route_.inc();
ENVOY_STREAM_LOG(debug, "no cluster match for URL '{}'", *callbacks_,
headers.Path()->value().getStringView());

callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RouteNotFound);
return Http::FilterHeadersStatus::StopIteration;
}


// 找到这个请求的路由对象
route_entry_ = route_->routeEntry();

// 拿到配置的Cluster,其实也就是目标的地址
cluster_ = cluster->info();



// 从上游获得连接为了获得 host
const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions();
Http::ConnectionPool::Instance* http_pool = getHttpConnPool();
Upstream::HostDescriptionConstSharedPtr host;

ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);

// 将需要处理的数据压栈
modify_headers_ = modify_headers;

UpstreamRequestPtr upstream_request =
std::make_unique<UpstreamRequest>(*this, std::make_unique<HttpConnPool>(*http_pool));
upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_);
upstream_requests_.front()->encodeHeaders(end_stream);
if (end_stream) {
onRequestComplete();
}

return Http::FilterHeadersStatus::StopIteration;
}

在 upstream_requests_.front()->encodeHeaders(end_stream) 这一步别有洞天

1
2
3
4
5
6
void UpstreamRequest::encodeHeaders(bool end_stream) {
ASSERT(!encode_complete_);
encode_complete_ = end_stream;

conn_pool_->newStream(this); ➀
}

➀ 在此处我们创建了一个向下游的 connection。随着我们代码的深入探索:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ConnPoolImplBase::ActiveClient::ActiveClient(ConnPoolImplBase& parent,
uint64_t lifetime_request_limit,
uint64_t concurrent_request_limit)
: parent_(parent), remaining_requests_(translateZeroToUnlimited(lifetime_request_limit)),
concurrent_request_limit_(translateZeroToUnlimited(concurrent_request_limit)),
connect_timer_(parent_.dispatcher_.createTimer([this]() -> void { onConnectTimeout(); })) {
Upstream::Host::CreateConnectionData data = parent_.host_->createConnection(
parent_.dispatcher_, parent_.socket_options_, parent_.transport_socket_options_);
real_host_description_ = data.host_description_;
codec_client_ = parent_.createCodecClient(data);
codec_client_->addConnectionCallbacks(*this);

conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
parent_.host_->cluster().stats().upstream_cx_connect_ms_, parent_.dispatcher_.timeSource());
conn_length_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
parent_.host_->cluster().stats().upstream_cx_length_ms_, parent_.dispatcher_.timeSource());
connect_timer_->enableTimer(parent_.host_->cluster().connectTimeout());
}

我们可以发现在这里我才真正的去创建一个下游的连接。但是我们知道,对于一个高性能的WebServer,我们对于下游的访问也是不能进行 Blocking 的,因此,我们常见的处理能力应该是将下游的请求建立起来之后进行挂起,我们等待下游的返回再进行处理,因此在后面我们可以发现:

1
2
3
4
5
6
7
8
9
10
11
12
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
TransportSocketPtr&& transport_socket,
StreamInfo::StreamInfo& stream_info, bool connected)
: ConnectionImplBase(dispatcher, next_global_id_++),
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_ = dispatcher_.createFileEvent(
ioHandle().fd(), [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
Event::FileReadyType::Read | Event::FileReadyType::Write);

transport_socket_->setTransportSocketCallbacks(*this);
}

在创建下游的链接的时候,我们就将下游回复数据的 ReadWrite 事件注册到当前的 Worker 线程的 Event_Base 中。到这里我们先来看看前半段的整个流程:

直至上面的内容,我们已经走过了接收到请求到创建到下游的请求的大部分流程,在此之后还有一些其他的事务性的需要处理,比如响应超时等,不做太多的深入,我们进入下一个环节:响应数据的处理链路。

响应数据处理链路

对于响应的数据,我们也知道在我们在执行完成之后,我们将后续的对 BackendSteam 的读写事件载入了 EventBase 我们看看后续这些数据如何处理,入口依然是 OnRead() 函数,之后的逻辑和之前类似,直到 FilterManagerImpl::onRead 之后调用的具体实现类是 CodecClient 相对会毕竟的简单,因为响应体我们不需要再进行什么路由设置等操作,然后当我们读取完成之后,就进入了 ConnectionImpl::onMessageCompleteBase 函数,进入了结束流程,我们就来看看,当我们从响应流里面读取完成所有的数据之后,又是如何响应最初的调用者的。然后就进入了 ClientConnectionImpl::onMessageComplete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void ClientConnectionImpl::onMessageComplete() {
ENVOY_CONN_LOG(trace, "message complete", connection_);
if (pending_response_.has_value()) { // 当我们有挂起的 Response 的时候才处理
if (connection_.state() == Network::Connection::State::Open) {
while (!connection_.readEnabled()) {
connection_.readDisable(false); // 让链接禁读,因为要写入了
}
}

if (deferred_end_stream_headers_) {
response.decoder_->decodeHeaders(
std::move(absl::get<ResponseHeaderMapPtr>(headers_or_trailers_)), true);
deferred_end_stream_headers_ = false;
} else if (processing_trailers_) {
response.decoder_->decodeTrailers(
std::move(absl::get<ResponseTrailerMapPtr>(headers_or_trailers_)));
} else {
Buffer::OwnedImpl buffer;
response.decoder_->decodeData(buffer, true); //写入数据
}

// Reset to ensure no information from one requests persists to the next.
pending_response_.reset();
headers_or_trailers_.emplace<ResponseHeaderMapPtr>(nullptr);
}
}

在响应的过程中,调用了 decodeData 最终回归到 ConnectionMangerImpl中,然在一系列的操作中,我们来到了最终写入的地方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
ASSERT(!end_stream || enable_half_close_);

if (write_end_stream_) {
ASSERT(data.length() == 0 && end_stream);

return;
}

if (through_filter_chain) {
current_write_buffer_ = &data;
current_write_end_stream_ = end_stream;
FilterStatus status = filter_manager_.onWrite(); // ➀ 这里进入了写入流的 FilterChain
current_write_buffer_ = nullptr;

if (FilterStatus::StopIteration == status) { // 值得注意的是这里是可以提前终结
return;
}
}

write_end_stream_ = end_stream;
if (data.length() > 0 || end_stream) {
ENVOY_CONN_LOG(trace, "writing {} bytes, end_stream {}", *this, data.length(), end_stream);
write_buffer_->move(data); // ➁ 将数据流写入
if (!connecting_) {
ASSERT(file_event_ != nullptr, "ConnectionImpl file event was unexpectedly reset");
file_event_->activate(Event::FileReadyType::Write);
}
}
}

在我们的所有的数据写入完成之后,进入了最终的 Cleanup 的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void ConnectionImpl::closeSocket(ConnectionEvent close_type) {

ENVOY_CONN_LOG(debug, "closing socket: {}", *this, static_cast<uint32_t>(close_type));
transport_socket_->closeSocket(close_type);

// Drain input and output buffers.
updateReadBufferStats(0, 0);
updateWriteBufferStats(0, 0);

write_buffer_->drain(write_buffer_->length());

connection_stats_.reset();

file_event_.reset();

socket_->close();

raiseEvent(close_type);
}

到这里算是结束了一个段落,我们将整个过程整理一下:

Envoy Extend

从上文分析中,我们已经算是通读了代码的主要实现部分,剩下来,我们看Envoy给我们预留的扩展的部分。Extending Envoy for custom use cases,从配置文件中,我们也可以获得一些灵感:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static_resources:
listeners:
- name: listener_0
address:
socket_address: { address: 127.0.0.1, port_value: 10000 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager
stat_prefix: ingress_http
codec_type: AUTO
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match: { prefix: "/" }
route: { cluster: some_service }
http_filters:
- name: envoy.filters.http.router

我们可以发现,对于 Envoy 来说最为主要的部分是 FilterChains,虽然我们并没有写过CPP,但是从思想相近来看,我们应该是可以拓展 Filter 的实现,然后注册到这个 FilterChians 中,幸好对于官方来说,提供一个比较详尽的 DEMO。从中可以看出,我们的核心是继承已于的Fitler的Interface然后实现之:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
namespace Envoy {
namespace Filter {

/**
* Implementation of a basic echo filter.
*/
class Echo2 : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter> {
public:
// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
read_callbacks_ = &callbacks;
}

private:
Network::ReadFilterCallbacks* read_callbacks_{};
};

} // namespace Filter
} // namespace Envoy

使用起来也比较的简单

1
2
3
4
5
6
7
8
9
10
11
12
13
static_resources:
clusters:
name: cluster_0
connect_timeout: 0.25s
load_assignment:
cluster_name: cluster_0
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 0

但是在这背后,这工作的机制是如何的呢?我们还是需要从源码中来。现在已经有的 Extensions 实现可以参考此处 extensions

从上文代码中,我们可以很容易的定位到初始化 FilterChain 的逻辑在 ListenerImpl::createNetworkFilterChain

1
2
3
4
5
bool ListenerImpl::createNetworkFilterChain(
Network::Connection& connection,
const std::vector<Network::FilterFactoryCb>& filter_factories) {
return Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
}

可见是一个很标准的工厂模式,根据不同的工厂我们会创建不同的Filter实例,在创建完成之后如果是 ReaderFliter 就直接加到 upsteam 的底部,如果是 WriterFliter 则会增加到 downsteam 的底部。代码也较为简单,如下图。

1
2
3
4
5
6
void FilterManagerImpl::addReadFilter(ReadFilterSharedPtr filter) {
ASSERT(connection_.state() == Connection::State::Open);
ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter});
filter->initializeReadFilterCallbacks(*new_filter);
new_filter->moveIntoListBack(std::move(new_filter), upstream_filters_);
}

那问题就变成了我们的 Filter_Factory 从而来,我们从虚表中发现有两种实现 Admin & Impl,如果度过官网文档的同学应该可以猜到Admin的实现是基于动态请求的。我还是重启下,从头来跟踪下流程,初始化FilterFactory 最早可以定位到 ListenerManagerImpl::addOrUpdateListener 和配置文件的层级类似,我们从首先创建的是Listerner,在创建 Listener 的过程中我们创建 FilterChain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
filter_chain_manager_.addFilterChain(config.filter_chains(), builder, filter_chain_manager_);

void FilterChainManagerImpl::addFilterChain(
absl::Span<const envoy::config::listener::v3::FilterChain* const> filter_chain_span,
FilterChainFactoryBuilder& filter_chain_factory_builder,
FilterChainFactoryContextCreator& context_creator) {
Cleanup cleanup([this]() { origin_ = absl::nullopt; });
std::unordered_set<envoy::config::listener::v3::FilterChainMatch, MessageUtil, MessageUtil>
filter_chains;
uint32_t new_filter_chain_size = 0;
for (const auto& filter_chain : filter_chain_span) {
const auto& filter_chain_match = filter_chain->filter_chain_match();
if (!filter_chain_match.address_suffix().empty() || filter_chain_match.has_suffix_len()) {
throw EnvoyException(fmt::format("error adding listener '{}': contains filter chains with "
"unimplemented fields",
address_->asString()));
}
if (filter_chains.find(filter_chain_match) != filter_chains.end()) {
throw EnvoyException(fmt::format("error adding listener '{}': multiple filter chains with "
"the same matching rules are defined",
address_->asString()));
}
filter_chains.insert(filter_chain_match);

// Validate IP addresses.
std::vector<std::string> destination_ips;
destination_ips.reserve(filter_chain_match.prefix_ranges().size());
for (const auto& destination_ip : filter_chain_match.prefix_ranges()) {
const auto& cidr_range = Network::Address::CidrRange::create(destination_ip);
destination_ips.push_back(cidr_range.asString());
}

std::vector<std::string> source_ips;
source_ips.reserve(filter_chain_match.source_prefix_ranges().size());
for (const auto& source_ip : filter_chain_match.source_prefix_ranges()) {
const auto& cidr_range = Network::Address::CidrRange::create(source_ip);
source_ips.push_back(cidr_range.asString());
}

// Reject partial wildcards, we don't match on them.
for (const auto& server_name : filter_chain_match.server_names()) {
if (server_name.find('*') != std::string::npos &&
!FilterChainManagerImpl::isWildcardServerName(server_name)) {
throw EnvoyException(
fmt::format("error adding listener '{}': partial wildcards are not supported in "
"\"server_names\"",
address_->asString()));
}
}

// Reuse created filter chain if possible.
// FilterChainManager maintains the lifetime of FilterChainFactoryContext
// ListenerImpl maintains the dependencies of FilterChainFactoryContext
auto filter_chain_impl = findExistingFilterChain(*filter_chain);
if (filter_chain_impl == nullptr) {
filter_chain_impl =
filter_chain_factory_builder.buildFilterChain(*filter_chain, context_creator);
++new_filter_chain_size;
}

addFilterChainForDestinationPorts(
destination_ports_map_,
PROTOBUF_GET_WRAPPED_OR_DEFAULT(filter_chain_match, destination_port, 0), destination_ips,
filter_chain_match.server_names(), filter_chain_match.transport_protocol(),
filter_chain_match.application_protocols(), filter_chain_match.source_type(), source_ips,
filter_chain_match.source_ports(), filter_chain_impl);
fc_contexts_[*filter_chain] = filter_chain_impl;
}
convertIPsToTries();
ENVOY_LOG(debug, "new fc_contexts has {} filter chains, including {} newly built",
fc_contexts_.size(), new_filter_chain_size);
}

Envoy Admin

我们构建 Example 的时候,有一段内容

1
2
3
4
admin:
access_log_path: /tmp/admin_access.log
address:
socket_address: { address: 127.0.0.1, port_value: 9901 }

是 Enovy 的管理端口。主要提供了一系列的统计数据等,详见 Envoy-Admin

Envoy Dynamic Manager

我们分析了简单的从文件读入的过程,但是对于一个成熟的网关来说,动态载入数据的类型是不可少的,因此 Envoy 也提供了几种动态获得配置的能力,比如 LDS XDS 等。

LDS

The listener discovery service (LDS) is an optional API that Envoy will call to dynamically fetch listeners. Envoy will reconcile the API response and add, modify, or remove known listeners depending on what is required.

相关文档

Gaph

来杯奶茶, 嗝~~~