Exploratory Reading of Envoy

There are generally two ways to read source code:

  • Verification-style reading: roughly speaking, first understand the abstractions, then follow them through the code to verify that the system really works the way we think it does.
  • Exploratory reading: slightly different. When we already have experience with similar software, we know how other systems solve certain common problems, so we explore how Envoy solves the same class of problems.

Neither way of reading source code is better or worse than the other; the difference is more like top-down versus bottom-up.

This time we'll read Envoy in the exploratory style. Exploratory reading, of course, needs a question to drive it, so here is the one we want to answer:

During FilterChain processing, how is state data stored?

We know that a lot of logic needs to keep transient state. Take rate limiting: if we want to limit some API to N calls per minute, that N obviously has to be stored somewhere. We'll use the YAML configuration shown in the Verification section below as our running example.

Filter instances

Hypothesis

For exploratory reading, the most important step is to form a hypothesis. Drawing on our previous coding experience, the first question we can ask is:

Is a filter a singleton, or are there multiple instances?

We always need somewhere to store transient state. If the Filter is a multi-instance object, that state can live inside the Filter itself; if the Filter is a singleton, the state has to be stored elsewhere under some special keying logic and fetched back on demand.

From the configuration alone we can already make a fairly confident guess: the Filter must be multi-instance, because we can configure a different RateLimiter for each path.

Verification

We locate the local rate limiter code and add a log line to it:

decodeHeaders (github)
void* address = this;
ENVOY_LOG(info, "rate limit address: {}", address);

if (requestAllowed(descriptors)) {
  config->stats().ok_.inc();
  return Http::FilterHeadersStatus::Continue;
}

Next, we tweak our bootstrap file and add a second route with its own RateLimiter:

- name: app
  domains:
  - "*"
  routes:
  - match:
      prefix: "/hello"
    direct_response:
      status: 200
      body:
        inline_string: "Hello"
    typed_per_filter_config:
      envoy.filters.http.local_ratelimit:
        "@type": type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit
        stat_prefix: http_local_rate_limiter
        token_bucket:
          max_tokens: 1
          tokens_per_fill: 1
          fill_interval: 5s
        filter_enabled:
          runtime_key: local_rate_limit_enabled
          default_value:
            numerator: 100
            denominator: HUNDRED
        filter_enforced:
          runtime_key: local_rate_limit_enforced
          default_value:
            numerator: 100
            denominator: HUNDRED
        response_headers_to_add:
        - append_action: OVERWRITE_IF_EXISTS_OR_ADD
          header:
            key: x-local-rate-limit
            value: 'true'
  - match:
      prefix: "/"
    direct_response:
      status: 200
      body:
        inline_string: "Default"
    typed_per_filter_config:
      envoy.filters.http.local_ratelimit:
        "@type": type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit
        stat_prefix: http_local_rate_limiter
        token_bucket:
          max_tokens: 1
          tokens_per_fill: 1
          fill_interval: 5s
        filter_enabled:
          runtime_key: local_rate_limit_enabled
          default_value:
            numerator: 100
            denominator: HUNDRED
        filter_enforced:
          runtime_key: local_rate_limit_enforced
          default_value:
            numerator: 100
            denominator: HUNDRED
        response_headers_to_add:
        - append_action: OVERWRITE_IF_EXISTS_OR_ADD
          header:
            key: x-local-rate-limit
            value: 'true'

Then run:

curl localhost:10000/hello
curl localhost:10000/

The logs show two different addresses:

[2022-11-01 08:13:17.005][38344][info][filter] [source/extensions/filters/http/local_ratelimit/local_ratelimit.cc:126] rate limit address: 0x57ac3f57b900
[2022-11-01 08:13:17.733][38344][info][filter] [source/extensions/filters/http/local_ratelimit/local_ratelimit.cc:126] rate limit address: 0x57ac3f57b820

These are clearly not the same object, yet across many repeated requests the addresses never change.

So we can draw a conclusion:

Envoy filters are multi-instance, replicated along the dimensions of the configuration file.

With that verified, two brand-new questions come up:

  • Who constructs the Filter?
  • How does the Filter work?

Filter initialization

For filter initialization we can switch to another verification technique: a breakpoint gives a quick answer.

Filter (github)
class Filter : public Http::PassThroughFilter, Logger::Loggable<Logger::Id::filter> {
public:
  Filter(FilterConfigSharedPtr config) : config_(config) {}

Setting a breakpoint on the constructor and running makes the answer obvious:

Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter::Filter@0x0000555558f776c8 (/workspaces/envoy/source/extensions/filters/http/local_ratelimit/local_ratelimit.h:139)
__gnu_cxx::new_allocator<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter>::construct<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter, std::shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::FilterConfig> const&>@0x0000555558f77675 (/usr/include/c++/9/ext/new_allocator.h:146)
std::allocator_traits<std::allocator<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter> >::construct<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter, std::shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::FilterConfig> const&>@0x0000555558f7744d (/usr/include/c++/9/bits/alloc_traits.h:483)
std::_Sp_counted_ptr_inplace<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter, std::allocator<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter>, (__gnu_cxx::_Lock_policy)2>::_Sp_counted_ptr_inplace<std::shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::FilterConfig> const&>@0x0000555558f771d8 (/usr/include/c++/9/bits/shared_ptr_base.h:548)
std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter, std::allocator<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter>, std::shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::FilterConfig> const&>@0x0000555558f76fce (/usr/include/c++/9/bits/shared_ptr_base.h:680)
std::__shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter>, std::shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::FilterConfig> const&>@0x0000555558f76f10 (/usr/include/c++/9/bits/shared_ptr_base.h:1344)
std::shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter>::shared_ptr<std::allocator<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter>, std::shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::FilterConfig> const&>@0x0000555558f76ead (/usr/include/c++/9/bits/shared_ptr.h:359)
std::allocate_shared<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter, std::allocator<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter>, std::shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::FilterConfig> const&>@0x0000555558f76e10 (/usr/include/c++/9/bits/shared_ptr.h:701)
std::make_shared<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::Filter, std::shared_ptr<Envoy::Extensions::HttpFilters::LocalRateLimitFilter::FilterConfig> const&>@0x0000555558f76d31 (/usr/include/c++/9/bits/shared_ptr.h:717)
Envoy::Extensions::HttpFilters::LocalRateLimitFilter::LocalRateLimitFilterConfig::createFilterFactoryFromProtoTyped(envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, Envoy::Server::Configuration::FactoryContext&)::$_0::operator()(Envoy::Http::FilterChainFactoryCallbacks&) const@0x0000555558f74909 (/workspaces/envoy/source/extensions/filters/http/local_ratelimit/config.cc:22)
std::_Function_handler<void (Envoy::Http::FilterChainFactoryCallbacks&), Envoy::Extensions::HttpFilters::LocalRateLimitFilter::LocalRateLimitFilterConfig::createFilterFactoryFromProtoTyped(envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, Envoy::Server::Configuration::FactoryContext&)::$_0>::_M_invoke(std::_Any_data const&, Envoy::Http::FilterChainFactoryCallbacks&)@0x0000555558f74772 (/usr/include/c++/9/bits/std_function.h:300)
std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)>::operator()(Envoy::Http::FilterChainFactoryCallbacks&) const@0x0000555558cd2d05 (/usr/include/c++/9/bits/std_function.h:688)
std::__invoke_impl<void, std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)>&, Envoy::Http::FilterChainFactoryCallbacks&>(std::__invoke_other, std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)>&, Envoy::Http::FilterChainFactoryCallbacks&)@0x000055555d644692 (/usr/include/c++/9/bits/invoke.h:60)
std::__invoke<std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)>&, Envoy::Http::FilterChainFactoryCallbacks&>(std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)>&, Envoy::Http::FilterChainFactoryCallbacks&)@0x000055555d644632 (/usr/include/c++/9/bits/invoke.h:95)
std::reference_wrapper<std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)> >::operator()<Envoy::Http::FilterChainFactoryCallbacks&>(Envoy::Http::FilterChainFactoryCallbacks&) const@0x000055555d6278e2 (/usr/include/c++/9/bits/refwrap.h:340)
Envoy::Extensions::NetworkFilters::HttpConnectionManager::HttpConnectionManagerConfig::createFilterChainForFactories(Envoy::Http::FilterChainFactoryCallbacks&, std::__cxx11::list<std::unique_ptr<Envoy::Config::ExtensionConfigProvider<std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)> >, std::default_delete<Envoy::Config::ExtensionConfigProvider<std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)> > > >, std::allocator<std::unique_ptr<Envoy::Config::ExtensionConfigProvider<std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)> >, std::default_delete<Envoy::Config::ExtensionConfigProvider<std::function<void (Envoy::Http::FilterChainFactoryCallbacks&)> > > > > > const&)@0x000055555d61e611 (/workspaces/envoy/source/extensions/filters/network/http_connection_manager/config.cc:733)
Envoy::Extensions::NetworkFilters::HttpConnectionManager::HttpConnectionManagerConfig::createFilterChain@0x000055555d61ee84 (/workspaces/envoy/source/extensions/filters/network/http_connection_manager/config.cc:750)
Envoy::Http::FilterManager::createFilterChain@0x000055555d6c2f1a (/workspaces/envoy/source/common/http/filter_manager.cc:1458)
Envoy::Http::ConnectionManagerImpl::ActiveStream::decodeHeaders@0x000055555d6771d8 (/workspaces/envoy/source/common/http/conn_manager_impl.cc:1066)
Envoy::Http::Http1::ServerConnectionImpl::onMessageCompleteBase@0x000055555d7445be (/workspaces/envoy/source/common/http/http1/codec_impl.cc:1184)
Envoy::Http::Http1::ConnectionImpl::onMessageComplete@0x000055555d73e21c (/workspaces/envoy/source/common/http/http1/codec_impl.cc:861)
Envoy::Http::Http1::LegacyHttpParserImpl::Impl::Impl(http_parser_type, void*)::{lambda(http_parser*)#3}::operator()(http_parser*) const@0x000055555d75de71 (/workspaces/envoy/source/common/http/http1/legacy_parser_impl.cc:80)
Envoy::Http::Http1::LegacyHttpParserImpl::Impl::Impl(http_parser_type, void*)::{lambda(http_parser*)#3}::__invoke(http_parser*)@0x000055555d75de35 (/workspaces/envoy/source/common/http/http1/legacy_parser_impl.cc:78)
http_parser_execute@0x000055555e26ae93 (/workspaces/envoy/bazel/external/http_parser/http_parser.c:1933)
Envoy::Http::Http1::LegacyHttpParserImpl::Impl::execute@0x000055555d75d1ef (/workspaces/envoy/source/common/http/http1/legacy_parser_impl.cc:97)
Envoy::Http::Http1::LegacyHttpParserImpl::execute@0x000055555d75c65f (/workspaces/envoy/source/common/http/http1/legacy_parser_impl.cc:158)
Envoy::Http::Http1::ConnectionImpl::dispatchSlice@0x000055555d73731c (/workspaces/envoy/source/common/http/http1/codec_impl.cc:639)
Envoy::Http::Http1::ConnectionImpl::dispatch@0x000055555d73524b (/workspaces/envoy/source/common/http/http1/codec_impl.cc:602)
Envoy::Http::Http1::ServerConnectionImpl::dispatch@0x000055555d743919 (/workspaces/envoy/source/common/http/http1/codec_impl.cc:1158)
Envoy::Http::ConnectionManagerImpl::onData@0x000055555d66c7bd (/workspaces/envoy/source/common/http/conn_manager_impl.cc:380)
Envoy::Network::FilterManagerImpl::onContinueReading@0x000055555e00708a (/workspaces/envoy/source/common/network/filter_manager_impl.cc:78)
Envoy::Network::FilterManagerImpl::onRead@0x000055555e00760c (/workspaces/envoy/source/common/network/filter_manager_impl.cc:88)
Envoy::Network::ConnectionImpl::onRead@0x000055555dfea503 (/workspaces/envoy/source/common/network/connection_impl.cc:340)
Envoy::Network::ConnectionImpl::onReadReady@0x000055555dff7b34 (/workspaces/envoy/source/common/network/connection_impl.cc:646)
Envoy::Network::ConnectionImpl::onFileEvent@0x000055555dff2f5e (/workspaces/envoy/source/common/network/connection_impl.cc:597)
Envoy::Network::ConnectionImpl::ConnectionImpl(Envoy::Event::Dispatcher&, std::unique_ptr<Envoy::Network::ConnectionSocket, std::default_delete<Envoy::Network::ConnectionSocket> >&&, std::unique_ptr<Envoy::Network::TransportSocket, std::default_delete<Envoy::Network::TransportSocket> >&&, Envoy::StreamInfo::StreamInfo&, bool)::$_6::operator()(unsigned int) const@0x000055555dffe3fe (/workspaces/envoy/source/common/network/connection_impl.cc:94)
std::_Function_handler<void (unsigned int), Envoy::Network::ConnectionImpl::ConnectionImpl(Envoy::Event::Dispatcher&, std::unique_ptr<Envoy::Network::ConnectionSocket, std::default_delete<Envoy::Network::ConnectionSocket> >&&, std::unique_ptr<Envoy::Network::TransportSocket, std::default_delete<Envoy::Network::TransportSocket> >&&, Envoy::StreamInfo::StreamInfo&, bool)::$_6>::_M_invoke(std::_Any_data const&, unsigned int&&)@0x000055555dffe2c1 (/usr/include/c++/9/bits/std_function.h:300)
std::function<void (unsigned int)>::operator()(unsigned int) const@0x000055555c823494 (/usr/include/c++/9/bits/std_function.h:688)
Envoy::Event::DispatcherImpl::createFileEvent(int, std::function<void (unsigned int)>, Envoy::Event::FileTriggerType, unsigned int)::$_5::operator()(unsigned int) const@0x000055555dfbce9f (/workspaces/envoy/source/common/event/dispatcher_impl.cc:183)
std::_Function_handler<void (unsigned int), Envoy::Event::DispatcherImpl::createFileEvent(int, std::function<void (unsigned int)>, Envoy::Event::FileTriggerType, unsigned int)::$_5>::_M_invoke(std::_Any_data const&, unsigned int&&)@0x000055555dfbccf1 (/usr/include/c++/9/bits/std_function.h:300)

From this call stack we can tell very quickly that the filter is built by the code below, and that HttpConnectionManager is simply the configuration layer above it; everything is constructed layer by layer following the Factory pattern.

createFilterChain (github)
  if (upgrade != nullptr) {
    const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();

    if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(),
                                                       upgrade_map, *this)) {
      filter_manager_callbacks_.upgradeFilterChainCreated();
      return true;
    } else {
      upgrade_rejected = true;
      // Fall through to the default filter chain. The function calling this
      // will send a local reply indicating that the upgrade failed.
    }
  }
  // create here !
  filter_chain_factory_.createFilterChain(*this);
  return !upgrade_rejected;
}
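
Note also frame config.cc:22 in the stack: the per-stream Filter is allocated inside a factory callback returned by createFilterFactoryFromProtoTyped. Reconstructed from the stack trace, that factory plausibly has the following shape (a sketch, not the verbatim Envoy source): the long-lived FilterConfig is built once per configuration entry, while a brand-new Filter is allocated for every filter chain.

// Sketch reconstructed from the call stack (config.cc:22), not verbatim source.
Http::FilterFactoryCb LocalRateLimitFilterConfig::createFilterFactoryFromProtoTyped(
    const envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit& proto_config,
    const std::string&, Server::Configuration::FactoryContext& context) {
  // Built once, when the configuration entry is loaded.
  FilterConfigSharedPtr filter_config = /* std::make_shared<FilterConfig>(proto_config, ...) */;
  // Runs once per stream, inside createFilterChain: a fresh Filter every time.
  return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
    callbacks.addStreamFilter(std::make_shared<Filter>(filter_config));
  };
}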

How the Filter works

Now that we understand the life cycle, we want to know how the Filter actually does its job. From the configuration we can see that limiting is done with a token bucket, which immediately raises a few questions (listed below the snippet).

token_bucket:
  max_tokens: 1
  tokens_per_fill: 1
  fill_interval: 5s

  1. How is the bucket refilled?
  2. How does the allow/deny decision work?

Verification: how does the decision logic work?

As for how the decision logic works: most rate limiters decide simply by checking whether the current bucket has any capacity left, and that is most likely the logic here as well. Since we also know that filters are implemented through inheritance (our Filter derives from Http::PassThroughFilter), it is easy to locate:

decodeHeaders (github)
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool) {
  const auto* config = getConfig();

  if (!config->enabled()) {
    return Http::FilterHeadersStatus::Continue;
  }

  config->stats().enabled_.inc();

  std::vector<RateLimit::LocalDescriptor> descriptors;
  if (config->hasDescriptors()) {
    populateDescriptors(descriptors, headers);
  }

  // Store descriptors which is used to generate x-ratelimit-* headers in encoding response headers.
  stored_descriptors_ = descriptors;

  if (ENVOY_LOG_CHECK_LEVEL(debug)) {
    for (const auto& request_descriptor : descriptors) {
      for (const Envoy::RateLimit::DescriptorEntry& entry : request_descriptor.entries_) {
        ENVOY_LOG(debug, "populate descriptors: key={} value={}", entry.key_, entry.value_);
      }
    }
  }

  if (requestAllowed(descriptors)) {
    config->stats().ok_.inc();
    return Http::FilterHeadersStatus::Continue;
  }

  config->stats().rate_limited_.inc();

  if (!config->enforced()) {
    config->requestHeadersParser().evaluateHeaders(headers, decoder_callbacks_->streamInfo());
    return Http::FilterHeadersStatus::Continue;
  }

  config->stats().enforced_.inc();

  decoder_callbacks_->sendLocalReply(
      config->status(), "local_rate_limited",
      [this, config](Http::HeaderMap& headers) {
        config->responseHeadersParser().evaluateHeaders(headers, decoder_callbacks_->streamInfo());
      },
      absl::nullopt, "local_rate_limited");
  decoder_callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::RateLimited);

  return Http::FilterHeadersStatus::StopIteration;
}

Clearly requestAllowed is where we decide whether a request is let through. We can skip the descriptors here; they implement the logic that distinguishes different headers under the same path, as in:

descriptors:
- entries:
  - key: client_cluster
    value: foo
  - key: path
    value: /foo/bar
  token_bucket:
    max_tokens: 10
    tokens_per_fill: 10
    fill_interval: 60s
- entries:
  - key: client_cluster
    value: foo
  - key: path
    value: /foo/bar2
  token_bucket:
    max_tokens: 100
    tokens_per_fill: 100
    fill_interval: 60s

Tracing through a little more code quickly leads us to the concrete decision logic:

requestAllowedHelper (github)
bool LocalRateLimiterImpl::requestAllowedHelper(const TokenState& tokens) const {
  // Relaxed consistency is used for all operations because we don't care about ordering, just the
  // final atomic correctness.
  uint32_t expected_tokens = tokens.tokens_.load(std::memory_order_relaxed);
  do {
    // expected_tokens is either initialized above or reloaded during the CAS failure below.
    if (expected_tokens == 0) {
      return false;
    }

    // Testing hook.
    synchronizer_.syncPoint("allowed_pre_cas");

    // Loop while the weak CAS fails trying to subtract 1 from expected.
  } while (!tokens.tokens_.compare_exchange_weak(expected_tokens, expected_tokens - 1,
                                                 std::memory_order_relaxed));

  // We successfully decremented the counter by 1.
  return true;
}

If expected_tokens == 0 the request is rejected immediately; otherwise a CAS decrements the expected token count by one and the request is allowed.
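
To see the same pattern in isolation, here is a minimal, self-contained toy version (the name try_take_token is ours, not Envoy's). The detail worth noticing is that compare_exchange_weak reloads expected with the current value whenever it fails, so the emptiness check is re-evaluated on every iteration and two racing threads can never both consume the last token.

#include <atomic>
#include <cstdint>
#include <iostream>

// Toy standalone version of requestAllowedHelper: take one token from an
// atomic counter without locking.
bool try_take_token(std::atomic<uint32_t>& tokens) {
  uint32_t expected = tokens.load(std::memory_order_relaxed);
  do {
    if (expected == 0) {
      return false; // bucket empty -> rate limited
    }
    // On CAS failure, `expected` is reloaded with the current value,
    // so the check above runs again against fresh data.
  } while (!tokens.compare_exchange_weak(expected, expected - 1,
                                         std::memory_order_relaxed));
  return true; // we own exactly one token
}

int main() {
  std::atomic<uint32_t> tokens{2};
  for (int i = 0; i < 3; i++) {
    std::cout << "request " << i << ": "
              << (try_take_token(tokens) ? "allowed" : "limited") << "\n";
  }
}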

Verification: how are tokens refilled?

Refilling tokens necessarily requires some kind of timer, and a quick search through the code finds it:

LocalRateLimiterImpl (github)
LocalRateLimiterImpl::LocalRateLimiterImpl(
    const std::chrono::milliseconds fill_interval, const uint32_t max_tokens,
    const uint32_t tokens_per_fill, Event::Dispatcher& dispatcher,
    const Protobuf::RepeatedPtrField<
        envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors)
    : fill_timer_(fill_interval > std::chrono::milliseconds(0)
                      ? dispatcher.createTimer([this] { onFillTimer(); })
                      : nullptr),
      time_source_(dispatcher.timeSource()) {
  if (fill_timer_ && fill_interval < std::chrono::milliseconds(50)) {
    throw EnvoyException("local rate limit token bucket fill timer must be >= 50ms");
  }

  token_bucket_.fill_interval_ = absl::FromChrono(fill_interval);
  tokens_.tokens_ = max_tokens;
  tokens_.fill_time_ = time_source_.monotonicTime();

  if (fill_timer_) {
    fill_timer_->enableTimer(fill_interval);
  }

Clearly a timer is created through the dispatcher, and enableTimer afterwards arms it.
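
We have not pasted onFillTimer itself here, but the refill side must be the mirror image of the consume side. Under that assumption, a refill in the same lock-free style would look roughly like this sketch (our own illustration, not Envoy's code):

#include <algorithm>
#include <atomic>
#include <cstdint>

// Sketch of a timer-driven refill: top the bucket up by tokens_per_fill,
// capped at max_tokens, with the same relaxed CAS loop as the consume path
// so the timer thread and request threads never need a lock.
void on_fill_timer(std::atomic<uint32_t>& tokens, uint32_t tokens_per_fill,
                   uint32_t max_tokens) {
  uint32_t expected = tokens.load(std::memory_order_relaxed);
  uint32_t desired;
  do {
    desired = std::min(max_tokens, expected + tokens_per_fill);
  } while (!tokens.compare_exchange_weak(expected, desired, std::memory_order_relaxed));
  // The real implementation would now re-arm the timer, i.e. something like
  // fill_timer_->enableTimer(fill_interval).
}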

The next question follows immediately: what exactly is this dispatcher? So far all we know is that it can create a timer.

Exploring the dispatcher

Faced with a brand-new concept, our first instinct is association: have we seen anything similar before? If you used to be a Java boy, DispatcherServlet naturally comes to mind, but that abstraction dispatches HTTP requests, and creating timers doesn't match that picture at all. So let's look at the definition of the object.

Dispatcher (github)
class Dispatcher : public DispatcherBase, public ScopeTracker {
public:
  /**
   * Returns the name that identifies this dispatcher, such as "worker_2" or "main_thread".
   * @return const std::string& the name that identifies this dispatcher.
   */
  virtual const std::string& name() PURE;

  /**
   * Creates a file event that will signal when a file is readable or writable. On UNIX systems
   * this can be used for any file like interface (files, sockets, etc.).
   * @param fd supplies the fd to watch.
   * @param cb supplies the callback to fire when the file is ready.
   * @param trigger specifies whether to edge or level trigger.
   * @param events supplies a logical OR of FileReadyType events that the file event should
   *               initially listen on.
   */
  virtual FileEventPtr createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
                                       uint32_t events) PURE;

  /**
   * Allocates a timer. @see Timer for docs on how to use the timer.
   * @param cb supplies the callback to invoke when the timer fires.
   */
  virtual Event::TimerPtr createTimer(TimerCb cb) PURE;

  /**
   * Allocates a scaled timer. @see Timer for docs on how to use the timer.
   * @param timer_type the type of timer to create.
   * @param cb supplies the callback to invoke when the timer fires.
   */
  virtual Event::TimerPtr createScaledTimer(Event::ScaledTimerType timer_type, TimerCb cb) PURE;

  /**
   * Allocates a scaled timer. @see Timer for docs on how to use the timer.
   * @param minimum the rule for computing the minimum value of the timer.
   * @param cb supplies the callback to invoke when the timer fires.
   */
  virtual Event::TimerPtr createScaledTimer(Event::ScaledTimerMinimum minimum, TimerCb cb) PURE;

  /**
   * Allocates a schedulable callback. @see SchedulableCallback for docs on how to use the wrapped
   * callback.
   * @param cb supplies the callback to invoke when the SchedulableCallback is triggered on the
   *           event loop.
   */
  virtual Event::SchedulableCallbackPtr createSchedulableCallback(std::function<void()> cb) PURE;

  /**
   * Register a watchdog for this dispatcher. The dispatcher is responsible for touching the
   * watchdog at least once per touch interval. Dispatcher implementations may choose to touch more
   * often to avoid spurious miss events when processing long callback queues.
   * @param min_touch_interval Touch interval for the watchdog.
   */
  virtual void registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
                                std::chrono::milliseconds min_touch_interval) PURE;

  /**
   * Returns a time-source to use with this dispatcher.
   */
  virtual TimeSource& timeSource() PURE;

  /**
   * Returns a recently cached MonotonicTime value.
   */
  virtual MonotonicTime approximateMonotonicTime() const PURE;

  /**
   * Initializes stats for this dispatcher. Note that this can't generally be done at construction
   * time, since the main and worker thread dispatchers are constructed before
   * ThreadLocalStoreImpl::initializeThreading.
   * @param scope the scope to contain the new per-dispatcher stats created here.
   * @param prefix the stats prefix to identify this dispatcher. If empty, the dispatcher will be
   *               identified by its name.
   */
  virtual void initializeStats(Stats::Scope& scope,
                               const absl::optional<std::string>& prefix = absl::nullopt) PURE;

  /**
   * Clears any items in the deferred deletion queue.
   */
  virtual void clearDeferredDeleteList() PURE;

  /**
   * Wraps an already-accepted socket in an instance of Envoy's server Network::Connection.
   * @param socket supplies an open file descriptor and connection metadata to use for the
   *               connection. Takes ownership of the socket.
   * @param transport_socket supplies a transport socket to be used by the connection.
   * @param stream_info info object for the server connection
   * @return Network::ConnectionPtr a server connection that is owned by the caller.
   */
  virtual Network::ServerConnectionPtr
  createServerConnection(Network::ConnectionSocketPtr&& socket,
                         Network::TransportSocketPtr&& transport_socket,
                         StreamInfo::StreamInfo& stream_info) PURE;

  /**
   * Creates an instance of Envoy's Network::ClientConnection. Does NOT initiate the connection;
   * the caller must then call connect() on the returned Network::ClientConnection.
   * @param address supplies the address to connect to.
   * @param source_address supplies an address to bind to or nullptr if no bind is necessary.
   * @param transport_socket supplies a transport socket to be used by the connection.
   * @param options the socket options to be set on the underlying socket before anything is sent
   *                on the socket.
   * @return Network::ClientConnectionPtr a client connection that is owned by the caller.
   */
  virtual Network::ClientConnectionPtr
  createClientConnection(Network::Address::InstanceConstSharedPtr address,
                         Network::Address::InstanceConstSharedPtr source_address,
                         Network::TransportSocketPtr&& transport_socket,
                         const Network::ConnectionSocket::OptionsSharedPtr& options) PURE;

  /**
   * @return Filesystem::WatcherPtr a filesystem watcher owned by the caller.
   */
  virtual Filesystem::WatcherPtr createFilesystemWatcher() PURE;

  /**
   * Creates a listener on a specific port.
   * @param socket supplies the socket to listen on.
   * @param cb supplies the callbacks to invoke for listener events.
   * @param runtime supplies the runtime for this server.
   * @param bind_to_port controls whether the listener binds to a transport port or not.
   * @param ignore_global_conn_limit controls whether the listener is limited by the global
   *                                 connection limit.
   * @return Network::ListenerPtr a new listener that is owned by the caller.
   */
  virtual Network::ListenerPtr createListener(Network::SocketSharedPtr&& socket,
                                              Network::TcpListenerCallbacks& cb,
                                              Runtime::Loader& runtime, bool bind_to_port,
                                              bool ignore_global_conn_limit) PURE;

  /**
   * Creates a logical udp listener on a specific port.
   * @param socket supplies the socket to listen on.
   * @param cb supplies the udp listener callbacks to invoke for listener events.
   * @param config provides the UDP socket configuration.
   * @return Network::ListenerPtr a new listener that is owned by the caller.
   */
  virtual Network::UdpListenerPtr
  createUdpListener(Network::SocketSharedPtr socket, Network::UdpListenerCallbacks& cb,
                    const envoy::config::core::v3::UdpSocketConfig& config) PURE;

  /**
   * Submits an item for deferred delete. @see DeferredDeletable.
   */
  virtual void deferredDelete(DeferredDeletablePtr&& to_delete) PURE;

  /**
   * Exits the event loop.
   */
  virtual void exit() PURE;

  /**
   * Listens for a signal event. Only a single dispatcher in the process can listen for signals.
   * If more than one dispatcher calls this routine in the process the behavior is undefined.
   *
   * @param signal_num supplies the signal to listen on.
   * @param cb supplies the callback to invoke when the signal fires.
   * @return SignalEventPtr a signal event that is owned by the caller.
   */
  virtual SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) PURE;

  /**
   * Post the deletable to this dispatcher. The deletable objects are guaranteed to be destroyed on
   * the dispatcher's thread before dispatcher destroy. This is safe cross thread.
   */
  virtual void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) PURE;

  /**
   * Runs the event loop. This will not return until exit() is called either from within a callback
   * or from a different thread.
   * @param type specifies whether to run in blocking mode (run() will not return until exit() is
   *             called) or non-blocking mode where only active events will be executed and then
   *             run() will return.
   */
  enum class RunType {
    Block,       // Runs the event-loop until there are no pending events.
    NonBlock,    // Checks for any pending events to activate, executes them,
                 // then exits. Exits immediately if there are no pending or
                 // active events.
    RunUntilExit // Runs the event-loop until loopExit() is called, blocking
                 // until there are pending or active events.
  };
  virtual void run(RunType type) PURE;

  /**
   * Returns a factory which connections may use for watermark buffer creation.
   * @return the watermark buffer factory for this dispatcher.
   */
  virtual Buffer::WatermarkFactory& getWatermarkFactory() PURE;

  /**
   * Updates approximate monotonic time to current value.
   */
  virtual void updateApproximateMonotonicTime() PURE;

  /**
   * Shutdown the dispatcher by clear dispatcher thread deletable.
   */
  virtual void shutdown() PURE;
};

The API definition gives us an intuitive picture: the Dispatcher looks like a thread-level object, the entry point handed to each worker thread. Whether it's a Listener or a Timer, the work ultimately has to run on some thread. Combined with the classic Reactor programming model, we can naturally guess that the Dispatcher is tied to the option we pass at startup:

--concurrency <number of worker threads>

Verifying the hypothesis

Sure enough, dispatchers are created all over the code. For example, the main thread's dispatcher:

InstanceImpl (github)
dispatcher_(api_->allocateDispatcher("main_thread")),

And the dispatcher for each worker thread:

createWorker (github)
WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
                                          const std::string& worker_name) {
  Event::DispatcherPtr dispatcher(
      api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
  auto conn_handler = std::make_unique<ConnectionHandlerImpl>(*dispatcher, index);
  return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
                                      overload_manager, api_, stat_names_);
}

And finally we land exactly where our hypothesis pointed. Aha, Eureka!

ListenerManager (github)
ListenerManagerImpl::ListenerManagerImpl(Instance& server,
                                         ListenerComponentFactory& listener_factory,
                                         WorkerFactory& worker_factory,
                                         bool enable_dispatcher_stats,
                                         Quic::QuicStatNames& quic_stat_names)
    : server_(server), factory_(listener_factory),
      scope_(server.stats().createScope("listener_manager.")), stats_(generateStats(*scope_)),
      config_tracker_entry_(server.admin().getConfigTracker().add(
          "listeners",
          [this](const Matchers::StringMatcher& name_matcher) {
            return dumpListenerConfigs(name_matcher);
          })),
      enable_dispatcher_stats_(enable_dispatcher_stats), quic_stat_names_(quic_stat_names) {
  for (uint32_t i = 0; i < server.options().concurrency(); i++) {
    workers_.emplace_back(
        worker_factory.createWorker(i, server.overloadManager(), absl::StrCat("worker_", i)));
  }
}

Timer

Let's get back to the timer logic itself. Timers are a critically important facility in every server system; 《Linux高性能服务器编程》 (Linux High-Performance Server Programming) devotes an entire chapter to them.

In Envoy, though, this part is straightforward: it is delegated directly to libevent.

createTimer (github)
TimerPtr LibeventScheduler::createTimer(const TimerCb& cb, Dispatcher& dispatcher) {
  return std::make_unique<TimerImpl>(libevent_, cb, dispatcher);
};
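
For anyone who has not used libevent directly, the snippet below is a minimal, self-contained one-shot timer against the raw libevent2 API, roughly the primitive that Envoy's TimerImpl wraps (the build line assumes the libevent2 headers and library are installed):

// Build: g++ timer_demo.cc -levent && ./a.out
#include <event2/event.h>
#include <iostream>

// Fires once after one second, then asks the loop to exit.
static void on_timer(evutil_socket_t, short, void* arg) {
  std::cout << "timer fired" << std::endl;
  event_base_loopexit(static_cast<event_base*>(arg), nullptr);
}

int main() {
  event_base* base = event_base_new();        // the event loop itself
  event* timer = evtimer_new(base, on_timer, base);
  timeval tv{1, 0};                           // {seconds, microseconds}
  evtimer_add(timer, &tv);                    // the enableTimer analogue
  event_base_dispatch(base);                  // the Dispatcher::run analogue
  event_free(timer);
  event_base_free(base);
  return 0;
}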

Having read the local rate limiting code, Envoy turns out to match our programming intuitions quite well. After local_rate_limiter, let's look at the very similar BandwidthLimit. From the documentation we learn that the local rate limiter simply fails any request over the limit; but there is another scenario, throttling users, for example capping non-members at 100KB/s, and that is exactly what BandwidthLimit is for.

route_config:
  name: local_route
  virtual_hosts:
  - name: local_service
    domains: ["*"]
    routes:
    - match: {prefix: "/path/with/bandwidth/limit"}
      route: {cluster: service_protected_by_bandwidth_limit}
      typed_per_filter_config:
        envoy.filters.http.bandwidth_limit:
          "@type": type.googleapis.com/envoy.extensions.filters.http.bandwidth_limit.v3.BandwidthLimit
          stat_prefix: bandwidth_limiter_custom_route
          enable_mode: REQUEST_AND_RESPONSE
          limit_kbps: 500
          fill_interval: 0.1s
    - match: {prefix: "/"}
      route: {cluster: web_service}

Given our earlier reading of local_rate_limiter, the logic of BandwidthLimit is easy to guess:

  • When the Filter is created, a BandwidthLimit object is configured and held in a member variable
  • A timer registered on the dispatcher periodically refills the tokens

But one question remains: how is a request actually throttled? When more data arrives than the limit allows, what happens to it? One constraint clearly has to hold: the thread must not block.

With the experience from last time, we go straight to the core part:

writeData (github)
void StreamRateLimiter::writeData(Buffer::Instance& incoming_buffer, bool end_stream,
                                  bool trailer_added) {
  auto len = incoming_buffer.length();
  buffer_.move(incoming_buffer);
  saw_end_stream_ = end_stream;

  if (trailer_added) {
    saw_trailers_ = true;
  }

  ENVOY_LOG(debug,
            "StreamRateLimiter <writeData>: got new {} bytes of data. token "
            "timer {} scheduled.",
            len, !token_timer_->enabled() ? "now" : "already");
  if (!token_timer_->enabled()) {
    token_timer_->enableTimer(std::chrono::milliseconds(0), &scope_);
  }
}

Incoming data is moved straight into a buffer instead of being processed on the spot; once buffered, enableTimer is triggered so the timer keeps running. The real work must therefore live in onTokenTimer:

onTokenTimer (github)
void StreamRateLimiter::onTokenTimer() {
  Buffer::OwnedImpl data_to_write;

  // First figure out how many tokens we can consume; together with the buffer
  // length this determines how many bytes we may write in this round.
  const uint64_t tokens_obtained = token_bucket_->consume(buffer_.length(), true);
  const uint64_t bytes_to_write = std::min(tokens_obtained, buffer_.length());
  ENVOY_LOG(debug,
            "StreamRateLimiter <onTokenTimer>: tokens_needed={} "
            "tokens_obtained={} to_write={}",
            buffer_.length(), tokens_obtained, bytes_to_write);

  // Move the bytes the limiter allows into the output buffer.
  data_to_write.move(buffer_, bytes_to_write);
  write_stats_cb_(bytes_to_write, buffer_.length() > 0);

  // If the buffer has been drained there is nothing left to schedule; otherwise
  // arrange to run again at the next token-fill interval to process the rest.
  if (buffer_.length() > 0) {
    ENVOY_LOG(debug,
              "StreamRateLimiter <onTokenTimer>: scheduling wakeup for {}ms, "
              "buffered={}",
              fill_interval_.count(), buffer_.length());
    token_timer_->enableTimer(fill_interval_, &scope_);
  }

  // Invoke the write callback to continue processing the released data.
  write_data_cb_(data_to_write, saw_end_stream_ && buffer_.length() == 0 && !saw_trailers_);

  // If there is no more data to send and we saw trailers, we need to continue iteration to release
  // the trailers to further filters.
  if (buffer_.length() == 0 && saw_trailers_) {
    continue_cb_();
  }
}
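
To close the loop, here is a toy, timer-free simulation of the same buffering pattern (entirely our own sketch, with made-up names like ToyStreamLimiter): writes are absorbed into a buffer immediately so the caller never blocks, and each "timer tick" drains only as many bytes as the bucket allows.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>

struct ToyStreamLimiter {
  std::string buffer_;           // pending, not-yet-released bytes
  uint64_t tokens_ = 0;          // 1 token == 1 byte; no max cap for brevity
  uint64_t tokens_per_fill_ = 4; // refill amount per tick

  // The writeData analogue: just buffer and return, never block.
  void writeData(const std::string& data) { buffer_ += data; }

  // The onTokenTimer analogue: release what the tokens allow, keep the rest.
  void onTokenTimer() {
    tokens_ += tokens_per_fill_;
    const uint64_t n = std::min<uint64_t>(tokens_, buffer_.size());
    std::cout << "released: '" << buffer_.substr(0, n) << "'\n";
    buffer_.erase(0, n);
    tokens_ -= n;
  }
};

int main() {
  ToyStreamLimiter limiter;
  limiter.writeData("hello world");                            // 11 bytes arrive at once
  for (int tick = 0; tick < 3; tick++) limiter.onTokenTimer(); // released 4 + 4 + 3 bytes
}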