Kitex 是如何工作
让我们来瞧瞧 Cloudwego 社区的 Kitex, 还没写完,先占位
快速文档
从栈分析开始
在 Example 中,我们编写 Client 代码如下
1 | resp, err := cli.GetItem(context.Background(), req, callopt.WithRPCTimeout(3*time.Second)) |
Client
在 client
通过 Debug
模式进去,在返回之前,我们可以找到 Go Stack
1 | client.rpcTimeoutMW.func1.1 (rpctimeout.go:119) github.com/cloudwego/kitex/client |
客户端发出请求是无法从栈里面获得所有信息的,这里涉及到一个切换工作,直接通过代码我们可以定位到最后在,这里涉及到 netpoll
的设计哲学,我们先 SKIP,对于客户端来说,大多数逻辑都是放在 next
里,这里的 netpoll
也就是帮助切换了上下文
1 | return func(next endpoint.Endpoint) endpoint.Endpoint { |
主要的逻辑都在 contextMW
之中
1 | runtime.gopark (proc.go:364) runtime |
Outbound
主要的逻辑让我们从这里开始
1 | func contextMW(next endpoint.Endpoint) endpoint.Endpoint { |
这个逻辑就是非常标准的 Chain
模式,从调用栈里面就很明显的发现了调用的次序timeout
-> resolve
-> ioError
-> invokeHandle
服务发现 resolve
服务发现流程就是通过获取调用的 RPC
信息进行寻址。
1 | rpcInfo := rpcinfo.GetRPCInfo(ctx) |
对接服务发现的重点在 lb, err := lbf.Get(ctx, dest)
1 | func (b *BalancerFactory) Get(ctx context.Context, target rpcinfo.EndpointInfo) (*Balancer, error) { |
扩展点
服务发现扩展
客户端通过服务发现扩展可以对接多套不同服务发现体系。
1 | func main() { |
这里的扩展点,显然就是在我们刚刚看的 GET
里面进行了调用
Server
我们就直接直接和服务器进行通讯,那我们直接在服务端相对应的问题设置下BreakPoint
1 | func (s *ItemServiceImpl) GetItem(ctx context.Context, req *item.GetItemReq) (resp *item.GetItemResp, err error) { |
那我们显然很容易的获得栈信息
1 | main.(*ItemServiceImpl).GetItem (handler.go:13) main |
显然从栈上面,我们可以看出来分为
- 通讯链路层(netpoll)
- RPC框架层
- 上层业务
本次底层的通讯框架并不在我们的讨论范围内,我们从 OnRead 函数开始阅读。
Inbound
在 remote.(*TransPipeline).OnRead (trans_pipeline.go:129) github.com/cloudwego/kitex/pkg/remote
中,我们可以看到如下代码
1 | func (p *TransPipeline) OnRead(ctx context.Context, conn net.Conn) error { |
很明显代码分为两个部分
p.inboundHdrls
进行了二进制流的 Filter 操作netHdlr.OnRead(ctx, conn)
继续 netHdlr 的 OnRead 操作
这里可以看出来 inboundHdrls
就是预留了一个扩展点,进行inbound流量的过滤,变形的操作。
svrTransHandler
从 stack 往上走一层就到了下面代码,这里用了非常标准的一个 Facade Pattern
模式(多态),这里注册了多个实现 OnRead 的结构体,在 ProtocolMatch 进行匹配处理。
1 | func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error) { |
从 stack 里面我们就可以发现,默认情况下,我们走了 default_server_handler 进行处理。这里开始代码就变的没有那么易读了,下面的代码进行了一些无关紧要的阉割。
1 | // OnRead implements the remote.ServerTransHandler interface. |
在 OnMessage
中使用了和 OnRead
相同的模式,最终会匹配到 default_server_handler
的 OnMessage
函数
Method Route
在消息解析完成之后,我们显然需要根据消息来确定我们要调用谁,这里就看到了 invokeHandleEndpoint
,这里非常的标准的 RPC
匹配方式
1 | func (s *server) invokeHandleEndpoint() endpoint.Endpoint { |
扩展点
对于一个框架来说,扩展点是最有价值的地方,我们看看 Kitex
的扩展点
Middleware
1 |
|
结合访问 stack
我们还是很容易发现这里的设计,在初始化的过程中,给 Server 组装了一个 FilterChain 将所有的 func(ctx context.Context, request, response interface{}) error
函数都放进去了。调用的时候就按照顺序,挨个执行一遍。
1 | func (s *server) buildInvokeChain() { |
不过根据执行点,我们显然可以知道,这里执行在 OnMessage
层,也就是在反序列化之后的地方才可以执行这些逻辑。
1 | main.ExampleMiddleware.func1 (main.go:26) main |
服务注册扩展
1 | svr := xxxservice.NewServer(handler, server.WithRegistry(yourRegistry), server.WithRegistryInfo(yourRegistryInfo)) |
预留了一个注册中心的接口
1 | // Registry is extension interface of service registry. |
这里的调用过程也比较简单,并不像 Spring
体系提供那么多的调用点,在启动之后,通过短暂的延时,调用预留的注册接口。
1 | func (s *server) waitExit(errCh chan error) error { |
编解码扩展
Kitex 支持扩展协议,包括整体的 Codec 和 PayloadCodec。通常 RPC 协议中包含应用层传输协议和 Payload 协议,如 HTTP/HTTP2 属于应用层传输协议,基于 HTTP/HTTP2 可以承载不同格式和不同协议的 Payload。