本文基于 0.15.0
版本,因为 Mosn
代码非常的巨大,对于 *
开头的章节作为补充部分,可以跳过。
MOSN 的 IO 模型 Mosn
支持两种网络模型 Netpoll
/ RawEpoll
Netpoll 模式 Netpoll 模式是标准的 Go
的网络模型, goroutine-per-connection
相对缺点就是消耗高点,但是编程会简单很多。
RawEpoll 模式
链接建立后,向 Epoll 注册 oneshot
可读事件监听;并且此时不允许有协程调用 conn.read
,避免与 runtime netpoll
冲突。
可读事件到达,从 goroutine pool
挑选一个协程进行读事件处理;由于使用的是 oneshot
模式,该 fd 后续可读事件不会再触发。
请求处理过程中,协程调度与经典 netpoll
模式一致。
请求处理完成,将协程归还给协程池;同时将 fd
重新添加到 RawEpoll
中。
How it work 大家先瞧一眼模块图,瞧瞧就好了。
Start Server 代码的入口在 pkg/mosn/starter.go:300
,我们采用文件的方式进行启动。
我们看到熟悉的对于 Listener
的构建,对于 xDS
协议来说, Listener
部分决定了对外的监听部分。
1 2 3 4 5 6 7 for idx, _ := range serverConfig.Listeners { lc := configmanager.ParseListenerConfig(&serverConfig.Listeners[idx], inheritListeners, inheritPacketConn) deprecatedRouter, err := configmanager.ParseRouterConfiguration(&lc.FilterChains[0 ]) if _, err := srv.AddListener(lc); err != nil { log.StartLogger.Fatalf("[mosn] [NewMosn] AddListener error:%s" , err.Error()) } }
对于熟悉 Linux
编程的同学, Listener
势必是创建一个 ServerSocket
进行监听。对于处理的逻辑,应该也是在构建的过程中创建出来的,我们跳至创建处:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func (ch *connHandler) AddOrUpdateListener(lc *v2.Listener) (types.ListenerEventListener, error ) { var listenerName string if lc.Name == "" { listenerName = utils.GenerateUUID() lc.Name = listenerName } else { listenerName = lc.Name } if len (lc.FilterChains) != 1 { return nil , errors.New("error updating listener, listener have filter chains count is not 1" ) } var listenerFiltersFactories []api.ListenerFilterChainFactory var networkFiltersFactories []api.NetworkFilterChainFactory var streamFiltersFactories []api.StreamFilterChainFactory listenerFiltersFactories = configmanager.GetListenerFilters(lc.ListenerFilters) networkFiltersFactories = configmanager.GetNetworkFilters(&lc.FilterChains[0 ]) streamFiltersFactories = configmanager.GetStreamFilters(lc.StreamFilters) var al *activeListener if al = ch.findActiveListenerByName(listenerName); al != nil { } else { listenerStopChan := make (chan struct {}) ➊ l := network.NewListener(lc) var err error al, err = newActiveListener(l, lc, als, listenerFiltersFactories, networkFiltersFactories, streamFiltersFactories, ch, listenerStopChan) ➋ if err != nil { return al, err } l.SetListenerCallbacks(al) ch.listeners = append (ch.listeners, al) log.DefaultLogger.Infof("[server] [conn handler] [add listener] add listener: %s" , lc.Addr.String()) } admin.SetListenerConfig(listenerName, *al.listener.Config()) return al, nil }
这部分的代码显然不能满足我们对于核心,怎么 AC
请求的需求,这里是简单的创建了 Listener
,让我们继续跟踪下去,不过值得注意的在 ➊ 创建了一个 StopChain
,这是非常标准 Go
中如何通知他人 Close
的方式。不过从 ➋ 处,我们可以看到一些熟悉的 Filters
的单字,想必也是通过 FilterChains
进行工作的。
在构建的 newActiveListener
进行了一些监控的初始化。真正进行工作的代码是在 pkg/server/handler.go:StartListener
Start 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (l *listener) Start(lctx context.Context, restart bool ) { if l.bindToPort { ignore := func () bool { default : if l.rawl == nil && l.packetConn == nil { if err := l.listen(lctx); err != nil { log.StartLogger.Fatalf("[network] [listener start] [listen] %s listen failed, %v" , l.name, err) } } } l.state = ListenerRunning return false }() switch l.network { case "udp" : l.readMsgEventLoop(lctx) default : l.acceptEventLoop(lctx) } } }
在 Start
有两个重点 listen(lctx)
创建了监听器,在 l.acceptEventLoop(lctx)
进行接受的事件循环,listen(lctx)
的逻辑就比较简单就是标准的 Go Net
,我们进入下个函数在 l.acceptEventLoop(lctx)
中
1 2 3 4 5 6 7 8 9 10 11 12 func (l *listener) acceptEventLoop(lctx context.Context) { for { if err := l.accept(lctx); err != nil { if nerr, ok := err.(net.Error); ok && nerr.Timeout() { log.DefaultLogger.Infof("[network] [listener start] [accept] listener %s stop accepting connections by deadline" , l.name) return } else if ope, ok := err.(*net.OpError); ok { } } } }
Accept Connection 直接只有 loop
循环,不断的去接受即可,标准的 Networking
编程。
accept 1 2 3 4 5 6 7 8 9 func (l *listener) accept(lctx context.Context) error { rawc, err := l.rawl.Accept() utils.GoWithRecover(func () { l.cb.OnAccept(rawc, l.useOriginalDst, nil , nil , nil ) }, nil ) return nil }
在 OnAccept
中就是我们线程模型的开始
OnAccept 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (al *activeListener) OnAccept(rawc net.Conn, useOriginalDst bool , oriRemoteAddr net.Addr, ch chan api.Connection, buf []byte ) { var rawf *os.File if !useOriginalDst { if network.UseNetpollMode { ➊ switch rawc.LocalAddr().Network() { case "udp" : if tc, ok := rawc.(*net.UDPConn); ok { rawf, _ = tc.File() } default : if tc, ok := rawc.(*net.TCPConn); ok { rawf, _ = tc.File() } } } } arc := newActiveRawConn(rawc, al) for _, lfcf := range al.listenerFiltersFactories { arc.acceptedFilters = append (arc.acceptedFilters, lfcf) ➋ } ctx := mosnctx.WithValue(context.Background(), types.ContextKeyListenerPort, al.listenPort) arc.ctx = ctx arc.ContinueFilterChain(ctx, true ) }
在 ➊ 处,我们就开始区分我们的 RawEpoll
和 NetPoll
的模式了。最重要的部分就是 ➋ 处理进行了 Listener
的 FilterChain
构建,我们继续往下探索
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (arc *activeRawConn) ContinueFilterChain(ctx context.Context, success bool ) { if !success { return } for ; arc.acceptedFilterIndex < len (arc.acceptedFilters); arc.acceptedFilterIndex++ { filterStatus := arc.acceptedFilters[arc.acceptedFilterIndex].OnAccept(arc) ➊ if filterStatus == api.Stop { return } } arc.activeListener.newConnection(ctx, arc.rawc) }
从 ➊ 处,我们就发现了 FliterChain
从返回的对象也可以清晰的发现,这和 Servlet Filter
不同,每个 Filter
返回对象来决定是否需要继续下去。
在 newConnection
在中 处理下各种超时的设置,我们勇敢的往前进,后续的 OnNewConnection
有我们的大头戏了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (al *activeListener) OnNewConnection(ctx context.Context, conn api.Connection) { filterManager := conn.FilterManager() for _, nfcf := range al.networkFiltersFactories { nfcf.CreateFilterChain(ctx, filterManager) } filterManager.InitializeReadFilters() if len (filterManager.ListReadFilter()) == 0 && len (filterManager.ListWriteFilters()) == 0 { conn.Close(api.NoFlush, api.LocalClose) return } ac := newActiveConnection(al, conn) al.connsMux.Lock() e := al.conns.PushBack(ac) al.connsMux.Unlock() ac.element = e conn.Start(ctx) }
Let's Play!
这里开始上文的不同IO模型了
Start 1 2 3 4 5 6 7 8 9 func (c *connection) Start(lctx context.Context) { c.startOnce.Do(func () { if UseNetpollMode { c.attachEventLoop(lctx) } else { c.startRWLoop(lctx) } }) }
Read Connection Read: EventLoop [attachEventLoop] Read/Write事件监听 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (c *connection) attachEventLoop(lctx context.Context) { c.eventLoop = attach() err := c.eventLoop.registerRead(c, &connEventHandler{ onRead: func () bool { if c.readEnabled { ➊ err := c.doRead() if err != nil { if te, ok := err.(net.Error); ok && te.Timeout() { if c.readBuffer != nil && c.readBuffer.Len() == 0 { c.readBuffer.Free() c.readBuffer.Alloc(DefaultBufferReadCapacity) } return true } return false } } else { select { case <-c.readEnabledChan: case <-time.After(100 * time.Millisecond): ➋ } } return true }, onHup: func () bool { log.DefaultLogger.Errorf("[network] [event loop] [onHup] ReadHup error. Connection = %d, Remote Address = %s" , c.id, c.RemoteAddr().String()) c.Close(api.NoFlush, api.RemoteClose) return false }, }) }
值得注意的 ➊ ➋ 做了一些 ReadEnable
的控制,应该是对 Transfer FD
部分进行设计,这里定义我们在读取和 Hup
时候的回调,我们看看回调事件是如何注册的。
registerRead 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (el *eventLoop) registerRead(conn *connection, handler *connEventHandler) error { read, err := netpoll.HandleFile(conn.file, netpoll.EventRead|netpoll.EventOneShot) if err != nil { return err } el.poller.Start(read, el.readWrapper(read, handler)) el.mu.Lock() el.conn[conn.id] = &connEvent{ read: read, } el.mu.Unlock() return nil }
核心的就是在 el.poller.Start(read, el.readWrapper(read, handler))
开始监听我们的执行事件,对 Write
写事件也是一样的模式
register 1 2 el.poller.Start(read, el.readWrapper(read, handler)) el.poller.Start(write, el.writeWrapper(write, handler))
不过值得注意是,对一个同一个请求的读写事件的 Poller
肯定是在同一个上面。那我们下面需要看我们最重要的部分,我们真实的读取数据从何而来,不过先小结一下:
小结
对于 Mosn
来说,我们对于 Read/Write
可以使用多个 RawPoller
进行操作,这样可以有效的利用多核的性质,不过这里又和我们后面的 GoRoutine
会冲突,这里需要好好设计你的 poolsize
,不过从源码看现阶段仅支持 1
个的读写监听器。
1 2 3 4 5 6 7 8 9 10 var ( readPool = mosnsync.NewWorkerPool(runtime.NumCPU()) writePool = mosnsync.NewWorkerPool(runtime.NumCPU()) rrCounter uint32 poolSize uint32 = 1 eventLoopPool = make ([]*eventLoop, poolSize) errEventAlreadyRegistered = errors.New("event already registered" ) )
在 ➊ 处已经被标记出来的,暂时还没有查找到修改的地方。
读写事件处理 本章在操作之前由于 Master
版本尚未完善 pkg/network/eventloop.go:45
此处需要打开初始化才能够工作
执行事件的循环肯定在我们最熟悉的 epoll.wait()
中,在 netpoll/epoll.go:235
中就不看了,我们直接看数据的读取的过程,在我们回调事件中
pkg/network/eventloop.go:188 1 2 3 4 5 6 readPool.Schedule(func () { if !handler.onRead() { return } el.poller.Resume(desc) })
我们执行了 readPool
的操作,而这个 Schedule
操作
1 2 3 4 5 6 7 func (p *workerPool) Schedule(task func () ) { select { case p.work <- task: case p.sem <- struct {}{}: go p.spawnWorker(task) } }
可以简单的认为是发生了一个任务通知了我们 work
通道,在下面的代码进行处理
spawnWorker 1 2 3 4 5 6 7 8 9 10 11 12 func (p *workerPool) spawnWorker(task func () ) { defer func () { if r := recover (); r != nil { log.DefaultLogger.Alertf("syncpool" , "[syncpool] panic %v\n%s" , p, string (debug.Stack())) } <-p.sem }() for { task() task = <-p.work } }
题外话1:Mosn 的 WorkerPool 笔者之前写Java
为主,这次看到这样的工作池设计,一时半会还没看懂,理了一下才行还是有点意思的。
NewWorkerPool 1 2 3 4 5 6 func NewWorkerPool (size int ) WorkerPool { return &workerPool{ work: make (chan func () ), sem: make (chan struct {}, size), } }
Schedule 1 2 3 4 5 6 7 func (p *workerPool) Schedule(task func () ) { select { case p.work <- task: case p.sem <- struct {}{}: go p.spawnWorker(task) } }
spawnWorker 1 2 3 4 5 6 7 8 9 10 11 12 func (p *workerPool) spawnWorker(task func () ) { defer func () { if r := recover (); r != nil { log.DefaultLogger.Alertf("syncpool" , "[syncpool] panic %v\n%s" , p, string (debug.Stack())) } <-p.sem }() for { task() task = <-p.work } }
这里的控制任务池的方法还是很精妙的, 如果请求量不大,处理很快的话不一定需要创建最大Maxium
的Worker
,不过缺点就是并不会在高峰之后并不会回收。
Read: RWLoop [startRWLoop] 看完了 RawEpoll
的模式,我们看看 RWLoop
的模式
startRWLoop 1 2 3 4 5 6 7 8 9 func (c *connection) startRWLoop(lctx context.Context) { c.internalLoopStarted = true utils.GoWithRecover(func () { c.startReadLoop() }, func (r interface {}) { c.Close(api.NoFlush, api.LocalClose) }) }
在 pkg/network/connection.go:startReadLoop
和上面大同小异进行了一系列的处理,在这里我们就直接创建一个新的 GoRotinue
进行该请求的处理,核心的代码在 pkg/network/connection.go:doRead()
中。在 doRead()
的上半段就是在处理读取数据,从 pkg/network/connection.go:507:
进入了数据的读取阶段。可以发现对于 RWLoop
方式的代码是比较的简单的。
Read Chian 对于不同的 IO
处理部分已经在上文结束,我们进入数据获得的部分:
onRead() 1 2 3 4 5 6 7 8 func (c *connection) onRead(bytesRead int64 ) { for _, cb := range c.bytesReadCallbacks { cb(uint64 (bytesRead)) } c.filterManager.OnRead() c.updateReadBufStats(bytesRead, int64 (c.readBuffer.Len())) }
我们又看到了我们的老熟人 FilterManager
,在 Envoy 源码分析
中,我们也发现了 Envoy
是相同的设计模式。我们看看这个 FilterManager
的工作机制
FilterManager FilterManager 1 2 3 4 5 6 7 8 9 type FilterManager interface { AddReadFilter(rf ReadFilter) AddWriteFilter(wf WriteFilter) ListReadFilter() []ReadFilter ListWriteFilters() []WriteFilter InitializeReadFilters() bool OnRead() OnWrite(buffer []buffer.IoBuffer) FilterStatus }
从 FilterManager
声明看,类似于 Netty
一样的 Inbound Channel
和 Outbound Channel
。我们深入 OnRead
部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (fm *filterManager) onContinueReading(filter *activeReadFilter) { var index int var uf *activeReadFilter if filter != nil { index = filter.index + 1 } for ; index < len (fm.upstreamFilters); index++ { uf = fm.upstreamFilters[index] uf.index = index buf := fm.conn.GetReadBuffer() if buf != nil && buf.Len() > 0 { status := uf.filter.OnData(buf) if status == api.Stop { return } } } }
那我们需要处理的逻辑显然都是在 Filter:OnData
部分了。这部分的 Filter
类型,在 yaml
配置可以指定:
1 2 3 4 5 6 7 8 9 10 11 12 "filter_chains": [{ "filters": [ { "type": "proxy" , "config": { "downstream_protocol": "Http1" , "upstream_protocol": "Http1" , "router_config_name" :"client_router" } } ] }]
我们来看看最常用的 ProxyFilter
。
ProxyFilter 在 阅读 ProxyFilter
之前,我们先看下 pkg/proxy/proxy.go:InitializeReadFilterCallbacks
这里我们创建了面向下游的 ServerStreamConn
1 2 3 4 5 6 func (p *proxy) InitializeReadFilterCallbacks(cb api.ReadFilterCallbacks) { p.readCallbacks.Connection().AddConnectionEventListener(p.downstreamListener) if p.config.DownstreamProtocol != string (protocol.Auto) { p.serverStreamConn = stream.CreateServerStreamConnection(p.context, types.ProtocolName(p.config.DownstreamProtocol), p.readCallbacks.Connection(), p) } }
还是比较好理解的,我们向下游发送数据总是需要一个单独的连接的。对于 OnData
函数
1 2 3 4 5 6 7 8 func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus { if p.serverStreamConn == nil { } p.serverStreamConn.Dispatch(buf) return api.Stop }
我们直接进入了 ServerSteamConn
的 Dispatch
函数而在其中很有趣,我们仅仅将 bytes
写入了一个 byfChan
1 2 3 4 5 6 7 8 9 10 func (sc *streamConnection) Dispatch(buffer buffer.IoBuffer) { defer func () { }() for buffer.Len() > 0 { sc.bufChan <- buffer <-sc.bufChan } }
那显然我们需要一个地方进行 Read
,没错就是下面几行
Read 1 2 3 4 5 6 7 func (conn *streamConnection) Read(p []byte ) (n int , err error ) { data, ok := <-conn.bufChan n = copy (p, data.Bytes()) data.Drain(n) conn.bufChan <- nil return }
但是有一个疑问放在我们面前,是谁启动这个 Read()
的处理者呢?这个问题要回溯到我们的 ServerConnection
创建的时刻
newServerStreamConnection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func newServerStreamConnection (ctx context.Context, connection api.Connection, callbacks types.ServerStreamConnectionEventListener) types.ServerStreamConnection { ssc := &serverStreamConnection{ streamConnection: streamConnection{ context: ctx, conn: connection, bufChan: make (chan buffer.IoBuffer), connClosed: make (chan bool , 1 ), }, contextManager: str.NewContextManager(ctx), serverStreamConnListener: callbacks, } ssc.contextManager.Next() ssc.br = bufio.NewReader(ssc) ssc.bw = bufio.NewWriter(ssc) connection.AddConnectionEventListener(ssc) ssc.conn.SetTransferEventListener(func () bool { ssc.close = true return false }) utils.GoWithRecover(func () { ssc.serve() }, nil ) return ssc }
我们在这里构建所有的变量,最重要的是 callbacks
,我们接收到来自外侧的请求处理逻辑,而这个对象本身的处理逻辑都是在 ssc.serve()
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func (conn *serverStreamConnection) serve() { for { ctx := conn.contextManager.Get() buffers := httpBuffersByContext(ctx) request := &buffers.serverRequest err := request.ReadLimitBody(conn.br, maxRequestBodySize) id := protocol.GenerateID() s := &buffers.serverStream s.stream = stream{ id: id, ctx: mosnctx.WithValue(ctx, types.ContextKeyStreamID, id), request: request, response: &buffers.serverResponse, } s.connection = conn s.responseDoneChan = make (chan bool , 1 ) s.header = mosnhttp.RequestHeader{&s.request.Header, nil } conn.mutex.Lock() conn.stream = s conn.mutex.Unlock() if atomic.LoadInt32(&s.readDisableCount) <= 0 { s.handleRequest() } select { case <-s.responseDoneChan: case <-conn.connClosed: return } conn.contextManager.Next() } }
看到这里给大家做个小结,我们先理清顺序
1 2 3 +------------+ +-------------+ +--------------+ | upsteam +-------------->+ mosn +--------->+ downsteam | +------------+ +-------------+ +--------------+
对于 Upsteam Conn
根据模型的不同,可能是 RawEpoll
,而 ServerSteamConn
是一个被 Pool
化的 Conn
,但是对于这个 Conn
本身也有自己的 Read
和 Write
处理函数,是在一个 Gorotine
中,启动的代码是在 ssc.serve()
中。
OnReceive 对于常见的 Http proxy filter(pkg/proxy/proxy.go)
来说,我们处理 Http encoder
之后,进入 OnReceive
部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (s *downStream) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) { s.downstreamReqHeaders = headers s.context = mosnctx.WithValue(s.context, types.ContextKeyDownStreamHeaders, headers) s.downstreamReqDataBuf = data s.downstreamReqTrailers = trailers id := s.ID pool.ScheduleAuto(func () { defer func () { }() phase := types.InitPhase for i := 0 ; i < 10 ; i++ { phase = s.receive(ctx, id, phase) switch phase { } } }) }
这里采用了一个状态机进行处理,。不过值得注意的最多进行 10
此的状态切换就必须进入 End
状态,不过此处其实一个小设计,应该后面的执行步骤刚好就是10次。真实的处理流程在 pkg/proxy/downstream.go:394
中。执行顺序是 DownFilter
MatchRoute
DownFilterAfterRoute
ChooseHost
DownFilterAfterChooseHost
DownRecvHeader
DownRecvData
DownRecvTrailer
Oneway
Retry
WaitNotify
UpFilter
UpRecvHeader
UpRecvData
UpRecvTrailer
End
,这里的处理恰好就是 Mosn
的处理核心部分。
Receive Data [Upstream] 对于 receive
部分值得详细的分析一下。对于前面分了很多种类型的阶段处理,大部分的状态会调用如下的代码,这里的代码是后续的拓展点,先不展开。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (s *downStream) runReceiveFilters(p types.Phase, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) { for ; s.receiverFiltersIndex < len (s.receiverFilters); s.receiverFiltersIndex++ { f := s.receiverFilters[s.receiverFiltersIndex] if f.p != p { continue } s.context = mosnctx.WithValue(s.context, types.ContextKeyStreamFilterPhase, p) status := f.filter.OnReceive(s.context, headers, data, trailers) switch status { } } s.receiverFiltersIndex = 0 return }
这里的逻辑不啰嗦了,找到匹配的逻辑进行处理即可。而其他的有些状态处理各不相同。
*matchRoute 比如对于 matchRoute
部分
matchRoute 1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (s *downStream) matchRoute() { headers := s.downstreamReqHeaders if s.proxy.routersWrapper == nil || s.proxy.routersWrapper.GetRouters() == nil { return } routers := s.proxy.routersWrapper.GetRouters() handlerChain := router.CallMakeHandlerChain(s.context, headers, routers, s.proxy.clusterManager) if handlerChain == nil { return } s.snapshot, s.route = handlerChain.DoNextHandler() }
无处不在的 Chain
,对于 Route
的匹配我们也需要创建一个 RouteHandlerChain
进行匹配。对于单个的 RouteHandler
里面来说最核心的就是包含的属性 Route
,分为 PathRoute
PrefixRoute
RegexRoute
SofaRoute
,具体的逻辑就在内部不展开说了。
在接收到数据处理过程中 DownRecvHeader
进行了数据的发送。发送数据也是蛮有趣的设计:
appendHeaders 1 2 3 4 5 6 7 8 9 10 11 func (r *upstreamRequest) appendHeaders(endStream bool ) { if r.downStream.processDone() { return } r.sendComplete = endStream if r.downStream.oneway { r.connPool.NewStream(r.downStream.context, nil , r) } else { r.connPool.NewStream(r.downStream.context, r, r) } }
这里其实并没直接发送,而是从我们的 ConnPool
中获得一个对下游的 Stream
我们将这个 r
对象直接作为了构建这个下游请求连接的参数。而真正的写入在
endStream 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (s *clientStream) endStream() { err := s.doSend() if err != nil { log.Proxy.Errorf(s.stream.ctx, "[stream] [http] send client request error: %+v" , err) if err == types.ErrConnectionHasClosed { s.ResetStream(types.StreamConnectionFailed) } else { s.ResetStream(types.StreamLocalReset) } return } if log.Proxy.GetLogLevel() >= log.DEBUG { log.Proxy.Debugf(s.stream.ctx, "[stream] [http] send client request, requestId = %v" , s.stream.id) } s.connection.requestSent <- true }
我们将请求 Send
出去将 requestSent
标记置为 true
即可。而对于读取数据,就在下一个阶段 DownRecvData
Receive Data [Downstream] DownRecvData DownRecvHeader
先去获取了 DownSteam
的 Header
部分的数据,不过值得注意的如果是 Get
并不会走这里。在 receiveHeaders
中,我们首先调用了
1 2 3 4 5 6 func (s *downStream) receiveData(endStream bool ) { data := s.downstreamReqDataBuf s.requestInfo.SetBytesReceived(s.requestInfo.BytesReceived() + uint64 (data.Len())) s.downstreamRecvDone = endStream s.upstreamRequest.appendData(endStream) }
我在最开始标记我们需要获得多少数据在 SetBytesReceived(s.requestInfo.BytesReceived() + uint64(data.Len()))
,然后将这些数据放到了发送的队列中。
上面2步走完了发送的流程,而接受的流程从 UpRecvHeader
开始,我们一开始接收到的 Downstream
的 HttpHeader
1 2 3 4 5 6 7 func (s *downStream) appendHeaders(endStream bool ) { s.upstreamProcessDone = endStream headers := s.convertHeader(s.downstreamRespHeaders) if err := s.responseSender.AppendHeaders(s.context, headers, endStream); err != nil { log.Proxy.Errorf(s.context, "append headers error: %s" , err) } }
然后读取数据就在
1 2 3 4 5 6 7 8 9 10 11 func (s *downStream) appendData(endStream bool ) { s.upstreamProcessDone = endStream data := s.convertData(s.downstreamRespDataBuf) s.requestInfo.SetBytesSent(s.requestInfo.BytesSent() + uint64 (data.Len())) s.responseSender.AppendData(s.context, data, endStream) if endStream { s.endStream() } }
说到这里,我们一定要好好分析下 这几个 appendData
是怎么工作的,其实看起来还是挺绕的。
小结2: Upstream 和 DownStream 的交互 对于整个处理流程的核心部分,我们就一直在和 upstream
和 downstream
这两个 package 的对象进行交互。 大概有这么几个函数很重要:
OnReceive:这是接收到数据时候的方法
receiveHeaders:接收到 http header 部分
receiveData: 接收到 http body 部分
这里面有一个很重要的抽象是 pkg/types/stream.go:StreamSender
1 2 3 4 5 6 7 type StreamSender interface { AppendHeaders(ctx context.Context, headers api.HeaderMap, endStream bool ) error AppendData(ctx context.Context, data buffer.IoBuffer, endStream bool ) error AppendTrailers(ctx context.Context, trailers api.HeaderMap) error GetStream() Stream }
整个流程还是在 Downstream:OnReceive
进行控制,逻辑也就分为 将输入的Header
/输入的Body
写入 DownStream
,将输出按照同样的方式执行一次。不过值得注意的是,和架构图上一致。在处理单一请求的过程中,写入和回读都是同一个 goroutinue
,对于 Downstream
的 io
处理是遵守标准的 NetPoller
的模型的,并没有 RawPoller
机制,RawPoller
机制仅在 接受侧
有单独的定义。
小结:all in one
对于 Go Lanague
因为 Goroutine
使用起来很方便,分析 Go
项目很难分析 Theadmodel
这块内容,或者也可以认为开 Goroutine
本身的成本很低,大家不用过于担心切换的成本。
作为对比,下面也放上官方的一张图。
直到这里我们已经将 监听请求
接受数据
转发请求
响应请求
这几个部分都阅读过了,整个流程已经走通了,我们再去看看其他重要的功能。
多协议机制 Mosn
支持多协议之间的转换,我们从想问的分析中,我们可以知道所谓的 Proxy
就是我们整个 FilterManager
中的一个 Filter
。因此对于配置文件
1 2 3 4 5 6 7 8 { "type" : "proxy" , "config" : { "downstream_protocol" : "Http1" , "upstream_protocol" : "Http1" , "router_config_name" : "server_router" } }
Mosn
这个功能非常的有趣,因为实际上因为RPC协议非常的多,因此想要支持多种协议的话,都是需要用户自行进行拓展的,不过 Mosn
帮大家做了一部分的工作,流程如下图:
在 pkg/protocol/xprotocol/example/encoder.go
中有一个例子。对于大多数的 RPC
协议我们都是需要获取到 足够
信息之后再处理,那第一步显然是我们需要先获得一些数据也就是 MinimalDecodeLen
Decode 1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (proto *proto) Decode(ctx context.Context, data types.IoBuffer) (interface {}, error ) { if data.Len() >= MinimalDecodeLen { magic := data.Bytes()[0 ] dir := data.Bytes()[2 ] switch dir { case DirRequest: return decodeRequest(ctx, data) case DirResponse: return decodeResponse(ctx, data) } } return nil , nil }
之后我们需要获得完整的数据
decodeRequest 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func decodeRequest (ctx context.Context, data types.IoBuffer) (cmd interface {}, err error ) { bytesLen := data.Len() bytes := data.Bytes() payloadLen := binary.BigEndian.Uint32(bytes[7 :]) frameLen := RequestHeaderLen + int (payloadLen) if bytesLen < frameLen { return } data.Drain(frameLen) return request, err }
这样就足够满足我们对于自定义数据的处理能力。而对于协议的解析部分还有一个困难的问题需要我们继续处理也就是 多路复用
,我们不能在一个 TCP
链路上仅仅处理一个请求,这就是最低效的 Http1
,因此协议需要能够定义出来,如果我在一个链路上进行多次请求是怎么区分的。对于 Mosn
来说这里还是有一个很精妙的设计,利用上下的隐性属性作为唯一的 ID
进行判断。这里就不展开,可以查看官方的 协议扩展框架
我们来关注下对于流程线上的问题,就是这段逻辑是镶嵌在我们的流程的何处的。在特殊的协议处理部分我们在启动时候定义的数据格式都类似于
1 2 3 4 5 6 7 8 9 10 11 12 13 "filters" : [ { "type" : "proxy" , "config" : { "downstream_protocol" : "X" , "upstream_protocol" : "X" , "router_config_name" : "server_router" , "extend_config" : { "sub_protocol" : "dubbo" } } } ]
因此我们在处理特定协议的时候需要传入指定的扩展协议如 dubbo
等。因此在构建的 proxy
的时候,将这个保存在 proxy
的属性中。
proxy:NewProxy 1 2 3 4 if json.Unmarshal([]byte (extJSON), &xProxyExtendConfig); xProxyExtendConfig.SubProtocol != "" { proxy.context = mosnctx.WithValue(proxy.context, types.ContextSubProtocol, xProxyExtendConfig.SubProtocol) log.DefaultLogger.Tracef("[proxy] extend config subprotocol = %v" , xProxyExtendConfig.SubProtocol) }
而在我们收到数据的时候会发现在 OnData
的时候我们会创建 ServerStream
的时候传入协议属性
proxy:OnData 1 2 3 4 5 6 7 8 9 10 func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus { if p.serverStreamConn == nil { protocol, err := stream.SelectStreamFactoryProtocol(p.context, prot, buf.Bytes()) p.serverStreamConn = stream.CreateServerStreamConnection(p.context, protocol, p.readCallbacks.Connection(), p) } p.serverStreamConn.Dispatch(buf) return api.Stop }
因此如果我们使用扩展的 协议
这里创建的 ServerStreamConn
也就是 pkg/stream/xprotocol/conn.go
,而他的处理逻辑如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (sc *streamConn) Dispatch(buf types.IoBuffer) { for { streamCtx := sc.ctxManager.Get() frame, err := sc.protocol.Decode(streamCtx, buf) if frame == nil && err == nil { return } if frame != nil { xframe, ok := frame.(xprotocol.XFrame) if !ok { log.Proxy.Errorf(sc.ctx, "[stream] [xprotocol] conn %d, %v frame type not match : %T" , sc.netConn.ID(), sc.netConn.RemoteAddr(), frame) return } sc.handleFrame(streamCtx, xframe) } sc.ctxManager.Next() } }
因此这段逻辑其实还是穿插在我们的 Proxy
中作为 Filter
中的一环,而这一环有需要依赖构建出不同的 StreamConn
进行处理,现在已知的支持 http1
http2
xprotocol
这3个类型。
插件机制 对于 Mosn
的插件机制也相对复杂,对于 Envoy
来说我们只有2处可以进行插件埋点,Mosn
也相对一致。
SteamFilter
MOSN 扩展中使用频率最高的扩展点。对于配置项如下
1 2 3 4 5 6 7 8 9 10 11 12 13 "listeners" : [ { "filter_chains" : [ { } ] , "stream_filters" : [ { "type" : "demo" , "config" : { "User" : "admin" } } ] } ]
从配置看,我可以发现 StreamFilter
和 FilterChain
是平级的(不过实际上并不是)。因此 Stream
分了两种类型:
进入 proxy 流程以后,如果存在 ReceiverFilter,那么就会执行对应的逻辑,ReceiverFilter 包括两个阶段,“路由前”和“路由后”,在每个 Filter 处理完成以后,会返回一个状态,如果是 Stop 则会中止后续尚未执行的 ReceiverFilter,通常情况下,返回 Stop 状态的 Filter 都会回写一个响应。如果是 Continue 则会执行下一个 ReceiverFilter,直到本阶段的 ReceiverFilter 都执行完成或中止;路由前阶段的 ReceiverFIlter 执行完成后,就会执行路由后阶段,其逻辑和路由前一致。如果是正常转发,那么随后 MOSN 会收到一个响应或者发现其他异常直接回写一个响应,此时就会进入到 SenderFilter 的流程中,完成 SenderFilter 的处理。SenderFilter 处理完成以后,MOSN 会写响应给 Client,并且完成最后的收尾工作,收尾工作包括一些数据的回收、日志的记录,以及 StreamFilter 的“销毁”(调用 OnDestroy)。
原文中有几个重点,在系统解析完一个 请求
之后才会进入这个流程,因此在 pkg/stream/http/stream.go:serve
中可以发现构建这个 StreamFilter
的逻辑
pkg/proxy/downstream.go 1 2 3 4 func (s *downStream) AddStreamSenderFilter(filter api.StreamSenderFilter) { sf := newActiveStreamSenderFilter(s, filter) s.senderFilters = append (s.senderFilters, sf) }
回到我们熟悉的地方来 DownStream:receive
中
receive 1 2 3 4 5 6 7 8 9 10 11 12 case types.DownFilterAfterChooseHost: if log.Proxy.GetLogLevel() >= log.DEBUG { s.printPhaseInfo(types.DownFilterAfterChooseHost, id) } s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers) if p, err := s.processError(id); err != nil { return p } phase++
官方的Demo可以看一下,使用起来还是很方便的 Stream Filter Demo
MOSN Plugin 机制 Mosn
还提供了另外一种基于通讯机制的方式
在这里有个例子:Stream Filter Plugin demo
不过这种算是上面一种的变形,实际上我们依然需要将 StreamFilter
注册到系统中,不过这一次所有的逻辑处理不在编译器固定,而是有一个 gRPC
的外部服务可以进行通讯判断。因此其实不再限于单一语言,我们只需要遵守 gRPC
的协议即可。
平滑升级 平滑升级可以参考 聊聊服务端热更新 和 Nginx vs Envoy vs MOSN 平滑升级原理解析 不做展开了。
参考