How Dubbo Envoy Filter Works (For Envoy)

今天就让我们一起来品一品,Envoy 中的 Dubbo 协议是如何实现的,基于 1.18 分支

Intro

Dubbo Filter 位于 /*/extensions/filters/network/dubbo_proxy 中,首先让我们先瞧一瞧,这个 Filter 是如何使用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

static_resources:
listeners:
- address:
socket_address:
address: 0.0.0.0
port_value: 20881
filter_chains:
- filters:
- name: envoy.filters.network.dubbo_proxy
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.dubbo_proxy.v3.DubboProxy
stat_prefix: dubbo_incomming_stats
protocol_type: Dubbo
serialization_type: Hessian2
route_config:
name: local_route
interface: org.apache.dubbo.demo.DemoService
routes:
- match:
method:
name:
exact: sayHello
route:
cluster: user_service_dubbo_server
dubbo_filters:
- name: envoy.filters.dubbo.testFilter
typed_config:
"@type": type.googleapis.com/google.protobuf.Struct
value:
name: test_service
- name: envoy.filters.dubbo.router

可见 dubbo 的插件是以 filter 的形式,通过 LDS 协议进行分发。

插件加载

Envoy Extend 中也聊过这部分内容,简而言之就是在 ListenerImpl 中,初始化 FilterChain 的时候是可以基于注册机制完成的

ListenerImpl::createNetworkFilterChaingithub
1
2
3
4
5
bool ListenerImpl::createNetworkFilterChain(
Network::Connection& connection,
const std::vector<Network::FilterFactoryCb>& filter_factories) {
return Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
}

从机制上,我们需要注册 factoryenvoy 的上下文中,因此在 dubbo_proxy/config.cc 进行了注册

REGISTER_FACTORYgithub
1
REGISTER_FACTORY(DubboProxyFilterConfigFactory, Server::Configuration::NamedNetworkFilterConfigFactory);

插件初始化

注册到 Envoy 之后,在系统启动的时候,系统就会回调我们构建我们的 Factory,继而又由 Factory 构建出我们的 Filter 而这部分的逻辑处于

createFilterFactoryFromProtoTypedgithub
1
2
3
4
5
6
7
8
9
10
Network::FilterFactoryCb DubboProxyFilterConfigFactory::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::network::dubbo_proxy::v3::DubboProxy& proto_config,
Server::Configuration::FactoryContext& context) {
std::shared_ptr<Config> filter_config(std::make_shared<ConfigImpl>(proto_config, context));

return [filter_config, &context](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(std::make_shared<ConnectionManager>(
*filter_config, context.api().randomGenerator(), context.dispatcher().timeSource())); // 当构建 filter_manager 回调完成 Filter 的构建
};
}

当我们构建完成系统之后,就来到了如何处理数据量的地方了。我们从上文中已经知道,我们在 filter_manager 中构建了 ReadFilter 为我们的 dubbo 协议, 而这个 ReadFilter 实际上就是 ConnectionManager

ConnectionManager 的组件名真的有点大,这里的 `ConnectionManager` 和系统内置的并不是相同的(看名字很容易搞错)。

conn_manager.hgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class ConnectionManager : public Network::ReadFilter,
public Network::ConnectionCallbacks,
public RequestDecoderCallbacks,
Logger::Loggable<Logger::Id::dubbo> {
public:
using ConfigProtocolType = envoy::extensions::filters::network::dubbo_proxy::v3::ProtocolType;
using ConfigSerializationType =
envoy::extensions::filters::network::dubbo_proxy::v3::SerializationType;

ConnectionManager(Config& config, Random::RandomGenerator& random_generator,
TimeSource& time_system);
~ConnectionManager() override = default;

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
Network::FilterStatus onNewConnection() override;
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks&) override;

// RequestDecoderCallbacks
StreamHandler& newStream() override;

// This function is for testing only.
std::list<ActiveMessagePtr>& getActiveMessagesForTest() { return active_message_list_; }
};

} // namespace DubboProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy

从函数签名上,我们即可知道,这个 ConnectionManagerConnectionCallbacks & ReadFilter & RequestDecoderCallbacks 三合一了。

此时系统很清晰的可以了解到是这么工作的

全局抽象

对于 Dubbo Proxy 来说,代码中抽象一些比较重要的概念

  • ConnectionManager: 请求逻辑的主入口
  • Decoder: 数据体反序列的实现,分 ResponseDecoderRequestDecoder
  • Protocol: Dubbo 协议的实现部分(仅解析 Dubbo 自定义的 Header 部分)
  • ActiveMessage: 一次 Dubbo 请求所对应的上下文,将其他的抽象串联起来
  • Router: 路由转发匹配
  • FilterChain: Dubbo 内部依然提供了额外构建的的 Filter Chain

数据处理 [TOP HALF]

读数据处理

读数据很明显从 onData 作为入口。

onDatagithub
1
2
3
4
5
6
7
8
9
10
11
Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(trace, "dubbo: read {} bytes", data.length());
request_buffer_.move(data);
dispatch();

if (end_stream) {
// 暂时先忽略 end bottom
}

return Network::FilterStatus::StopIteration;
}

对于数据获取到第一时间就直接进行了 dispatch()

dispatchgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void ConnectionManager::dispatch() {
try {
bool underflow = false;
while (!underflow) { // 等待数据足够完成序列化
decoder_->onData(request_buffer_, underflow);
}
return;
} catch (const EnvoyException& ex) {
ENVOY_CONN_LOG(error, "dubbo error: {}", read_callbacks_->connection(), ex.what());
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
stats_.request_decoding_error_.inc();
}
resetAllMessages(true);
}

首先我们先经过内部的 decoder_ 进行序列化。

onDatagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
FilterStatus DecoderBase::onData(Buffer::Instance& data, bool& buffer_underflow) {
ENVOY_LOG(debug, "dubbo decoder: {} bytes available", data.length());
buffer_underflow = false;

if (!decode_started_) {
start(); // 状态机尚未初始化,现在开始
}

ASSERT(state_machine_ != nullptr);

ENVOY_LOG(debug, "dubbo decoder: protocol {}, state {}, {} bytes available", protocol_.name(),
ProtocolStateNameValues::name(state_machine_->currentState()), data.length());

ProtocolState rv = state_machine_->run(data); // 开始处理数据,这里涉及到不同的序列化协议,继续深入
switch (rv) {
case ProtocolState::WaitForData: // 如果需要更多数据,直接 Return 等待
ENVOY_LOG(debug, "dubbo decoder: wait for data");
buffer_underflow = true;
return FilterStatus::Continue;
default:
break; // 其他就是结束,也就是收集完了,下面就要重置状态机
}

ASSERT(rv == ProtocolState::Done);

complete();
buffer_underflow = (data.length() == 0);
ENVOY_LOG(debug, "dubbo decoder: data length {}", data.length());
return FilterStatus::Continue;
}

这里 dubbo 抽象了一个 状态机 来解决问题,大致上也就是需要等待所有的数据收集到之后我们继续向下处理,因此当我们将数据收集完整之后,我们继续处理。

序列化处理

继续深入系列化的部分

decodegithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ProtocolState DecoderStateMachine::run(Buffer::Instance& buffer) {
while (state_ != ProtocolState::Done) {
DecoderStatus s = handleState(buffer);
if (s.next_state_ == ProtocolState::WaitForData) {
return ProtocolState::WaitForData;
}
state_ = s.next_state_;
}
return state_;
}

DecoderStateMachine::DecoderStatus DecoderStateMachine::handleState(Buffer::Instance& buffer) {
switch (state_) {
case ProtocolState::OnDecodeStreamHeader:
return onDecodeStreamHeader(buffer);
case ProtocolState::OnDecodeStreamData:
return onDecodeStreamData(buffer);
default:
NOT_REACHED_GCOVR_EXCL_LINE;
}
}

对于接收到的数据,我们进行处理,对于 Stream 类型的数据,分为了 HeaderData 部分的处理,大致上是一样的,我们去 Header 的逻辑中一窥究竟。对于大部分的协议而言,Header 都是我们的元数据部分,包含了 Len Type 之类的字段,一般都是先处理 Header

onDecodeStreamHeader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
DecoderStateMachine::DecoderStatus
DecoderStateMachine::onDecodeStreamHeader(Buffer::Instance& buffer) {
auto metadata = std::make_shared<MessageMetadata>();
auto ret = protocol_.decodeHeader(buffer, metadata); // 使用 protocal 进行解析
if (!ret.second) { // 当数据不足的时候,返回等待,因为 Header 本身还没传完。这里用了 Pair 返回
ENVOY_LOG(debug, "dubbo decoder: need more data for {} protocol", protocol_.name());
return {ProtocolState::WaitForData};
}

auto context = ret.first;
if (metadata->messageType() == MessageType::HeartbeatRequest || metadata->messageType() == MessageType::HeartbeatResponse) {
// 心跳处理,Skip
}

// 将此 Stream 委托给另外一个对象处理,而这里就是
active_stream_ = delegate_.newStream(metadata, context);
context->originMessage().move(buffer, context->headerSize());

// 当我们处理完成 Header 部分,给上游回复一个 OnDecodeStreamData,也就是下一次来的数据,就是我们的 Body 部分
return {ProtocolState::OnDecodeStreamData};
}

而委托者是谁,就是我们解答此代码逻辑的核心,我们在 decoder.h 中会发现 delegate_ 的原型是一个 Decoder

1
2
3
4
ActiveStream* newStream(MessageMetadataSharedPtr metadata, ContextSharedPtr context) override {
stream_ = std::make_unique<ActiveStream>(callbacks_.newStream(), metadata, context);
return stream_.get();
}

可见最为重要的是 callbacks_ 如何 newStream() 的,这部分的逻辑在

ConnectionManager.newSteam()link
1
2
3
4
5
6
7
8
StreamHandler& ConnectionManager::newStream() {
ActiveMessagePtr new_message(std::make_unique<ActiveMessage>(*this));
// 构建一个 FilterChain
new_message->createFilterChain();
// 将任务 Append 既有的任务列表中, 然后返回
LinkedList::moveIntoList(std::move(new_message), active_message_list_);
return **active_message_list_.begin();
}

走到这里,我们又看见了我们熟悉的 FilterChain,我们可以把整个系统看作一个大 FilterChain,而在 Dubbo Proxy 包含了一个子 FilterChain

而当我们继续处理 Body 部分的时候

onDecodeStreamDatagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
DecoderStateMachine::DecoderStatus
DecoderStateMachine::onDecodeStreamData(Buffer::Instance& buffer) {
if (!protocol_.decodeData(buffer, active_stream_->context_, active_stream_->metadata_)) {
return {ProtocolState::WaitForData};
}

active_stream_->context_->originMessage().move(buffer, active_stream_->context_->bodySize());
active_stream_->onStreamDecoded(); //经过转换直接跳转到 ActiveMessage::onStreamDecoded 中
active_stream_ = nullptr;

return {ProtocolState::Done};
}

void ActiveMessage::onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) {
metadata_ = metadata;
context_ = ctx;

// 定义进入 filter 进行 Message Decoded 的工作,把这个对象放到了 ActiveMessage 上下文中
filter_action_ = [metadata, ctx](DubboFilters::DecoderFilter* filter) -> FilterStatus {
return filter->onMessageDecoded(metadata, ctx);
};

// 然后去执行这个 filter
auto status = applyDecoderFilters(nullptr, FilterIterationStartState::CanStartFromCurrent);
if (status == FilterStatus::StopIteration) {
pending_stream_decoded_ = true;
return;
}

finalizeRequest();
}
applyDecoderFilters
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
FilterStatus ActiveMessage::applyDecoderFilters(ActiveMessageDecoderFilter* filter,
FilterIterationStartState state) {
if (!local_response_sent_) {
for (auto entry = commonDecodePrefix(filter, state); entry != decoder_filters_.end(); entry++) {
const FilterStatus status = filter_action_((*entry)->handler().get());
if (local_response_sent_) {
break;
}

if (status != FilterStatus::Continue) {
return status;
}
}
}

filter_action_ = nullptr;
return FilterStatus::Continue;
}

显然进入的逻辑就是一个比较正常的 filter chain 了,走过这些漫长曲折的道路,我们来到了,真正的逻辑部分。也就是这些 filter 是什么,不过这里显然是配置化的,在 example 也如此,因此在此处我们不做深入,逻辑和之前的 filterChain 类似。

Dubbo Filter Route

我们在上面 return filter->onMessageDecoded(metadata, ctx); 中,当我们完成了所有的 MessageDcoded 就会进入如下逻辑,显然进入了我们所能见的 route 阶段。

onMessageDecodedgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
FilterStatus Router::onMessageDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) {
ASSERT(metadata->hasInvocationInfo());
const auto& invocation = metadata->invocationInfo();

route_ = callbacks_->route();
if (!route_) {
// 没有 route 直接 Stop
return FilterStatus::StopIteration;
}

route_entry_ = route_->routeEntry();

Upstream::ThreadLocalCluster* cluster = cluster_manager_.getThreadLocalCluster(route_entry_->clusterName());
if (!cluster) {
// 没找到 cluster 就返回
}

cluster_ = cluster->info();

Tcp::ConnectionPool::Instance* conn_pool =
cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);

const auto* invocation_impl = dynamic_cast<const RpcInvocationImpl*>(&invocation);
if (invocation_impl->hasAttachment() && invocation_impl->attachment().attachmentUpdated()) {
// 附件上传
} else {
//直接将请求体放置于 upstream_request_buffer_ 中,并不做任何修改
upstream_request_buffer_.move(ctx->originMessage(), ctx->messageSize());
}

upstream_request_ = std::make_unique<UpstreamRequest>(
*this, *conn_pool, metadata, callbacks_->serializationType(), callbacks_->protocolType());

// 开始传输数据
return upstream_request_->start();
}

转发的本体逻辑并不复杂,主要的还是在 route 部分,我们来看看。

Route

matchesgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
RouteConstSharedPtr MethodRouteEntryImpl::matches(const MessageMetadata& metadata, uint64_t random_value) const {
const auto invocation = dynamic_cast<const RpcInvocationImpl*>(&metadata.invocationInfo());

// 根据 invocation 的 Header 进行匹配
if (!RouteEntryImplBase::headersMatch(*invocation)) {
return nullptr;
}

// 根据 Method 进行匹配
if (!method_name_.match(invocation->methodName())) {
return nullptr;
}

// 根据参数匹配
if (parameter_route_) {
return parameter_route_->matches(metadata, random_value);
}

// 随机选择
return clusterEntry(random_value);
}

路由部分就是标准的一个从上往下执行的过程。

总结

当我们完成了上半部分的逻辑之后,我们会得到这么一个全局的视角

数据处理 [BOTTOM HALF]

写入回调

显然当我们 start 写入之后,当服务端回复了我们的请求,我们需要知道在哪里处理,而这一切在构建的时候就已经确定了。

start
1
2
3
4
5
6
7
8
9
10
FilterStatus Router::UpstreamRequest::start() {
Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(*this); // 这里的this也就是 callback
// ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks)
if (handle) {
conn_pool_handle_ = handle;
return FilterStatus::StopIteration;
}

return FilterStatus::Continue;
}

因此,完成请求之后的回调是进入

onPoolReadygithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
Upstream::HostDescriptionConstSharedPtr host) {
onUpstreamHostSelected(host);
conn_data_ = std::move(conn_data);
conn_data_->addUpstreamCallbacks(parent_);
onRequestStart(continue_decoding);
encodeData(parent_.upstream_request_buffer_);
}

// 最终和 Envoy 的契约跳入此函数
void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) {
// 处理返回
if (!upstream_request_->response_started_) {
callbacks_->startUpstreamResponse();
upstream_request_->response_started_ = true;
}

// 调用回调的 upstreamData,而这里的 callbacks 其实就是 ActiveMessageDecoderFilter
DubboFilters::UpstreamResponseStatus status = callbacks_->upstreamData(data);
if (status == DubboFilters::UpstreamResponseStatus::Complete) {
ENVOY_STREAM_LOG(debug, "dubbo router: response complete", *callbacks_);
upstream_request_->onResponseComplete();
cleanup();
return;
} else if (status == DubboFilters::UpstreamResponseStatus::Reset) {
ENVOY_STREAM_LOG(debug, "dubbo router: upstream reset", *callbacks_);
upstream_request_->resetStream();
return;
}
}

DubboFilters::UpstreamResponseStatus
ActiveMessageDecoderFilter::upstreamData(Buffer::Instance& buffer) {
return parent_.upstreamData(buffer);
}

// parent_ -> ActiveMessage
ActiveResponseDecoder::ActiveResponseDecoder(ActiveMessage& parent, DubboFilterStats& stats, Network::Connection& connection ProtocolPtr&& protocol)

写入

从上面就看到我直接回跳到 ActiveMessageupstreamData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
DubboFilters::UpstreamResponseStatus ActiveMessage::upstreamData(Buffer::Instance& buffer) {
try {
auto status = response_decoder_->onData(buffer);
if (status == DubboFilters::UpstreamResponseStatus::Complete) {
if (requestId() != response_decoder_->requestId()) {
throw EnvoyException(fmt::format("dubbo response: request ID is not equal, {}:{}", requestId(), response_decoder_->requestId()));
}
// Completed upstream response.
parent_.deferredMessage(*this);
} else if (status == DubboFilters::UpstreamResponseStatus::Retry) {
response_decoder_.reset();
}

return status;
}
}
// response_decoder_ = std::make_unique<ActiveResponseDecoder>(*this, parent_.stats(), parent_.connection(), std::move(protocol));

而当我们抵达到 onData 的时候,调用了本体上的 decoder 的逻辑,这里对于数据的解析使用了统一的逻辑。

onDatagithub
1
2
3
4
5
6
7
DubboFilters::UpstreamResponseStatus ActiveResponseDecoder::onData(Buffer::Instance& data) {
bool underflow = false;
decoder_->onData(data, underflow);
ASSERT(complete_ || underflow);

return response_status_;
}

完成一切

当我们完成协议数据等待的完成,就再次进入我们熟悉的 onStreamDecoded 部分

ActiveResponseDecoder::onStreamDecodedgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void ActiveResponseDecoder::onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) {

metadata_ = metadata;
//同样的 Filter Chain 逻辑
if (applyMessageEncodedFilters(metadata, ctx) != FilterStatus::Continue) {
response_status_ = DubboFilters::UpstreamResponseStatus::Complete;
return;
}

// 在这里我们完成了所有的写入操作
response_connection_.write(ctx->originMessage(), false);
complete_ = true;
response_status_ = DubboFilters::UpstreamResponseStatus::Complete;

ENVOY_LOG(debug, "dubbo response: complete processing of upstream response messages, id is {}", metadata->requestId());
}

当我们完成这一切的时候,需要把 ActiveMessage 释放掉即可。

总结

当我们完成了下半部分的逻辑之后,我们又会得到这么一个全局的视角

最终

当我们将一切源码串起来的时候,我们就可知的,项目的逻辑是

我们能用 Rust 实现这一切吗?

答案是显然的,因为对于整个 dubbo filter 来说,最为核心的部分就是要把自己看做一个 network filter,而在 envoy 中,也作为 wasmabi 暴露出来了。

NetworkFilterlink
1
2
3
4
5
6
7
8
9
10
11
12
use envoy::extension::{NetworkFilter, Result};
use envoy::extension::filter::network::FilterStatus;
use envoy::host::log;

struct MyNetworkFilter;

impl NetworkFilter for MyNetworkFilter {
fn on_new_connection(&mut self) -> Result<FilterStatus> {
log::info!("a new connection has been established");
Ok(FilterStatus::Continue)
}
}

而在整个过程中,dubbo proxy 需要获得 cluster 的信息和获得连接池。

获得 cluster 信息可以通过 stream_info 接口获得。不过向下游发送 TCP 请求,在 hostcalls 并未可见。因此我们无法实现此功能。