How Kitex Works

Let's take a look at Kitex from the CloudWeGo community.

Quick-start docs

Advanced tutorials

This article is based on kitex v0.9.0.

Starting from a Stack Analysis

In the example, we write the client code as follows:

api.go
resp, err := cli.GetItem(context.Background(), req, callopt.WithRPCTimeout(3*time.Second))
if err != nil {
    log.Fatal(err)
}

Client

Stepping into the client in debug mode and pausing just before the call returns, we can capture this Go stack:

client.rpcTimeoutMW.func1.1 (rpctimeout.go:119) github.com/cloudwego/kitex/client
client.(*kClient).Call (client.go:368) github.com/cloudwego/kitex/client
client.(*kcFinalizerClient).Call (client.go:89) github.com/cloudwego/kitex/client
itemservice.(*kClient).GetItem (itemservice.go:121) example_shop/kitex_gen/example/shop/item/itemservice
itemservice.(*kItemServiceClient).GetItem (client.go:48) example_shop/kitex_gen/example/shop/item/itemservice
main.main (main.go:31) main
runtime.main (proc.go:250) runtime
runtime.goexit (asm_amd64.s:1594) runtime
- Async Stack Trace
<autogenerated>:2

When the client sends a request, the stack alone cannot show us everything: a goroutine handoff takes place along the way. Reading the code, we can trace where the request ultimately lands; this touches netpoll's design philosophy, which we will skip for now. For the client, most of the logic lives inside the next chain, and netpoll's job here is essentially to hand the context over between goroutines.

Call (github)
return func(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, request, response interface{}) error {
        ri := rpcinfo.GetRPCInfo(ctx)
        if ri.Config().InteractionMode() == rpcinfo.Streaming {
            return next(ctx, request, response)
        }

        tm := ri.Config().RPCTimeout()
        if tm > 0 {
            tm += moreTimeout
            var cancel context.CancelFunc
            ctx, cancel = context.WithTimeout(ctx, tm)
            defer cancel()
        }
        // Fast path for ctx without timeout
        if ctx.Done() == nil {
            return next(ctx, request, response)
        }

        var err error
        start := time.Now()
        done := make(chan error, 1)
        workerPool.GoCtx(ctx, func() {
            defer func() {
                // skip: recover from a panic, report it on done, then close(done)
            }()
            err = next(ctx, request, response)
            if err != nil && ctx.Err() != nil {
                // skip: extra conditions and timeout-error logging elided
            }
        })

        select {
        case panicErr := <-done:
            if panicErr != nil {
                return panicErr
            }
            return err
        case <-ctx.Done():
            return makeTimeoutErr(ctx, start, tm)
        }
    }
}
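
Stripped of the Kitex specifics, the shape of this guard is worth internalizing: run the real call in another goroutine and race it against ctx.Done(). A minimal self-contained sketch (callWithTimeout is an illustrative name, not a Kitex API):

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// callWithTimeout runs call in its own goroutine and races it against the
// context deadline, mirroring the select at the end of rpcTimeoutMW.
func callWithTimeout(ctx context.Context, d time.Duration, call func(context.Context) error) error {
    ctx, cancel := context.WithTimeout(ctx, d)
    defer cancel()

    done := make(chan error, 1) // buffered so the goroutine never leaks on timeout
    go func() { done <- call(ctx) }()

    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        return errors.New("rpc timeout: " + ctx.Err().Error())
    }
}

func main() {
    err := callWithTimeout(context.Background(), 100*time.Millisecond, func(ctx context.Context) error {
        time.Sleep(time.Second) // a slow downstream call
        return nil
    })
    fmt.Println(err) // rpc timeout: context deadline exceeded
}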

The main logic all happens under contextMW; grabbing the stack of the worker goroutine that issues the request confirms it:

runtime.gopark (proc.go:364) runtime
runtime.selectgo (select.go:328) runtime
netpoll.(*connection).waitReadWithTimeout (connection_impl.go:443) github.com/cloudwego/netpoll
netpoll.(*connection).waitRead (connection_impl.go:404) github.com/cloudwego/netpoll
netpoll.(*connection).Peek (connection_impl.go:110) github.com/cloudwego/netpoll
netpoll.(*netpollByteBuffer).Peek (bytebuf.go:108) github.com/cloudwego/kitex/pkg/remote/trans/netpoll
codec.(*defaultCodec).DecodeMeta (default_codec.go:161) github.com/cloudwego/kitex/pkg/remote/codec
codec.(*defaultCodec).Decode (default_codec.go:220) github.com/cloudwego/kitex/pkg/remote/codec
trans.(*cliTransHandler).Read (default_client_handler.go:76) github.com/cloudwego/kitex/pkg/remote/trans
remote.(*TransPipeline).Read (trans_pipeline.go:136) github.com/cloudwego/kitex/pkg/remote
remotecli.(*client).Recv (client.go:100) github.com/cloudwego/kitex/pkg/remote/remotecli
client.(*kClient).invokeHandleEndpoint.func1 (client.go:490) github.com/cloudwego/kitex/client
client.newIOErrorHandleMW.func1.1 (middlewares.go:153) github.com/cloudwego/kitex/client
client.newResolveMWBuilder.func1.1.1 (middlewares.go:125) github.com/cloudwego/kitex/client
client.contextMW.func1 (context_middleware.go:54) github.com/cloudwego/kitex/client
client.rpcTimeoutMW.func1.1.1 (rpctimeout.go:131) github.com/cloudwego/kitex/client
wpool.(*Pool).GoCtx.func1 (pool.go:88) github.com/cloudwego/kitex/internal/wpool
runtime.goexit (asm_amd64.s:1594) runtime
- Async Stack Trace
wpool.(*Pool).GoCtx (pool.go:79) github.com/cloudwego/kitex/internal/wpool

Outbound

This is where the main logic begins.

contextMW (github)
func contextMW(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, req, resp interface{}) (err error) {
        mw := getContextMiddleware(ctx)
        if mw != nil {
            return mw(next)(ctx, req, resp)
        }
        return next(ctx, req, resp)
    }
}

This is a very standard chain-of-responsibility setup, and the call stack makes the invocation order obvious:
timeout -> resolve -> ioError -> invokeHandle
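
This composition is what Kitex's endpoint.Chain performs; here is a minimal sketch of the idea (types simplified from the endpoint package; the named helper is illustrative):

package main

import (
    "context"
    "fmt"
)

type Endpoint func(ctx context.Context, req, resp interface{}) error
type Middleware func(Endpoint) Endpoint

// Chain composes middlewares so that mws[0] becomes the outermost wrapper,
// which is why timeout sits above resolve in the captured stack.
func Chain(mws ...Middleware) Middleware {
    return func(next Endpoint) Endpoint {
        for i := len(mws) - 1; i >= 0; i-- {
            next = mws[i](next)
        }
        return next
    }
}

func named(name string) Middleware {
    return func(next Endpoint) Endpoint {
        return func(ctx context.Context, req, resp interface{}) error {
            fmt.Println("enter", name)
            defer fmt.Println("leave", name)
            return next(ctx, req, resp)
        }
    }
}

func main() {
    ep := Chain(named("timeout"), named("resolve"), named("ioError"))(
        func(ctx context.Context, req, resp interface{}) error {
            fmt.Println("invokeHandle")
            return nil
        })
    _ = ep(context.Background(), nil, nil)
    // Prints: enter timeout, enter resolve, enter ioError,
    // invokeHandle, leave ioError, leave resolve, leave timeout
}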

Service discovery: resolve

The service discovery flow resolves an address from the RPC info of the call.

newResolveMWBuilder (github)
rpcInfo := rpcinfo.GetRPCInfo(ctx)
dest := rpcInfo.To()
if dest == nil {
    return kerrors.ErrNoDestService
}

remote := remoteinfo.AsRemoteInfo(dest)
if remote == nil {
    err := fmt.Errorf("unsupported target EndpointInfo type: %T", dest)
    return kerrors.ErrInternalException.WithCause(err)
}
if remote.GetInstance() != nil {
    return next(ctx, request, response)
}
// the service discovery integration point
lb, err := lbf.Get(ctx, dest)
if err != nil {
    return kerrors.ErrServiceDiscovery.WithCause(err)
}

// the retry loop
var lastErr error
for i := 0; i < maxRetry; i++ {
    select {
    case <-ctx.Done():
        return kerrors.ErrRPCTimeout
    default:
    }

    // client-side load-balancing policy runs here
    picker := lb.GetPicker()
    ins := picker.Next(ctx, request)
    if ins == nil {
        err = kerrors.ErrNoMoreInstance.WithCause(fmt.Errorf("last error: %w", lastErr))
    } else {
        remote.SetInstance(ins)
        // TODO: generalize retry strategy
        err = next(ctx, request, response)
    }
    if r, ok := picker.(internal.Reusable); ok {
        r.Recycle()
    }
    if err == nil {
        return nil
    }
    if retryable(err) {
        lastErr = err
        klog.CtxWarnf(ctx, "KITEX: auto retry retryable error, retry=%d error=%s", i+1, err.Error())
        continue
    }
    return err
}
return lastErr

The key integration point for service discovery is lb, err := lbf.Get(ctx, dest).

Get (github)
func (b *BalancerFactory) Get(ctx context.Context, target rpcinfo.EndpointInfo) (*Balancer, error) {
    // turn the target into a cache key
    desc := b.resolver.Target(ctx, target)

    // read from the cache first
    val, ok := b.cache.Load(desc)
    if ok {
        return val.(*Balancer), nil
    }
    val, err, _ := b.sfg.Do(desc, func() (interface{}, error) {
        // resolve the addresses
        res, err := b.resolver.Resolve(ctx, desc)
        if err != nil {
            return nil, err
        }
        renameResultCacheKey(&res, b.resolver.Name())
        bl := &Balancer{
            b:      b,
            target: desc,
        }
        bl.res.Store(res)
        bl.sharedTicker = getSharedTicker(bl, b.opts.RefreshInterval)
        b.cache.Store(desc, bl)
        return bl, nil
    })
    if err != nil {
        return nil, err
    }
    return val.(*Balancer), nil
}
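
Note the b.sfg.Do call: this is the singleflight pattern, which guarantees that concurrent requests for the same target trigger only one Resolve against the registry instead of a thundering herd. A standalone sketch of the pattern (using golang.org/x/sync/singleflight, not Kitex code):

package main

import (
    "fmt"
    "sync"

    "golang.org/x/sync/singleflight"
)

func main() {
    var g singleflight.Group
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // All five goroutines share a single "registry lookup".
            v, _, shared := g.Do("svc://item", func() (interface{}, error) {
                return "10.0.0.1:8888", nil // pretend this hits the registry
            })
            fmt.Println(v, "shared:", shared)
        }()
    }
    wg.Wait()
}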

Extension Points

Service Discovery Extension

Through the service discovery extension, the client can integrate with several different service discovery systems.

func main() {
    opt := client.WithResolver(YOUR_RESOLVER)

    // new client
    xxx.NewClient("destServiceName", opt)
}


// the service discovery interface
type Resolver interface {
    Target(ctx context.Context, target rpcinfo.EndpointInfo) string
    Resolve(ctx context.Context, key string) (Result, error)
    Diff(key string, prev, next Result) (Change, bool)
    Name() string
}

type Result struct {
    Cacheable bool       // whether the result can be cached
    CacheKey  string     // unique cache key
    Instances []Instance // the discovered instances
}

// the result of a diff
type Change struct {
    Result  Result
    Added   []Instance
    Updated []Instance
    Removed []Instance
}

This extension point is exactly what gets invoked inside the Get function we just read.
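
To make the contract concrete, here is a hedged sketch of a fixed-address resolver implementing the interface above (staticResolver is illustrative; discovery.NewInstance and discovery.DefaultDiff are the helpers I believe kitex's pkg/discovery provides, so verify against your version):

import (
    "context"

    "github.com/cloudwego/kitex/pkg/discovery"
    "github.com/cloudwego/kitex/pkg/rpcinfo"
)

// staticResolver always returns a fixed instance list.
type staticResolver struct {
    addrs []string
}

func (r *staticResolver) Target(ctx context.Context, target rpcinfo.EndpointInfo) string {
    return target.ServiceName() // the cache key seen in BalancerFactory.Get
}

func (r *staticResolver) Resolve(ctx context.Context, key string) (discovery.Result, error) {
    ins := make([]discovery.Instance, 0, len(r.addrs))
    for _, addr := range r.addrs {
        ins = append(ins, discovery.NewInstance("tcp", addr, 10, nil))
    }
    return discovery.Result{Cacheable: true, CacheKey: key, Instances: ins}, nil
}

func (r *staticResolver) Diff(key string, prev, next discovery.Result) (discovery.Change, bool) {
    return discovery.DefaultDiff(key, prev, next)
}

func (r *staticResolver) Name() string { return "static" }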

Server

Now let's talk to the server directly, setting a breakpoint at the corresponding handler on the server side.

item-handler.go
func (s *ItemServiceImpl) GetItem(ctx context.Context, req *item.GetItemReq) (resp *item.GetItemResp, err error) {
    // set breakpoint
    resp = item.NewGetItemResp()
    resp.Item = item.NewItem()
    resp.Item.Id = req.GetId()
    resp.Item.Title = "Kitex"
    resp.Item.Description = "Kitex is an excellent framework!"
    return
}

From there it is easy to capture the stack:

main.(*ItemServiceImpl).GetItem (handler.go:13) main
itemservice.getItemHandler (itemservice.go:92) example_shop/kitex_gen/example/shop/item/itemservice
server.(*server).invokeHandleEndpoint.func1 (server.go:344) github.com/cloudwego/kitex/server
trans.(*svrTransHandler).OnMessage (default_server_handler.go:226) github.com/cloudwego/kitex/pkg/remote/trans
detection.(*svrTransHandler).OnMessage (server_handler.go:130) github.com/cloudwego/kitex/pkg/remote/trans/detection
remote.(*TransPipeline).OnMessage (trans_pipeline.go:156) github.com/cloudwego/kitex/pkg/remote
trans.(*svrTransHandler).OnRead (default_server_handler.go:201) github.com/cloudwego/kitex/pkg/remote/trans
detection.(*svrTransHandler).OnRead (server_handler.go:114) github.com/cloudwego/kitex/pkg/remote/trans/detection
remote.(*TransPipeline).OnRead (trans_pipeline.go:129) github.com/cloudwego/kitex/pkg/remote
netpoll.(*transServer).onConnRead (trans_server.go:157) github.com/cloudwego/kitex/pkg/remote/trans/netpoll
<autogenerated>:2
netpoll.(*connection).onRequest.func2 (connection_onevent.go:220) github.com/cloudwego/netpoll
netpoll.(*connection).onProcess.func1 (connection_onevent.go:255) github.com/cloudwego/netpoll
gopool.(*worker).run.func1.1 (worker.go:69) github.com/bytedance/gopkg/util/gopool
gopool.(*worker).run.func1 (worker.go:70) github.com/bytedance/gopkg/util/gopool
runtime.goexit (asm_amd64.s:1594) runtime
- Async Stack Trace
gopool.(*worker).run (worker.go:41) github.com/bytedance/gopkg/util/gopool

From the stack we can clearly see three layers:

  • the transport layer (netpoll)
  • the RPC framework layer
  • the upper business logic

The underlying transport framework is out of scope for this article, so we start reading from the OnRead function.

Inbound

In remote.(*TransPipeline).OnRead (trans_pipeline.go:129) github.com/cloudwego/kitex/pkg/remote, we see the following code:

OnRead (github)
func (p *TransPipeline) OnRead(ctx context.Context, conn net.Conn) error {
    var err error
    for _, h := range p.inboundHdrls {
        ctx, err = h.OnRead(ctx, conn)
        if err != nil {
            return err
        }
    }
    if netHdlr, ok := p.netHdlr.(ServerTransHandler); ok {
        return netHdlr.OnRead(ctx, conn)
    }
    return nil
}

The code clearly splits into two parts:

  • p.inboundHdrls runs filter operations over the inbound byte stream
  • netHdlr.OnRead(ctx, conn) continues with the net handler's own OnRead

So inboundHdrls is a reserved extension point for filtering and transforming inbound traffic, as the sketch after this list illustrates.
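
A hedged, conceptual sketch of such a handler: it follows the OnRead(ctx, conn) (context.Context, error) shape used in the loop above and vetoes a connection by returning an error (rateLimitHandler and its wiring are illustrative, not Kitex's actual InboundHandler definition):

import (
    "context"
    "errors"
    "net"

    "golang.org/x/time/rate"
)

// rateLimitHandler rejects reads beyond a fixed QPS budget.
type rateLimitHandler struct {
    limiter *rate.Limiter // e.g. rate.NewLimiter(100, 200)
}

func (h *rateLimitHandler) OnRead(ctx context.Context, conn net.Conn) (context.Context, error) {
    if !h.limiter.Allow() {
        // returning an error here aborts the pipeline before decoding
        return ctx, errors.New("inbound rejected: rate limit exceeded")
    }
    return ctx, nil
}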

svrTransHandler

Walking one frame up the stack lands in the code below. This is a very standard polymorphic dispatch (closer to the Strategy pattern than a Facade): several structs implementing OnRead are registered, and ProtocolMatch picks the one that matches the connection.

OnRead (github)
func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error) {
    // only need detect once when connection is reused
    r := ctx.Value(handlerKey{}).(*handlerWrapper)
    if r.handler != nil {
        return r.handler.OnRead(r.ctx, conn)
    }
    // compare preface one by one
    var which remote.ServerTransHandler
    for i := range t.registered {
        if t.registered[i].ProtocolMatch(ctx, conn) == nil {
            which = t.registered[i]
            break
        }
    }
    if which != nil {
        ctx, err = which.OnActive(ctx, conn)
        if err != nil {
            return err
        }
    } else {
        which = t.defaultHandler
    }
    r.ctx, r.handler = ctx, which
    return which.OnRead(ctx, conn)
}
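
What might a ProtocolMatch look like? A hedged sketch of the general technique: peek the first bytes of the connection without consuming them and compare against a known preface. The HTTP/2 client preface below is standard; the Peek-capable connection interface is an assumption for illustration, though netpoll connections do expose zero-copy peeking (see the Peek frame in the earlier client stack):

import (
    "bytes"
    "context"
    "errors"
    "net"
)

// http2Preface is the fixed string every HTTP/2 (and hence gRPC) client
// sends first on a new connection.
const http2Preface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

func grpcProtocolMatch(ctx context.Context, conn net.Conn) error {
    peeker, ok := conn.(interface{ Peek(n int) ([]byte, error) })
    if !ok {
        return errors.New("connection does not support Peek")
    }
    preface, err := peeker.Peek(len(http2Preface))
    if err != nil {
        return err
    }
    if !bytes.Equal(preface, []byte(http2Preface)) {
        return errors.New("not an HTTP/2 connection")
    }
    return nil // matched: hand this connection to the gRPC handler
}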

From the stack we can tell that, by default, processing goes through default_server_handler. From here on the code gets less readable, so the listing below trims a few inessential parts.

default_server_handler.OnRead (github)
// OnRead implements the remote.ServerTransHandler interface.
// The connection should be closed after returning error.
func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error) {
    ctx, ri := t.newCtxWithRPCInfo(ctx, conn)
    t.ext.SetReadTimeout(ctx, conn, ri.Config(), remote.Server)
    var recvMsg remote.Message
    var sendMsg remote.Message
    closeConnOutsideIfErr := true
    defer func() {
        // error handling omitted for now
    }()

    // the incoming request is wrapped into a Message
    recvMsg = remote.NewMessageWithNewer(t.targetSvcInfo, t.svcSearchMap, ri, remote.Call, remote.Server, t.opt.RefuseTrafficWithoutServiceName)
    // set the payload codec; multiple encodings are supported
    recvMsg.SetPayloadCodec(t.opt.PayloadCodec)
    // Read on the transPipe; note this is not the business-logic processing,
    // which happens in the OnMessage part below
    ctx, err = t.transPipe.Read(ctx, conn, recvMsg)
    if err != nil {
        t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
        t.OnError(ctx, err, conn)
        return err
    }

    // the OnMessage logic happens from here on
    svcInfo := recvMsg.ServiceInfo()
    // heartbeat processing
    // recvMsg.MessageType would be set to remote.Heartbeat in previous Read procedure
    // if specified codec support heartbeat
    if recvMsg.MessageType() == remote.Heartbeat {
        sendMsg = remote.NewMessage(nil, svcInfo, ri, remote.Heartbeat, remote.Server)
    } else {
        // reply processing
        var methodInfo serviceinfo.MethodInfo
        if methodInfo, err = GetMethodInfo(ri, svcInfo); err != nil {
            // it won't be err, because the method has been checked in decode, err check here just do defensive inspection
            t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
            // for proxy case, need read actual remoteAddr, error print must exec after writeErrorReplyIfNeeded
            t.OnError(ctx, err, conn)
            return err
        }
        if methodInfo.OneWay() {
            sendMsg = remote.NewMessage(nil, svcInfo, ri, remote.Reply, remote.Server)
        } else {
            sendMsg = remote.NewMessage(methodInfo.NewResult(), svcInfo, ri, remote.Reply, remote.Server)
        }

        // this is where the real processing happens; Read and OnMessage are two
        // distinct concepts: the former only reads bytes and deserializes them,
        // the latter runs the logic on the deserialized message
        ctx, err = t.transPipe.OnMessage(ctx, recvMsg, sendMsg)
        if err != nil {
            // error cannot be wrapped to print here, so it must exec before NewTransError
            t.OnError(ctx, err, conn)
            err = remote.NewTransError(remote.InternalError, err)
            if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false); closeConn {
                return err
            }
            // connection don't need to be closed when the error is return by the server handler
            closeConnOutsideIfErr = false
            return
        }
    }

    // once processing is done, fill the response message back to the caller
    remote.FillSendMsgFromRecvMsg(recvMsg, sendMsg)
    if ctx, err = t.transPipe.Write(ctx, conn, sendMsg); err != nil {
        t.OnError(ctx, err, conn)
        return err
    }
    return
}

OnMessage uses the same pattern as OnRead and ultimately dispatches to default_server_handler's OnMessage function.

Method Route

Once the message has been parsed, we need to decide, based on the message, whom to call. This is where invokeHandleEndpoint appears, and it is a very standard piece of RPC dispatching:

invokeHandleEndpoint (github)
func (s *server) invokeHandleEndpoint() endpoint.Endpoint {
    return func(ctx context.Context, args, resp interface{}) (err error) {
        ri := rpcinfo.GetRPCInfo(ctx)
        // fetch the basics (methodName, serviceName) from the RPC info
        methodName := ri.Invocation().MethodName()
        serviceName := ri.Invocation().ServiceName()
        svc := s.svcs.svcMap[serviceName]
        svcInfo := svc.svcInfo
        if methodName == "" && svcInfo.ServiceName != serviceinfo.GenericService {
            return errors.New("method name is empty in rpcinfo, should not happen")
        }
        defer func() {
            // skip
        }()

        // look up the method's Handler implementation
        implHandlerFunc := svcInfo.MethodInfo(methodName).Handler()

        // and invoke it
        err = implHandlerFunc(ctx, svc.handler, args, resp)
        if err != nil {
            if bizErr, ok := kerrors.FromBizStatusError(err); ok {
                if setter, ok := ri.Invocation().(rpcinfo.InvocationSetter); ok {
                    setter.SetBizStatusErr(bizErr)
                    return nil
                }
            }
            err = kerrors.ErrBiz.WithCause(err)
        }
        return err
    }
}
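
The generated code is what fills this method table. A hedged sketch of the kitex_gen glue, simplified from what the generated itemservice package looks like (the type names follow the example but the exact body is illustrative): the adapter casts args and result back to their concrete types and calls the user implementation, matching the implHandlerFunc(ctx, svc.handler, args, resp) call above.

// getItemHandler is the generated adapter for GetItem.
func getItemHandler(ctx context.Context, handler, arg, result interface{}) error {
    realArg := arg.(*item.ItemServiceGetItemArgs)
    realResult := result.(*item.ItemServiceGetItemResult)
    success, err := handler.(item.ItemService).GetItem(ctx, realArg.Req)
    if err != nil {
        return err
    }
    realResult.Success = success
    return nil
}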

Extension Points

For a framework, the extension points are where the real value lies; let's look at the ones Kitex offers.

Middleware
ExampleMiddleware

svr := itemservice.NewServer(new(ItemServiceImpl), server.WithMiddleware(ExampleMiddleware))

func ExampleMiddleware(next endpoint.Endpoint) endpoint.Endpoint {
    return func(ctx context.Context, request, response interface{}) error {
        if arg, ok := request.(utils.KitexArgs); ok {
            if req := arg.GetFirstArgument().(*echo.Request); req != nil {
                klog.Debugf("Request Message: %v", req.Message)
            }
        }
        err := next(ctx, request, response)
        if result, ok := response.(utils.KitexResult); ok {
            if resp, ok := result.GetResult().(*echo.Response); ok {
                klog.Debugf("Response Message: %v", resp.Message)
                // result.SetSuccess(...) can be used to substitute a custom response
                // note: the custom response type must match this method's response type
            }
        }
        return err
    }
}

Combined with the stack we captured, the design is easy to see: during initialization the server assembles a filter chain holding every func(ctx context.Context, request, response interface{}) error, and at call time they execute one by one, in order.

buildInvokeChain (github)
func (s *server) buildInvokeChain() {
    innerHandlerEp := s.invokeHandleEndpoint()
    s.eps = endpoint.Chain(s.mws...)(innerHandlerEp)
}

Judging by where it executes, though, middleware clearly runs at the OnMessage layer: only after deserialization can this logic do its work.

main.ExampleMiddleware.func1 (main.go:26) main
trans.(*svrTransHandler).OnMessage (default_server_handler.go:226) github.com/cloudwego/kitex/pkg/remote/trans
detection.(*svrTransHandler).OnMessage (server_handler.go:130) github.com/cloudwego/kitex/pkg/remote/trans/detection
remote.(*TransPipeline).OnMessage (trans_pipeline.go:156) github.com/cloudwego/kitex/pkg/remote
trans.(*svrTransHandler).OnRead (default_server_handler.go:201) github.com/cloudwego/kitex/pkg/remote/trans
detection.(*svrTransHandler).OnRead (server_handler.go:114) github.com/cloudwego/kitex/pkg/remote/trans/detection
remote.(*TransPipeline).OnRead (trans_pipeline.go:129) github.com/cloudwego/kitex/pkg/remote
netpoll.(*transServer).onConnRead (trans_server.go:157) github.com/cloudwego/kitex/pkg/remote/trans/netpoll
<autogenerated>:2
netpoll.(*connection).onRequest.func2 (connection_onevent.go:220) github.com/cloudwego/netpoll
netpoll.(*connection).onProcess.func1 (connection_onevent.go:255) github.com/cloudwego/netpoll
gopool.(*worker).run.func1.1 (worker.go:69) github.com/bytedance/gopkg/util/gopool
gopool.(*worker).run.func1 (worker.go:70) github.com/bytedance/gopkg/util/gopool
runtime.goexit (asm_amd64.s:1594) runtime
- Async Stack Trace
gopool.(*worker).run (worker.go:41) github.com/bytedance/gopkg/util/gopool
Service Registry Extension
svr := xxxservice.NewServer(handler, server.WithRegistry(yourRegistry), server.WithRegistryInfo(yourRegistryInfo))

A registry-center interface is reserved:

// Registry is extension interface of service registry.
type Registry interface {
    Register(info *Info) error
    Deregister(info *Info) error
}
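
A hedged sketch of a trivial implementation, just to show the wiring (logRegistry is illustrative; registry.Info carrying ServiceName and Addr reflects my understanding of kitex's pkg/registry, so verify against your version). A real implementation would talk to etcd, Nacos, Consul, and so on:

import (
    "log"

    "github.com/cloudwego/kitex/pkg/registry"
)

// logRegistry only logs registrations instead of contacting a registry.
type logRegistry struct{}

func (r *logRegistry) Register(info *registry.Info) error {
    log.Printf("register %s at %v", info.ServiceName, info.Addr)
    return nil
}

func (r *logRegistry) Deregister(info *registry.Info) error {
    log.Printf("deregister %s", info.ServiceName)
    return nil
}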

The invocation flow here is fairly simple; unlike the Spring ecosystem, there are not that many hook points. After startup, following a short delay, the reserved registration interface is called.

waitExit (github)
func (s *server) waitExit(errCh chan error) error {
    exitSignal := s.opt.ExitSignal()

    // service may not be available as soon as startup.
    delayRegister := time.After(1 * time.Second)
    for {
        select {
        case err := <-exitSignal:
            return err
        case err := <-errCh:
            return err
        case <-delayRegister:
            s.Lock()
            if err := s.opt.Registry.Register(s.opt.RegistryInfo); err != nil {
                s.Unlock()
                return err
            }
            s.Unlock()
        }
    }
}
Codec Extension

Kitex supports protocol extension, covering both the overall Codec and the PayloadCodec. An RPC protocol usually consists of an application-layer transport protocol plus a payload protocol: HTTP/HTTP2, for example, are application-layer transport protocols, and on top of HTTP/HTTP2 you can carry payloads in different formats and protocols.
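
Both layers are swappable through options. A hedged sketch of the wiring (WithCodec and WithPayloadCodec are the option names I believe Kitex exposes on both client and server; yourCodec and yourPayloadCodec are placeholders):

// server side
svr := xxxservice.NewServer(handler,
    server.WithCodec(yourCodec),               // replace the whole protocol codec
    server.WithPayloadCodec(yourPayloadCodec), // replace only the payload codec
)

// client side
cli := xxxservice.MustNewClient("destServiceName",
    client.WithCodec(yourCodec),
    client.WithPayloadCodec(yourPayloadCodec),
)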