Today let's take a close look at how the Dubbo protocol is implemented in Envoy, based on the 1.18 branch.
Intro
The Dubbo filter lives under /*/extensions/filters/network/dubbo_proxy. First, let's see how this filter is used:
```yaml
static_resources:
  listeners:
  - address:
      socket_address:
        address: 0.0.0.0
        port_value: 20881
    filter_chains:
    - filters:
      - name: envoy.filters.network.dubbo_proxy
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.dubbo_proxy.v3.DubboProxy
          stat_prefix: dubbo_incomming_stats
          protocol_type: Dubbo
          serialization_type: Hessian2
          route_config:
            name: local_route
            interface: org.apache.dubbo.demo.DemoService
            routes:
            - match:
                method:
                  name:
                    exact: sayHello
              route:
                cluster: user_service_dubbo_server
          dubbo_filters:
          - name: envoy.filters.dubbo.testFilter
            typed_config:
              "@type": type.googleapis.com/google.protobuf.Struct
              value:
                name: test_service
          - name: envoy.filters.dubbo.router
```
As you can see, the Dubbo plugin is delivered as a network filter and distributed via the LDS protocol.
The dubbo_proxy directory layout:
```
├── BUILD
├── active_message.cc
├── active_message.h
├── app_exception.cc
├── app_exception.h
├── config.cc
├── config.h
├── conn_manager.cc
├── conn_manager.h
├── decoder.cc
├── decoder.h
├── decoder_event_handler.h
├── dubbo_hessian2_serializer_impl.cc
├── dubbo_hessian2_serializer_impl.h
├── dubbo_protocol_impl.cc
├── dubbo_protocol_impl.h
├── filters
│   ├── BUILD
│   ├── factory_base.h
│   ├── filter.h
│   ├── filter_config.h
│   └── well_known_names.h
├── heartbeat_response.cc
├── heartbeat_response.h
├── hessian_utils.cc
├── hessian_utils.h
├── message.h
├── message_impl.cc
├── message_impl.h
├── metadata.h
├── protocol.h
├── protocol_constants.h
├── router
│   ├── BUILD
│   ├── config.cc
│   ├── config.h
│   ├── route.h
│   ├── route_matcher.cc
│   ├── route_matcher.h
│   ├── router.h
│   ├── router_impl.cc
│   └── router_impl.h
├── serializer.h
└── stats.h
```
Plugin loading
This was also covered in the Envoy Extend post. In short, when `ListenerImpl` initializes the filter chain, it can do so based on the registration mechanism.
`ListenerImpl::createNetworkFilterChain` (github):

```cpp
bool ListenerImpl::createNetworkFilterChain(
    Network::Connection& connection,
    const std::vector<Network::FilterFactoryCb>& filter_factories) {
  return Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
}
```
Mechanically, we need to register a factory into Envoy's context, which is done in dubbo_proxy/config.cc:
`REGISTER_FACTORY` (github):

```cpp
REGISTER_FACTORY(DubboProxyFilterConfigFactory, Server::Configuration::NamedNetworkFilterConfigFactory);
```
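The macro boils down to a self-registering static object, so merely linking config.cc into the binary makes the factory discoverable by name. A minimal, standalone sketch of that idea (the `FilterRegistry` and `RegisterFactory` types below are illustrative stand-ins, not Envoy's actual registry API):

```cpp
#include <functional>
#include <iostream>
#include <map>
#include <string>

// A toy registry: maps a filter name to a factory function.
struct FilterRegistry {
  using FactoryFn = std::function<void()>;
  static std::map<std::string, FactoryFn>& factories() {
    static std::map<std::string, FactoryFn> instance;
    return instance;
  }
};

// Constructing a static instance of this helper registers the factory
// before main() runs -- the same trick the registration macro relies on.
struct RegisterFactory {
  RegisterFactory(const std::string& name, FilterRegistry::FactoryFn fn) {
    FilterRegistry::factories()[name] = std::move(fn);
  }
};

static RegisterFactory dubbo_proxy_registration(
    "envoy.filters.network.dubbo_proxy",
    [] { std::cout << "building dubbo_proxy filter\n"; });

int main() {
  // The listener only needs the name from the LDS config to look up the factory.
  FilterRegistry::factories()["envoy.filters.network.dubbo_proxy"]();
}
```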
Plugin initialization
Once registered with Envoy, the system calls back at startup to build our factory, which in turn builds our filter. That logic lives here:
`createFilterFactoryFromProtoTyped` (github):

```cpp
Network::FilterFactoryCb DubboProxyFilterConfigFactory::createFilterFactoryFromProtoTyped(
    const envoy::extensions::filters::network::dubbo_proxy::v3::DubboProxy& proto_config,
    Server::Configuration::FactoryContext& context) {
  std::shared_ptr<Config> filter_config(std::make_shared<ConfigImpl>(proto_config, context));

  return [filter_config, &context](Network::FilterManager& filter_manager) -> void {
    filter_manager.addReadFilter(std::make_shared<ConnectionManager>(
        *filter_config, context.api().randomGenerator(), context.dispatcher().timeSource()));
  };
}
```
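Note the shape of what is returned: the proto config is parsed once into a shared `ConfigImpl`, and the returned callback, which captures that config by `shared_ptr`, runs once per accepted connection to create a fresh `ConnectionManager`. A stripped-down sketch of that pattern under those assumptions (every type here is a toy stand-in, not Envoy's):

```cpp
#include <functional>
#include <iostream>
#include <memory>

struct Config { int listener_port = 20881; };            // parsed once per listener
struct Filter {                                           // created once per connection
  explicit Filter(std::shared_ptr<const Config> c) : config_(std::move(c)) {}
  std::shared_ptr<const Config> config_;
};
struct FilterManager { std::shared_ptr<Filter> read_filter_; };

using FilterFactoryCb = std::function<void(FilterManager&)>;

FilterFactoryCb createFilterFactory(const Config& proto_config) {
  auto config = std::make_shared<const Config>(proto_config);
  // The shared config outlives this call and is reused by every connection.
  return [config](FilterManager& fm) {
    fm.read_filter_ = std::make_shared<Filter>(config);
  };
}

int main() {
  FilterFactoryCb cb = createFilterFactory(Config{});
  FilterManager conn_a, conn_b;
  cb(conn_a);  // each accepted connection gets its own filter instance...
  cb(conn_b);  // ...backed by the same shared config
  std::cout << (conn_a.read_filter_ != conn_b.read_filter_) << "\n";  // prints 1
}
```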
With construction out of the way, we arrive at how the data stream is actually handled. As we saw above, the ReadFilter added to the filter_manager for the Dubbo protocol is in fact the ConnectionManager.
A quiet aside: ConnectionManager is a rather grand name for this component, and the `ConnectionManager` here is not the same as Envoy's built-in one (the name makes it very easy to confuse the two).
conn_manager.h (github):

```cpp
class ConnectionManager : public Network::ReadFilter,
                          public Network::ConnectionCallbacks,
                          public RequestDecoderCallbacks,
                          Logger::Loggable<Logger::Id::dubbo> {
public:
  using ConfigProtocolType = envoy::extensions::filters::network::dubbo_proxy::v3::ProtocolType;
  using ConfigSerializationType =
      envoy::extensions::filters::network::dubbo_proxy::v3::SerializationType;

  ConnectionManager(Config& config, Random::RandomGenerator& random_generator,
                    TimeSource& time_system);
  ~ConnectionManager() override = default;

  Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
  Network::FilterStatus onNewConnection() override;
  void initializeReadFilterCallbacks(Network::ReadFilterCallbacks&) override;

  StreamHandler& newStream() override;

  std::list<ActiveMessagePtr>& getActiveMessagesForTest() { return active_message_list_; }
};
```
From the class declaration alone we can tell that this ConnectionManager is a three-in-one: ConnectionCallbacks, ReadFilter, and RequestDecoderCallbacks.
At this point we have a clear picture of how the system works.
Global abstractions
For the Dubbo proxy, the code abstracts a few important concepts (a sketch of how they relate follows the list below):
- `ConnectionManager`: the main entry point for request handling
- `Decoder`: deserialization of the message body, split into `RequestDecoder` and `ResponseDecoder`
- `Protocol`: the Dubbo protocol implementation (it only parses Dubbo's custom header)
- `ActiveMessage`: the context for a single Dubbo request, tying the other abstractions together
- `Router`: route matching and forwarding
- `FilterChain`: Dubbo additionally builds its own internal filter chain
Data handling [TOP HALF]
Handling reads
Reading data obviously starts at onData.
`onData` (github):

```cpp
Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end_stream) {
  ENVOY_LOG(trace, "dubbo: read {} bytes", data.length());
  request_buffer_.move(data);
  dispatch();

  if (end_stream) {
  }

  return Network::FilterStatus::StopIteration;
}
```
As soon as data arrives, it goes straight into dispatch():
`dispatch` (github):

```cpp
void ConnectionManager::dispatch() {
  try {
    bool underflow = false;
    while (!underflow) {
      decoder_->onData(request_buffer_, underflow);
    }
    return;
  } catch (const EnvoyException& ex) {
    ENVOY_CONN_LOG(error, "dubbo error: {}", read_callbacks_->connection(), ex.what());
    read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
    stats_.request_decoding_error_.inc();
  }
  resetAllMessages(true);
}
```
First the buffer is handed to the internal decoder_ for decoding.
`DecoderBase::onData` (github):

```cpp
FilterStatus DecoderBase::onData(Buffer::Instance& data, bool& buffer_underflow) {
  ENVOY_LOG(debug, "dubbo decoder: {} bytes available", data.length());
  buffer_underflow = false;

  if (!decode_started_) {
    start();
  }

  ASSERT(state_machine_ != nullptr);

  ENVOY_LOG(debug, "dubbo decoder: protocol {}, state {}, {} bytes available", protocol_.name(),
            ProtocolStateNameValues::name(state_machine_->currentState()), data.length());

  ProtocolState rv = state_machine_->run(data);
  switch (rv) {
  case ProtocolState::WaitForData:
    ENVOY_LOG(debug, "dubbo decoder: wait for data");
    buffer_underflow = true;
    return FilterStatus::Continue;
  default:
    break;
  }

  ASSERT(rv == ProtocolState::Done);

  complete();
  buffer_underflow = (data.length() == 0);
  ENVOY_LOG(debug, "dubbo decoder: data length {}", data.length());
  return FilterStatus::Continue;
}
```
Here the Dubbo proxy abstracts a state machine to solve the problem: roughly, it waits until all of the data has been collected before moving on, and only once the data is complete does processing continue.
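The "wait for data" behavior is easy to model: each state consumes bytes only when enough are buffered, otherwise it reports underflow and `onData` simply returns until the next read event arrives. A minimal, self-contained sketch of that loop (a toy model, not the real `DecoderStateMachine`):

```cpp
#include <iostream>
#include <string>

enum class State { OnHeader, OnData, Done, WaitForData };

struct ToyDecoder {
  State state = State::OnHeader;
  size_t header_size = 16;  // Dubbo's fixed header length
  size_t body_size = 0;     // learned from the header

  // Returns WaitForData when the buffer does not yet hold a full unit of work.
  State run(std::string& buffer) {
    while (state != State::Done) {
      switch (state) {
      case State::OnHeader:
        if (buffer.size() < header_size) return State::WaitForData;
        body_size = 5;  // pretend we parsed it from the header
        buffer.erase(0, header_size);
        state = State::OnData;
        break;
      case State::OnData:
        if (buffer.size() < body_size) return State::WaitForData;
        buffer.erase(0, body_size);
        state = State::Done;
        break;
      default:
        return state;
      }
    }
    return state;
  }
};

int main() {
  ToyDecoder d;
  std::string buf(10, 'x');                                  // partial header only
  std::cout << (d.run(buf) == State::WaitForData) << "\n";   // 1: underflow, wait
  buf.append(11, 'x');                                       // now header + body are buffered
  std::cout << (d.run(buf) == State::Done) << "\n";          // 1: decoding completes
}
```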
Deserialization
Let's dig deeper into the deserialization part.
`DecoderStateMachine::run` / `handleState` (github):

```cpp
ProtocolState DecoderStateMachine::run(Buffer::Instance& buffer) {
  while (state_ != ProtocolState::Done) {
    DecoderStatus s = handleState(buffer);
    if (s.next_state_ == ProtocolState::WaitForData) {
      return ProtocolState::WaitForData;
    }
    state_ = s.next_state_;
  }
  return state_;
}

DecoderStateMachine::DecoderStatus DecoderStateMachine::handleState(Buffer::Instance& buffer) {
  switch (state_) {
  case ProtocolState::OnDecodeStreamHeader:
    return onDecodeStreamHeader(buffer);
  case ProtocolState::OnDecodeStreamData:
    return onDecodeStreamData(buffer);
  default:
    NOT_REACHED_GCOVR_EXCL_LINE;
  }
}
```
The received data is processed in two stages: the stream's Header and its Data. The two are handled in much the same way, so let's peek into the Header logic first. For most protocols, the header carries the metadata (fields such as length and type) and is generally processed before anything else.
`onDecodeStreamHeader`:

```cpp
DecoderStateMachine::DecoderStatus
DecoderStateMachine::onDecodeStreamHeader(Buffer::Instance& buffer) {
  auto metadata = std::make_shared<MessageMetadata>();
  auto ret = protocol_.decodeHeader(buffer, metadata);
  if (!ret.second) {
    ENVOY_LOG(debug, "dubbo decoder: need more data for {} protocol", protocol_.name());
    return {ProtocolState::WaitForData};
  }

  auto context = ret.first;
  if (metadata->messageType() == MessageType::HeartbeatRequest ||
      metadata->messageType() == MessageType::HeartbeatResponse) {
    // (heartbeat handling elided in this excerpt)
  }

  active_stream_ = delegate_.newStream(metadata, context);
  context->originMessage().move(buffer, context->headerSize());

  return {ProtocolState::OnDecodeStreamData};
}
```
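For reference, the fixed header that `decodeHeader` deals with is 16 bytes on the wire: a 2-byte magic (`0xdabb`), a flag byte carrying the request/two-way/event bits plus the serialization id, a status byte, an 8-byte request id, and a 4-byte body length. A hedged, standalone sketch of parsing that layout (field offsets follow the Dubbo protocol documentation; this toy parser is not Envoy's `DubboProtocolImpl`):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <vector>

struct DubboHeader {
  bool is_request;
  bool is_event;          // heartbeats are events
  uint8_t serialization;  // 2 == hessian2
  uint64_t request_id;
  uint32_t body_size;
};

// Parses the 16-byte fixed header, or returns nullopt if not enough data yet.
std::optional<DubboHeader> decodeHeader(const std::vector<uint8_t>& buf) {
  if (buf.size() < 16) return std::nullopt;
  if (buf[0] != 0xda || buf[1] != 0xbb) throw std::runtime_error("bad dubbo magic");

  DubboHeader h{};
  h.is_request = (buf[2] & 0x80) != 0;
  h.is_event = (buf[2] & 0x20) != 0;
  h.serialization = buf[2] & 0x1f;
  for (int i = 0; i < 8; i++) h.request_id = (h.request_id << 8) | buf[4 + i];
  for (int i = 0; i < 4; i++) h.body_size = (h.body_size << 8) | buf[12 + i];
  return h;
}

int main() {
  std::vector<uint8_t> buf = {0xda, 0xbb, 0xc2, 0x00,  // magic, request|two-way|hessian2, status
                              0, 0, 0, 0, 0, 0, 0, 1,  // request id = 1
                              0, 0, 0, 42};            // body size = 42
  auto h = decodeHeader(buf);
  std::cout << h->is_request << " " << h->request_id << " " << h->body_size << "\n";  // 1 1 42
}
```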
Who the delegate is turns out to be the key to understanding this code. In decoder.h we find that delegate_ is a Decoder:
```cpp
ActiveStream* newStream(MessageMetadataSharedPtr metadata, ContextSharedPtr context) override {
  stream_ = std::make_unique<ActiveStream>(callbacks_.newStream(), metadata, context);
  return stream_.get();
}
```
Clearly the crucial part is how callbacks_ implements newStream(), and that logic lives in:
`ConnectionManager::newStream()` (link):

```cpp
StreamHandler& ConnectionManager::newStream() {
  ActiveMessagePtr new_message(std::make_unique<ActiveMessage>(*this));
  new_message->createFilterChain();
  LinkedList::moveIntoList(std::move(new_message), active_message_list_);

  return **active_message_list_.begin();
}
```
A quiet aside: here we run into our old friend the FilterChain again. You can view the whole system as one big FilterChain, with the Dubbo proxy containing a sub-FilterChain of its own.
And when we move on to the Body portion:
`onDecodeStreamData` (github):

```cpp
DecoderStateMachine::DecoderStatus DecoderStateMachine::onDecodeStreamData(Buffer::Instance& buffer) {
  if (!protocol_.decodeData(buffer, active_stream_->context_, active_stream_->metadata_)) {
    return {ProtocolState::WaitForData};
  }

  active_stream_->context_->originMessage().move(buffer, active_stream_->context_->bodySize());
  active_stream_->onStreamDecoded();
  active_stream_ = nullptr;

  return {ProtocolState::Done};
}

void ActiveMessage::onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) {
  metadata_ = metadata;
  context_ = ctx;
  filter_action_ = [metadata, ctx](DubboFilters::DecoderFilter* filter) -> FilterStatus {
    return filter->onMessageDecoded(metadata, ctx);
  };

  auto status = applyDecoderFilters(nullptr, FilterIterationStartState::CanStartFromCurrent);
  if (status == FilterStatus::StopIteration) {
    pending_stream_decoded_ = true;
    return;
  }

  finalizeRequest();
}
```
`applyDecoderFilters`:

```cpp
FilterStatus ActiveMessage::applyDecoderFilters(ActiveMessageDecoderFilter* filter,
                                                FilterIterationStartState state) {
  if (!local_response_sent_) {
    for (auto entry = commonDecodePrefix(filter, state); entry != decoder_filters_.end(); entry++) {
      const FilterStatus status = filter_action_((*entry)->handler().get());
      if (local_response_sent_) {
        break;
      }

      if (status != FilterStatus::Continue) {
        return status;
      }
    }
  }

  filter_action_ = nullptr;
  return FilterStatus::Continue;
}
```
Clearly, what we have entered is a fairly ordinary filter chain. After this long and winding road we have finally arrived at the real business logic, namely what these filters actually are. They are driven by configuration, as in the example above, so we won't dig into them here; the logic is similar to the filter chains we have seen before.
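The contract is the same as Envoy's other filter chains: each filter returns `Continue` or `StopIteration`, and a stopped chain can be resumed later from where it paused. A condensed sketch of that iteration, loosely modeled on `applyDecoderFilters` but with toy types:

```cpp
#include <iostream>
#include <memory>
#include <vector>

enum class FilterStatus { Continue, StopIteration };

struct DecoderFilter {
  virtual ~DecoderFilter() = default;
  virtual FilterStatus onMessageDecoded() = 0;
};

struct LoggingFilter : DecoderFilter {
  FilterStatus onMessageDecoded() override {
    std::cout << "logging filter ran\n";
    return FilterStatus::Continue;
  }
};

struct PausingFilter : DecoderFilter {
  FilterStatus onMessageDecoded() override {
    std::cout << "pausing filter stopped the chain\n";
    return FilterStatus::StopIteration;  // e.g. waiting on an async call
  }
};

// Walks the chain starting from `from`; stops wherever a filter pauses it.
FilterStatus applyFilters(const std::vector<std::unique_ptr<DecoderFilter>>& chain,
                          size_t from = 0) {
  for (size_t i = from; i < chain.size(); i++) {
    if (chain[i]->onMessageDecoded() == FilterStatus::StopIteration) {
      return FilterStatus::StopIteration;
    }
  }
  return FilterStatus::Continue;
}

int main() {
  std::vector<std::unique_ptr<DecoderFilter>> chain;
  chain.push_back(std::make_unique<LoggingFilter>());
  chain.push_back(std::make_unique<PausingFilter>());
  chain.push_back(std::make_unique<LoggingFilter>());  // not reached on the first pass
  if (applyFilters(chain) == FilterStatus::StopIteration) {
    applyFilters(chain, 2);  // resume from the filter after the one that paused
  }
}
```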
Dubbo Filter Route
In the `return filter->onMessageDecoded(metadata, ctx);` above, once all of the message decoding is done we enter the following logic, which is clearly the routing stage.
`onMessageDecoded` (github):

```cpp
FilterStatus Router::onMessageDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) {
  ASSERT(metadata->hasInvocationInfo());
  const auto& invocation = metadata->invocationInfo();

  route_ = callbacks_->route();
  if (!route_) {
    return FilterStatus::StopIteration;
  }

  route_entry_ = route_->routeEntry();

  Upstream::ThreadLocalCluster* cluster =
      cluster_manager_.getThreadLocalCluster(route_entry_->clusterName());
  if (!cluster) {
    // (error handling elided in this excerpt)
  }
  cluster_ = cluster->info();

  Tcp::ConnectionPool::Instance* conn_pool =
      cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);

  const auto* invocation_impl = dynamic_cast<const RpcInvocationImpl*>(&invocation);

  if (invocation_impl->hasAttachment() && invocation_impl->attachment().attachmentUpdated()) {
    // (attachment re-encoding path elided in this excerpt)
  } else {
    upstream_request_buffer_.move(ctx->originMessage(), ctx->messageSize());
  }

  upstream_request_ = std::make_unique<UpstreamRequest>(
      *this, *conn_pool, metadata, callbacks_->serializationType(), callbacks_->protocolType());
  return upstream_request_->start();
}
```
The forwarding logic itself isn't complicated; most of the interesting work is in the route part, so let's take a look.
Route
`matches` (github):

```cpp
RouteConstSharedPtr MethodRouteEntryImpl::matches(const MessageMetadata& metadata,
                                                  uint64_t random_value) const {
  const auto invocation = dynamic_cast<const RpcInvocationImpl*>(&metadata.invocationInfo());

  if (!RouteEntryImplBase::headersMatch(*invocation)) {
    return nullptr;
  }

  if (!method_name_.match(invocation->methodName())) {
    return nullptr;
  }

  if (parameter_route_) {
    return parameter_route_->matches(metadata, random_value);
  }

  return clusterEntry(random_value);
}
```
The routing part is a standard top-to-bottom matching process.
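The matching in the example config, an interface plus an exact method name resolving to a cluster, can be modeled in a few lines; parameter and header matchers simply add more predicates before the cluster entry is returned. A simplified sketch (toy types, not Envoy's `RouteEntryImplBase`):

```cpp
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct Invocation {
  std::string service;  // e.g. org.apache.dubbo.demo.DemoService
  std::string method;   // e.g. sayHello
};

struct MethodRoute {
  std::string service;
  std::string method;   // exact match, as in the example config
  std::string cluster;

  std::optional<std::string> matches(const Invocation& inv) const {
    if (inv.service != service) return std::nullopt;
    if (inv.method != method) return std::nullopt;
    return cluster;
  }
};

// Routes are evaluated top to bottom; the first match wins.
std::optional<std::string> route(const std::vector<MethodRoute>& routes, const Invocation& inv) {
  for (const auto& r : routes) {
    if (auto cluster = r.matches(inv)) return cluster;
  }
  return std::nullopt;
}

int main() {
  std::vector<MethodRoute> routes = {
      {"org.apache.dubbo.demo.DemoService", "sayHello", "user_service_dubbo_server"}};
  auto cluster = route(routes, {"org.apache.dubbo.demo.DemoService", "sayHello"});
  std::cout << cluster.value_or("no route") << "\n";
}
```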
Summary
Having walked through the top half of the logic, we end up with this global view.
Data handling [BOTTOM HALF]
Write callbacks
Obviously, after start() kicks off the write, we need to know where the server's reply to our request gets handled, and all of that was already decided at construction time.
`start`:

```cpp
FilterStatus Router::UpstreamRequest::start() {
  Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(*this);
  if (handle) {
    conn_pool_handle_ = handle;
    return FilterStatus::StopIteration;
  }

  return FilterStatus::Continue;
}
```
So once the upstream connection is ready, the callbacks land here:
`onPoolReady` (github):

```cpp
void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
                                          Upstream::HostDescriptionConstSharedPtr host) {
  onUpstreamHostSelected(host);
  conn_data_ = std::move(conn_data);
  conn_data_->addUpstreamCallbacks(parent_);

  onRequestStart(continue_decoding);
  encodeData(parent_.upstream_request_buffer_);
}

void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) {
  if (!upstream_request_->response_started_) {
    callbacks_->startUpstreamResponse();
    upstream_request_->response_started_ = true;
  }

  DubboFilters::UpstreamResponseStatus status = callbacks_->upstreamData(data);
  if (status == DubboFilters::UpstreamResponseStatus::Complete) {
    ENVOY_STREAM_LOG(debug, "dubbo router: response complete", *callbacks_);
    upstream_request_->onResponseComplete();
    cleanup();
    return;
  } else if (status == DubboFilters::UpstreamResponseStatus::Reset) {
    ENVOY_STREAM_LOG(debug, "dubbo router: upstream reset", *callbacks_);
    upstream_request_->resetStream();
    return;
  }
}

DubboFilters::UpstreamResponseStatus ActiveMessageDecoderFilter::upstreamData(Buffer::Instance& buffer) {
  return parent_.upstreamData(buffer);
}

ActiveResponseDecoder::ActiveResponseDecoder(ActiveMessage& parent, DubboFilterStats& stats,
                                             Network::Connection& connection,
                                             ProtocolPtr&& protocol)
```
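Before following the code further, it may help to see the bottom-half data path in miniature: the Router owns the upstream connection, but each chunk of response bytes is handed back to the per-request context, which decides whether the response is complete. A toy model of that hand-off (all types are simplified stand-ins, not Envoy's):

```cpp
#include <iostream>
#include <string>

enum class UpstreamResponseStatus { MoreData, Complete };

struct ActiveMessage {      // per-request context on the downstream side
  size_t expected = 10;     // pretend the response header told us the size
  std::string received;
  UpstreamResponseStatus upstreamData(const std::string& chunk) {
    received += chunk;
    return received.size() >= expected ? UpstreamResponseStatus::Complete
                                       : UpstreamResponseStatus::MoreData;
  }
};

struct Router {             // upstream side: owns the pooled TCP connection
  ActiveMessage& callbacks_;
  explicit Router(ActiveMessage& cb) : callbacks_(cb) {}
  void onUpstreamData(const std::string& data) {
    if (callbacks_.upstreamData(data) == UpstreamResponseStatus::Complete) {
      std::cout << "response complete, cleaning up upstream request\n";
    }
  }
};

int main() {
  ActiveMessage msg;
  Router router(msg);
  router.onUpstreamData("hello");   // partial response, keep waiting
  router.onUpstreamData("world!");  // completes the response
}
```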
Writing back
From the above we can see that control jumps straight back into ActiveMessage's upstreamData:
```cpp
DubboFilters::UpstreamResponseStatus ActiveMessage::upstreamData(Buffer::Instance& buffer) {
  try {
    auto status = response_decoder_->onData(buffer);
    if (status == DubboFilters::UpstreamResponseStatus::Complete) {
      if (requestId() != response_decoder_->requestId()) {
        throw EnvoyException(fmt::format("dubbo response: request ID is not equal, {}:{}",
                                         requestId(), response_decoder_->requestId()));
      }
      parent_.deferredMessage(*this);
    } else if (status == DubboFilters::UpstreamResponseStatus::Retry) {
      response_decoder_.reset();
    }

    return status;
  } // (catch clauses elided in this excerpt)
}
```
When we reach onData, it invokes the underlying decoder's logic; parsing the response data reuses the same decoding machinery as before.
`onData` (github):

```cpp
DubboFilters::UpstreamResponseStatus ActiveResponseDecoder::onData(Buffer::Instance& data) {
  bool underflow = false;
  decoder_->onData(data, underflow);
  ASSERT(complete_ || underflow);

  return response_status_;
}
```
Finishing up
Once all of the protocol data has arrived, we enter the familiar onStreamDecoded path again:
`ActiveResponseDecoder::onStreamDecoded` (github):

```cpp
void ActiveResponseDecoder::onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) {
  metadata_ = metadata;
  if (applyMessageEncodedFilters(metadata, ctx) != FilterStatus::Continue) {
    response_status_ = DubboFilters::UpstreamResponseStatus::Complete;
    return;
  }

  response_connection_.write(ctx->originMessage(), false);

  complete_ = true;
  response_status_ = DubboFilters::UpstreamResponseStatus::Complete;

  ENVOY_LOG(debug, "dubbo response: complete processing of upstream response messages, id is {}",
            metadata->requestId());
}
```
Once all of this is done, all that's left is to release the ActiveMessage.
Summary
Having walked through the bottom half as well, we get another global view.
Finally
When we string all of the source code together, the overall flow of the project becomes clear.
Could we implement all of this in Rust?
The answer seems obvious: the core of the whole dubbo filter is that it acts as a network filter, and in Envoy that is also exposed through the WASM ABI.
`NetworkFilter` (link):

```rust
use envoy::extension::{NetworkFilter, Result};
use envoy::extension::filter::network::FilterStatus;
use envoy::host::log;

struct MyNetworkFilter;

impl NetworkFilter for MyNetworkFilter {
    fn on_new_connection(&mut self) -> Result<FilterStatus> {
        log::info!("a new connection has been established");
        Ok(FilterStatus::Continue)
    }
}
```
Throughout the whole flow, the dubbo proxy needs access to cluster information and to a connection pool. Cluster information can be obtained through the stream_info interface, but issuing a TCP request on to the backend service is nowhere to be found in the hostcalls, so this part cannot be implemented.