本文基于 0.15.0 版本,因为 Mosn 代码非常的巨大,对于 * 开头的章节作为补充部分,可以跳过。
MOSN 的 IO 模型
Mosn 支持两种网络模型 Netpoll / RawEpoll
Netpoll 模式
Netpoll 模式是标准的 Go 的网络模型, goroutine-per-connection 相对缺点就是消耗高点,但是编程会简单很多。
RawEpoll 模式
链接建立后,向 Epoll 注册 oneshot 可读事件监听;并且此时不允许有协程调用 conn.read,避免与 runtime netpoll 冲突。
可读事件到达,从 goroutine pool 挑选一个协程进行读事件处理;由于使用的是 oneshot 模式,该 fd 后续可读事件不会再触发。
请求处理过程中,协程调度与经典 netpoll 模式一致。
请求处理完成,将协程归还给协程池;同时将 fd 重新添加到 RawEpoll 中。
How it work
大家先瞧一眼模块图,瞧瞧就好了。
Start Server
代码的入口在 pkg/mosn/starter.go:300,我们采用文件的方式进行启动。
我们看到熟悉的对于 Listener 的构建,对于 xDS 协议来说, Listener 部分决定了对外的监听部分。
1 2 3 4 5 6 7 for idx, _ := range serverConfig.Listeners { lc := configmanager.ParseListenerConfig(&serverConfig.Listeners[idx], inheritListeners, inheritPacketConn) deprecatedRouter, err := configmanager.ParseRouterConfiguration(&lc.FilterChains[0 ]) if _, err := srv.AddListener(lc); err != nil { log.StartLogger.Fatalf("[mosn] [NewMosn] AddListener error:%s" , err.Error()) } }
对于熟悉 Linux 编程的同学, Listener 势必是创建一个 ServerSocket 进行监听。对于处理的逻辑,应该也是在构建的过程中创建出来的,我们跳至创建处:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func (ch *connHandler) AddOrUpdateListener(lc *v2.Listener) (types.ListenerEventListener, error ) { var listenerName string if lc.Name == "" { listenerName = utils.GenerateUUID() lc.Name = listenerName } else { listenerName = lc.Name } if len (lc.FilterChains) != 1 { return nil , errors.New("error updating listener, listener have filter chains count is not 1" ) } var listenerFiltersFactories []api.ListenerFilterChainFactory var networkFiltersFactories []api.NetworkFilterChainFactory var streamFiltersFactories []api.StreamFilterChainFactory listenerFiltersFactories = configmanager.GetListenerFilters(lc.ListenerFilters) networkFiltersFactories = configmanager.GetNetworkFilters(&lc.FilterChains[0 ]) streamFiltersFactories = configmanager.GetStreamFilters(lc.StreamFilters) var al *activeListener if al = ch.findActiveListenerByName(listenerName); al != nil { } else { listenerStopChan := make (chan struct {}) ➊ l := network.NewListener(lc) var err error al, err = newActiveListener(l, lc, als, listenerFiltersFactories, networkFiltersFactories, streamFiltersFactories, ch, listenerStopChan) ➋ if err != nil { return al, err } l.SetListenerCallbacks(al) ch.listeners = append (ch.listeners, al) log.DefaultLogger.Infof("[server] [conn handler] [add listener] add listener: %s" , lc.Addr.String()) } admin.SetListenerConfig(listenerName, *al.listener.Config()) return al, nil }
这部分的代码显然不能满足我们对于核心,怎么 AC 请求的需求,这里是简单的创建了 Listener,让我们继续跟踪下去,不过值得注意的在 ➊ 创建了一个 StopChain,这是非常标准 Go 中如何通知他人 Close 的方式。不过从 ➋ 处,我们可以看到一些熟悉的 Filters 的单字,想必也是通过 FilterChains 进行工作的。
在构建的 newActiveListener 进行了一些监控的初始化。真正进行工作的代码是在 pkg/server/handler.go:StartListener
Start 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (l *listener) Start(lctx context.Context, restart bool ) { if l.bindToPort { ignore := func () bool { default : if l.rawl == nil && l.packetConn == nil { if err := l.listen(lctx); err != nil { log.StartLogger.Fatalf("[network] [listener start] [listen] %s listen failed, %v" , l.name, err) } } } l.state = ListenerRunning return false }() switch l.network { case "udp" : l.readMsgEventLoop(lctx) default : l.acceptEventLoop(lctx) } } }
在 Start 有两个重点 listen(lctx) 创建了监听器,在 l.acceptEventLoop(lctx) 进行接受的事件循环,listen(lctx) 的逻辑就比较简单就是标准的 Go Net,我们进入下个函数在 l.acceptEventLoop(lctx) 中
1 2 3 4 5 6 7 8 9 10 11 12 func (l *listener) acceptEventLoop(lctx context.Context) { for { if err := l.accept(lctx); err != nil { if nerr, ok := err.(net.Error); ok && nerr.Timeout() { log.DefaultLogger.Infof("[network] [listener start] [accept] listener %s stop accepting connections by deadline" , l.name) return } else if ope, ok := err.(*net.OpError); ok { } } } }
Accept Connection
直接只有 loop 循环,不断的去接受即可,标准的 Networking 编程。
accept 1 2 3 4 5 6 7 8 9 func (l *listener) accept(lctx context.Context) error { rawc, err := l.rawl.Accept() utils.GoWithRecover(func () { l.cb.OnAccept(rawc, l.useOriginalDst, nil , nil , nil ) }, nil ) return nil }
在 OnAccept 中就是我们线程模型的开始
OnAccept 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (al *activeListener) OnAccept(rawc net.Conn, useOriginalDst bool , oriRemoteAddr net.Addr, ch chan api.Connection, buf []byte ) { var rawf *os.File if !useOriginalDst { if network.UseNetpollMode { ➊ switch rawc.LocalAddr().Network() { case "udp" : if tc, ok := rawc.(*net.UDPConn); ok { rawf, _ = tc.File() } default : if tc, ok := rawc.(*net.TCPConn); ok { rawf, _ = tc.File() } } } } arc := newActiveRawConn(rawc, al) for _, lfcf := range al.listenerFiltersFactories { arc.acceptedFilters = append (arc.acceptedFilters, lfcf) ➋ } ctx := mosnctx.WithValue(context.Background(), types.ContextKeyListenerPort, al.listenPort) arc.ctx = ctx arc.ContinueFilterChain(ctx, true ) }
在 ➊ 处,我们就开始区分我们的 RawEpoll 和 NetPoll 的模式了。最重要的部分就是 ➋ 处理进行了 Listener 的 FilterChain 构建,我们继续往下探索
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (arc *activeRawConn) ContinueFilterChain(ctx context.Context, success bool ) { if !success { return } for ; arc.acceptedFilterIndex < len (arc.acceptedFilters); arc.acceptedFilterIndex++ { filterStatus := arc.acceptedFilters[arc.acceptedFilterIndex].OnAccept(arc) ➊ if filterStatus == api.Stop { return } } arc.activeListener.newConnection(ctx, arc.rawc) }
从 ➊ 处,我们就发现了 FliterChain 从返回的对象也可以清晰的发现,这和 Servlet Filter 不同,每个 Filter 返回对象来决定是否需要继续下去。
在 newConnection 在中 处理下各种超时的设置,我们勇敢的往前进,后续的 OnNewConnection 有我们的大头戏了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (al *activeListener) OnNewConnection(ctx context.Context, conn api.Connection) { filterManager := conn.FilterManager() for _, nfcf := range al.networkFiltersFactories { nfcf.CreateFilterChain(ctx, filterManager) } filterManager.InitializeReadFilters() if len (filterManager.ListReadFilter()) == 0 && len (filterManager.ListWriteFilters()) == 0 { conn.Close(api.NoFlush, api.LocalClose) return } ac := newActiveConnection(al, conn) al.connsMux.Lock() e := al.conns.PushBack(ac) al.connsMux.Unlock() ac.element = e conn.Start(ctx) }
Let's Play! 这里开始上文的不同IO模型了
Start 1 2 3 4 5 6 7 8 9 func (c *connection) Start(lctx context.Context) { c.startOnce.Do(func () { if UseNetpollMode { c.attachEventLoop(lctx) } else { c.startRWLoop(lctx) } }) }
Read Connection
Read: EventLoop [attachEventLoop]
Read/Write事件监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (c *connection) attachEventLoop(lctx context.Context) { c.eventLoop = attach() err := c.eventLoop.registerRead(c, &connEventHandler{ onRead: func () bool { if c.readEnabled { ➊ err := c.doRead() if err != nil { if te, ok := err.(net.Error); ok && te.Timeout() { if c.readBuffer != nil && c.readBuffer.Len() == 0 { c.readBuffer.Free() c.readBuffer.Alloc(DefaultBufferReadCapacity) } return true } return false } } else { select { case <-c.readEnabledChan: case <-time.After(100 * time.Millisecond): ➋ } } return true }, onHup: func () bool { log.DefaultLogger.Errorf("[network] [event loop] [onHup] ReadHup error. Connection = %d, Remote Address = %s" , c.id, c.RemoteAddr().String()) c.Close(api.NoFlush, api.RemoteClose) return false }, }) }
值得注意的 ➊ ➋ 做了一些 ReadEnable 的控制,应该是对 Transfer FD 部分进行设计,这里定义我们在读取和 Hup 时候的回调,我们看看回调事件是如何注册的。
registerRead 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (el *eventLoop) registerRead(conn *connection, handler *connEventHandler) error { read, err := netpoll.HandleFile(conn.file, netpoll.EventRead|netpoll.EventOneShot) if err != nil { return err } el.poller.Start(read, el.readWrapper(read, handler)) el.mu.Lock() el.conn[conn.id] = &connEvent{ read: read, } el.mu.Unlock() return nil }
核心的就是在 el.poller.Start(read, el.readWrapper(read, handler)) 开始监听我们的执行事件,对 Write写事件也是一样的模式
register 1 2 el.poller.Start(read, el.readWrapper(read, handler)) el.poller.Start(write, el.writeWrapper(write, handler))
不过值得注意是,对一个同一个请求的读写事件的 Poller 肯定是在同一个上面。那我们下面需要看我们最重要的部分,我们真实的读取数据从何而来,不过先小结一下:
小结
对于 Mosn 来说,我们对于 Read/Write 可以使用多个 RawPoller 进行操作,这样可以有效的利用多核的性质,不过这里又和我们后面的 GoRoutine 会冲突,这里需要好好设计你的 poolsize,不过从源码看现阶段仅支持 1 个的读写监听器。
1 2 3 4 5 6 7 8 9 10 var ( readPool = mosnsync.NewWorkerPool(runtime.NumCPU()) writePool = mosnsync.NewWorkerPool(runtime.NumCPU()) rrCounter uint32 poolSize uint32 = 1 eventLoopPool = make ([]*eventLoop, poolSize) errEventAlreadyRegistered = errors.New("event already registered" ) )
在 ➊ 处已经被标记出来的,暂时还没有查找到修改的地方。
读写事件处理
本章在操作之前由于 Master 版本尚未完善 pkg/network/eventloop.go:45 此处需要打开初始化才能够工作
执行事件的循环肯定在我们最熟悉的 epoll.wait() 中,在 netpoll/epoll.go:235 中就不看了,我们直接看数据的读取的过程,在我们回调事件中
pkg/network/eventloop.go:188 1 2 3 4 5 6 readPool.Schedule(func () { if !handler.onRead() { return } el.poller.Resume(desc) })
我们执行了 readPool 的操作,而这个 Schedule 操作
1 2 3 4 5 6 7 func (p *workerPool) Schedule(task func () ) { select { case p.work <- task: case p.sem <- struct {}{}: go p.spawnWorker(task) } }
可以简单的认为是发生了一个任务通知了我们 work 通道,在下面的代码进行处理
spawnWorker 1 2 3 4 5 6 7 8 9 10 11 12 func (p *workerPool) spawnWorker(task func () ) { defer func () { if r := recover (); r != nil { log.DefaultLogger.Alertf("syncpool" , "[syncpool] panic %v\n%s" , p, string (debug.Stack())) } <-p.sem }() for { task() task = <-p.work } }
题外话1:Mosn 的 WorkerPool
笔者之前写Java为主,这次看到这样的工作池设计,一时半会还没看懂,理了一下才行还是有点意思的。
NewWorkerPool 1 2 3 4 5 6 func NewWorkerPool (size int ) WorkerPool { return &workerPool{ work: make (chan func () ), sem: make (chan struct {}, size), } }
Schedule 1 2 3 4 5 6 7 func (p *workerPool) Schedule(task func () ) { select { case p.work <- task: case p.sem <- struct {}{}: go p.spawnWorker(task) } }
spawnWorker 1 2 3 4 5 6 7 8 9 10 11 12 func (p *workerPool) spawnWorker(task func () ) { defer func () { if r := recover (); r != nil { log.DefaultLogger.Alertf("syncpool" , "[syncpool] panic %v\n%s" , p, string (debug.Stack())) } <-p.sem }() for { task() task = <-p.work } }
这里的控制任务池的方法还是很精妙的, 如果请求量不大,处理很快的话不一定需要创建最大Maxium的Worker,不过缺点就是并不会在高峰之后并不会回收。
Read: RWLoop [startRWLoop]
看完了 RawEpoll 的模式,我们看看 RWLoop 的模式
startRWLoop 1 2 3 4 5 6 7 8 9 func (c *connection) startRWLoop(lctx context.Context) { c.internalLoopStarted = true utils.GoWithRecover(func () { c.startReadLoop() }, func (r interface {}) { c.Close(api.NoFlush, api.LocalClose) }) }
在 pkg/network/connection.go:startReadLoop 和上面大同小异进行了一系列的处理,在这里我们就直接创建一个新的 GoRotinue 进行该请求的处理,核心的代码在 pkg/network/connection.go:doRead() 中。在 doRead() 的上半段就是在处理读取数据,从 pkg/network/connection.go:507:
进入了数据的读取阶段。可以发现对于 RWLoop 方式的代码是比较的简单的。
Read Chian
对于不同的 IO 处理部分已经在上文结束,我们进入数据获得的部分:
onRead() 1 2 3 4 5 6 7 8 func (c *connection) onRead(bytesRead int64 ) { for _, cb := range c.bytesReadCallbacks { cb(uint64 (bytesRead)) } c.filterManager.OnRead() c.updateReadBufStats(bytesRead, int64 (c.readBuffer.Len())) }
我们又看到了我们的老熟人 FilterManager,在 Envoy 源码分析 中,我们也发现了 Envoy 是相同的设计模式。我们看看这个 FilterManager 的工作机制
FilterManager
FilterManager 1 2 3 4 5 6 7 8 9 type FilterManager interface { AddReadFilter(rf ReadFilter) AddWriteFilter(wf WriteFilter) ListReadFilter() []ReadFilter ListWriteFilters() []WriteFilter InitializeReadFilters() bool OnRead() OnWrite(buffer []buffer.IoBuffer) FilterStatus }
从 FilterManager 声明看,类似于 Netty 一样的 Inbound Channel 和 Outbound Channel。我们深入 OnRead 部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (fm *filterManager) onContinueReading(filter *activeReadFilter) { var index int var uf *activeReadFilter if filter != nil { index = filter.index + 1 } for ; index < len (fm.upstreamFilters); index++ { uf = fm.upstreamFilters[index] uf.index = index buf := fm.conn.GetReadBuffer() if buf != nil && buf.Len() > 0 { status := uf.filter.OnData(buf) if status == api.Stop { return } } } }
那我们需要处理的逻辑显然都是在 Filter:OnData 部分了。这部分的 Filter 类型,在 yaml 配置可以指定:
1 2 3 4 5 6 7 8 9 10 11 12 "filter_chains": [{ "filters": [ { "type": "proxy" , "config": { "downstream_protocol": "Http1" , "upstream_protocol": "Http1" , "router_config_name" :"client_router" } } ] }]
我们来看看最常用的 ProxyFilter。
ProxyFilter
在 阅读 ProxyFilter 之前,我们先看下 pkg/proxy/proxy.go:InitializeReadFilterCallbacks 这里我们创建了面向下游的 ServerStreamConn
1 2 3 4 5 6 func (p *proxy) InitializeReadFilterCallbacks(cb api.ReadFilterCallbacks) { p.readCallbacks.Connection().AddConnectionEventListener(p.downstreamListener) if p.config.DownstreamProtocol != string (protocol.Auto) { p.serverStreamConn = stream.CreateServerStreamConnection(p.context, types.ProtocolName(p.config.DownstreamProtocol), p.readCallbacks.Connection(), p) } }
还是比较好理解的,我们向下游发送数据总是需要一个单独的连接的。对于 OnData 函数
1 2 3 4 5 6 7 8 func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus { if p.serverStreamConn == nil { } p.serverStreamConn.Dispatch(buf) return api.Stop }
我们直接进入了 ServerSteamConn 的 Dispatch 函数而在其中很有趣,我们仅仅将 bytes 写入了一个 byfChan
1 2 3 4 5 6 7 8 9 10 func (sc *streamConnection) Dispatch(buffer buffer.IoBuffer) { defer func () { }() for buffer.Len() > 0 { sc.bufChan <- buffer <-sc.bufChan } }
那显然我们需要一个地方进行 Read,没错就是下面几行
Read 1 2 3 4 5 6 7 func (conn *streamConnection) Read(p []byte ) (n int , err error ) { data, ok := <-conn.bufChan n = copy (p, data.Bytes()) data.Drain(n) conn.bufChan <- nil return }
但是有一个疑问放在我们面前,是谁启动这个 Read() 的处理者呢?这个问题要回溯到我们的 ServerConnection 创建的时刻
newServerStreamConnection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func newServerStreamConnection (ctx context.Context, connection api.Connection, callbacks types.ServerStreamConnectionEventListener) types.ServerStreamConnection { ssc := &serverStreamConnection{ streamConnection: streamConnection{ context: ctx, conn: connection, bufChan: make (chan buffer.IoBuffer), connClosed: make (chan bool , 1 ), }, contextManager: str.NewContextManager(ctx), serverStreamConnListener: callbacks, } ssc.contextManager.Next() ssc.br = bufio.NewReader(ssc) ssc.bw = bufio.NewWriter(ssc) connection.AddConnectionEventListener(ssc) ssc.conn.SetTransferEventListener(func () bool { ssc.close = true return false }) utils.GoWithRecover(func () { ssc.serve() }, nil ) return ssc }
我们在这里构建所有的变量,最重要的是 callbacks,我们接收到来自外侧的请求处理逻辑,而这个对象本身的处理逻辑都是在 ssc.serve() 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func (conn *serverStreamConnection) serve() { for { ctx := conn.contextManager.Get() buffers := httpBuffersByContext(ctx) request := &buffers.serverRequest err := request.ReadLimitBody(conn.br, maxRequestBodySize) id := protocol.GenerateID() s := &buffers.serverStream s.stream = stream{ id: id, ctx: mosnctx.WithValue(ctx, types.ContextKeyStreamID, id), request: request, response: &buffers.serverResponse, } s.connection = conn s.responseDoneChan = make (chan bool , 1 ) s.header = mosnhttp.RequestHeader{&s.request.Header, nil } conn.mutex.Lock() conn.stream = s conn.mutex.Unlock() if atomic.LoadInt32(&s.readDisableCount) <= 0 { s.handleRequest() } select { case <-s.responseDoneChan: case <-conn.connClosed: return } conn.contextManager.Next() } }
看到这里给大家做个小结,我们先理清顺序
1 2 3 +------------+ +-------------+ +--------------+ | upsteam +-------------->+ mosn +--------->+ downsteam | +------------+ +-------------+ +--------------+
对于 Upsteam Conn 根据模型的不同,可能是 RawEpoll,而 ServerSteamConn 是一个被 Pool 化的 Conn,但是对于这个 Conn 本身也有自己的 Read 和 Write 处理函数,是在一个 Gorotine 中,启动的代码是在 ssc.serve() 中。
OnReceive
对于常见的 Http proxy filter(pkg/proxy/proxy.go) 来说,我们处理 Http encoder 之后,进入 OnReceive 部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (s *downStream) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) { s.downstreamReqHeaders = headers s.context = mosnctx.WithValue(s.context, types.ContextKeyDownStreamHeaders, headers) s.downstreamReqDataBuf = data s.downstreamReqTrailers = trailers id := s.ID pool.ScheduleAuto(func () { defer func () { }() phase := types.InitPhase for i := 0 ; i < 10 ; i++ { phase = s.receive(ctx, id, phase) switch phase { } } }) }
这里采用了一个状态机进行处理,。不过值得注意的最多进行 10 此的状态切换就必须进入 End 状态,不过此处其实一个小设计,应该后面的执行步骤刚好就是10次。真实的处理流程在 pkg/proxy/downstream.go:394 中。执行顺序是 DownFilter MatchRoute DownFilterAfterRoute ChooseHost DownFilterAfterChooseHost DownRecvHeader DownRecvData DownRecvTrailer Oneway Retry WaitNotify UpFilter UpRecvHeader UpRecvData UpRecvTrailer End,这里的处理恰好就是 Mosn 的处理核心部分。
Receive Data [Upstream]
对于 receive 部分值得详细的分析一下。对于前面分了很多种类型的阶段处理,大部分的状态会调用如下的代码,这里的代码是后续的拓展点,先不展开。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (s *downStream) runReceiveFilters(p types.Phase, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) { for ; s.receiverFiltersIndex < len (s.receiverFilters); s.receiverFiltersIndex++ { f := s.receiverFilters[s.receiverFiltersIndex] if f.p != p { continue } s.context = mosnctx.WithValue(s.context, types.ContextKeyStreamFilterPhase, p) status := f.filter.OnReceive(s.context, headers, data, trailers) switch status { } } s.receiverFiltersIndex = 0 return }
这里的逻辑不啰嗦了,找到匹配的逻辑进行处理即可。而其他的有些状态处理各不相同。
*matchRoute
比如对于 matchRoute 部分
matchRoute 1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (s *downStream) matchRoute() { headers := s.downstreamReqHeaders if s.proxy.routersWrapper == nil || s.proxy.routersWrapper.GetRouters() == nil { return } routers := s.proxy.routersWrapper.GetRouters() handlerChain := router.CallMakeHandlerChain(s.context, headers, routers, s.proxy.clusterManager) if handlerChain == nil { return } s.snapshot, s.route = handlerChain.DoNextHandler() }
无处不在的 Chain,对于 Route 的匹配我们也需要创建一个 RouteHandlerChain 进行匹配。对于单个的 RouteHandler 里面来说最核心的就是包含的属性 Route,分为 PathRoute PrefixRoute RegexRoute SofaRoute,具体的逻辑就在内部不展开说了。
在接收到数据处理过程中 DownRecvHeader 进行了数据的发送。发送数据也是蛮有趣的设计:
appendHeaders 1 2 3 4 5 6 7 8 9 10 11 func (r *upstreamRequest) appendHeaders(endStream bool ) { if r.downStream.processDone() { return } r.sendComplete = endStream if r.downStream.oneway { r.connPool.NewStream(r.downStream.context, nil , r) } else { r.connPool.NewStream(r.downStream.context, r, r) } }
这里其实并没直接发送,而是从我们的 ConnPool 中获得一个对下游的 Stream 我们将这个 r 对象直接作为了构建这个下游请求连接的参数。而真正的写入在
endStream 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (s *clientStream) endStream() { err := s.doSend() if err != nil { log.Proxy.Errorf(s.stream.ctx, "[stream] [http] send client request error: %+v" , err) if err == types.ErrConnectionHasClosed { s.ResetStream(types.StreamConnectionFailed) } else { s.ResetStream(types.StreamLocalReset) } return } if log.Proxy.GetLogLevel() >= log.DEBUG { log.Proxy.Debugf(s.stream.ctx, "[stream] [http] send client request, requestId = %v" , s.stream.id) } s.connection.requestSent <- true }
我们将请求 Send 出去将 requestSent 标记置为 true 即可。而对于读取数据,就在下一个阶段 DownRecvData
Receive Data [Downstream]
DownRecvData
DownRecvHeader 先去获取了 DownSteam 的 Header 部分的数据,不过值得注意的如果是 Get 并不会走这里。在 receiveHeaders 中,我们首先调用了
1 2 3 4 5 6 func (s *downStream) receiveData(endStream bool ) { data := s.downstreamReqDataBuf s.requestInfo.SetBytesReceived(s.requestInfo.BytesReceived() + uint64 (data.Len())) s.downstreamRecvDone = endStream s.upstreamRequest.appendData(endStream) }
我在最开始标记我们需要获得多少数据在 SetBytesReceived(s.requestInfo.BytesReceived() + uint64(data.Len())),然后将这些数据放到了发送的队列中。
上面2步走完了发送的流程,而接受的流程从 UpRecvHeader 开始,我们一开始接收到的 Downstream 的 HttpHeader
1 2 3 4 5 6 7 func (s *downStream) appendHeaders(endStream bool ) { s.upstreamProcessDone = endStream headers := s.convertHeader(s.downstreamRespHeaders) if err := s.responseSender.AppendHeaders(s.context, headers, endStream); err != nil { log.Proxy.Errorf(s.context, "append headers error: %s" , err) } }
然后读取数据就在
1 2 3 4 5 6 7 8 9 10 11 func (s *downStream) appendData(endStream bool ) { s.upstreamProcessDone = endStream data := s.convertData(s.downstreamRespDataBuf) s.requestInfo.SetBytesSent(s.requestInfo.BytesSent() + uint64 (data.Len())) s.responseSender.AppendData(s.context, data, endStream) if endStream { s.endStream() } }
说到这里,我们一定要好好分析下 这几个 appendData 是怎么工作的,其实看起来还是挺绕的。
小结2: Upstream 和 DownStream 的交互
对于整个处理流程的核心部分,我们就一直在和 upstream 和 downstream 这两个 package 的对象进行交互。
大概有这么几个函数很重要:
OnReceive:这是接收到数据时候的方法
receiveHeaders:接收到 http header 部分
receiveData: 接收到 http body 部分
这里面有一个很重要的抽象是 pkg/types/stream.go:StreamSender
1 2 3 4 5 6 7 type StreamSender interface { AppendHeaders(ctx context.Context, headers api.HeaderMap, endStream bool ) error AppendData(ctx context.Context, data buffer.IoBuffer, endStream bool ) error AppendTrailers(ctx context.Context, trailers api.HeaderMap) error GetStream() Stream }
整个流程还是在 Downstream:OnReceive 进行控制,逻辑也就分为 将输入的Header/输入的Body 写入 DownStream,将输出按照同样的方式执行一次。不过值得注意的是,和架构图上一致。在处理单一请求的过程中,写入和回读都是同一个 goroutinue,对于 Downstream 的 io 处理是遵守标准的 NetPoller 的模型的,并没有 RawPoller 机制,RawPoller 机制仅在 接受侧 有单独的定义。
小结:all in one
对于 Go Lanague 因为 Goroutine 使用起来很方便,分析 Go 项目很难分析 Theadmodel 这块内容,或者也可以认为开 Goroutine 本身的成本很低,大家不用过于担心切换的成本。
作为对比,下面也放上官方的一张图。
直到这里我们已经将 监听请求 接受数据 转发请求 响应请求 这几个部分都阅读过了,整个流程已经走通了,我们再去看看其他重要的功能。
多协议机制
Mosn 支持多协议之间的转换,我们从想问的分析中,我们可以知道所谓的 Proxy 就是我们整个 FilterManager 中的一个 Filter。因此对于配置文件
1 2 3 4 5 6 7 8 { "type" : "proxy" , "config" : { "downstream_protocol" : "Http1" , "upstream_protocol" : "Http1" , "router_config_name" : "server_router" } }
Mosn 这个功能非常的有趣,因为实际上因为RPC协议非常的多,因此想要支持多种协议的话,都是需要用户自行进行拓展的,不过 Mosn 帮大家做了一部分的工作,流程如下图:
在 pkg/protocol/xprotocol/example/encoder.go 中有一个例子。对于大多数的 RPC 协议我们都是需要获取到 足够 信息之后再处理,那第一步显然是我们需要先获得一些数据也就是 MinimalDecodeLen
Decode 1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (proto *proto) Decode(ctx context.Context, data types.IoBuffer) (interface {}, error ) { if data.Len() >= MinimalDecodeLen { magic := data.Bytes()[0 ] dir := data.Bytes()[2 ] switch dir { case DirRequest: return decodeRequest(ctx, data) case DirResponse: return decodeResponse(ctx, data) } } return nil , nil }
之后我们需要获得完整的数据
decodeRequest 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func decodeRequest (ctx context.Context, data types.IoBuffer) (cmd interface {}, err error ) { bytesLen := data.Len() bytes := data.Bytes() payloadLen := binary.BigEndian.Uint32(bytes[7 :]) frameLen := RequestHeaderLen + int (payloadLen) if bytesLen < frameLen { return } data.Drain(frameLen) return request, err }
这样就足够满足我们对于自定义数据的处理能力。而对于协议的解析部分还有一个困难的问题需要我们继续处理也就是 多路复用,我们不能在一个 TCP 链路上仅仅处理一个请求,这就是最低效的 Http1,因此协议需要能够定义出来,如果我在一个链路上进行多次请求是怎么区分的。对于 Mosn 来说这里还是有一个很精妙的设计,利用上下的隐性属性作为唯一的 ID 进行判断。这里就不展开,可以查看官方的 协议扩展框架
我们来关注下对于流程线上的问题,就是这段逻辑是镶嵌在我们的流程的何处的。在特殊的协议处理部分我们在启动时候定义的数据格式都类似于
1 2 3 4 5 6 7 8 9 10 11 12 13 "filters" : [ { "type" : "proxy" , "config" : { "downstream_protocol" : "X" , "upstream_protocol" : "X" , "router_config_name" : "server_router" , "extend_config" : { "sub_protocol" : "dubbo" } } } ]
因此我们在处理特定协议的时候需要传入指定的扩展协议如 dubbo 等。因此在构建的 proxy 的时候,将这个保存在 proxy 的属性中。
proxy:NewProxy 1 2 3 4 if json.Unmarshal([]byte (extJSON), &xProxyExtendConfig); xProxyExtendConfig.SubProtocol != "" { proxy.context = mosnctx.WithValue(proxy.context, types.ContextSubProtocol, xProxyExtendConfig.SubProtocol) log.DefaultLogger.Tracef("[proxy] extend config subprotocol = %v" , xProxyExtendConfig.SubProtocol) }
而在我们收到数据的时候会发现在 OnData 的时候我们会创建 ServerStream 的时候传入协议属性
proxy:OnData 1 2 3 4 5 6 7 8 9 10 func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus { if p.serverStreamConn == nil { protocol, err := stream.SelectStreamFactoryProtocol(p.context, prot, buf.Bytes()) p.serverStreamConn = stream.CreateServerStreamConnection(p.context, protocol, p.readCallbacks.Connection(), p) } p.serverStreamConn.Dispatch(buf) return api.Stop }
因此如果我们使用扩展的 协议 这里创建的 ServerStreamConn 也就是 pkg/stream/xprotocol/conn.go,而他的处理逻辑如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (sc *streamConn) Dispatch(buf types.IoBuffer) { for { streamCtx := sc.ctxManager.Get() frame, err := sc.protocol.Decode(streamCtx, buf) if frame == nil && err == nil { return } if frame != nil { xframe, ok := frame.(xprotocol.XFrame) if !ok { log.Proxy.Errorf(sc.ctx, "[stream] [xprotocol] conn %d, %v frame type not match : %T" , sc.netConn.ID(), sc.netConn.RemoteAddr(), frame) return } sc.handleFrame(streamCtx, xframe) } sc.ctxManager.Next() } }
因此这段逻辑其实还是穿插在我们的 Proxy 中作为 Filter 中的一环,而这一环有需要依赖构建出不同的 StreamConn 进行处理,现在已知的支持 http1 http2 xprotocol 这3个类型。
插件机制
对于 Mosn 的插件机制也相对复杂,对于 Envoy 来说我们只有2处可以进行插件埋点,Mosn 也相对一致。
SteamFilter
MOSN 扩展中使用频率最高的扩展点。对于配置项如下
1 2 3 4 5 6 7 8 9 10 11 12 13 "listeners" : [ { "filter_chains" : [ { } ] , "stream_filters" : [ { "type" : "demo" , "config" : { "User" : "admin" } } ] } ]
从配置看,我可以发现 StreamFilter 和 FilterChain 是平级的(不过实际上并不是)。因此 Stream 分了两种类型:
进入 proxy 流程以后,如果存在 ReceiverFilter,那么就会执行对应的逻辑,ReceiverFilter 包括两个阶段,“路由前”和“路由后”,在每个 Filter 处理完成以后,会返回一个状态,如果是 Stop 则会中止后续尚未执行的 ReceiverFilter,通常情况下,返回 Stop 状态的 Filter 都会回写一个响应。如果是 Continue 则会执行下一个 ReceiverFilter,直到本阶段的 ReceiverFilter 都执行完成或中止;路由前阶段的 ReceiverFIlter 执行完成后,就会执行路由后阶段,其逻辑和路由前一致。如果是正常转发,那么随后 MOSN 会收到一个响应或者发现其他异常直接回写一个响应,此时就会进入到 SenderFilter 的流程中,完成 SenderFilter 的处理。SenderFilter 处理完成以后,MOSN 会写响应给 Client,并且完成最后的收尾工作,收尾工作包括一些数据的回收、日志的记录,以及 StreamFilter 的“销毁”(调用 OnDestroy)。
原文中有几个重点,在系统解析完一个 请求 之后才会进入这个流程,因此在 pkg/stream/http/stream.go:serve 中可以发现构建这个 StreamFilter 的逻辑
pkg/proxy/downstream.go 1 2 3 4 func (s *downStream) AddStreamSenderFilter(filter api.StreamSenderFilter) { sf := newActiveStreamSenderFilter(s, filter) s.senderFilters = append (s.senderFilters, sf) }
回到我们熟悉的地方来 DownStream:receive 中
receive 1 2 3 4 5 6 7 8 9 10 11 12 case types.DownFilterAfterChooseHost: if log.Proxy.GetLogLevel() >= log.DEBUG { s.printPhaseInfo(types.DownFilterAfterChooseHost, id) } s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers) if p, err := s.processError(id); err != nil { return p } phase++
官方的Demo可以看一下,使用起来还是很方便的 Stream Filter Demo
MOSN Plugin 机制
Mosn 还提供了另外一种基于通讯机制的方式
在这里有个例子:Stream Filter Plugin demo
不过这种算是上面一种的变形,实际上我们依然需要将 StreamFilter 注册到系统中,不过这一次所有的逻辑处理不在编译器固定,而是有一个 gRPC 的外部服务可以进行通讯判断。因此其实不再限于单一语言,我们只需要遵守 gRPC 的协议即可。
平滑升级
平滑升级可以参考 聊聊服务端热更新 和 Nginx vs Envoy vs MOSN 平滑升级原理解析 不做展开了。
参考