Netty源码分析-(3)-ChannelPipeline
ChannelPipeline
ChannelPipeline是整个Netty的流程处理的中心,整个结构如下图所示
在 ChannelPipeline.html 有非常详细的文档。
每个channel内部都会持有一个ChannelPipeline对象pipeline。pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。
DefaultChannelPipeline
DefaultChannelPipeline
是Netty的实现类,我们看看他的内部实现。
1 | public class DefaultChannelPipeline implements ChannelPipeline { |
在 ➋ 处我们看到了支撑业务的的 Channel
, ➌ 是一个成功的回调函数。而比较有趣的就是这个 head 和 tail了,我们都知道Pipeline就像一个管道一样将流程贯通起来,那Head和Tail看起来就是头和尾的意思。我们看看这2个变量的赋值。
1 | protected DefaultChannelPipeline(Channel channel) { |
我们在这里看到tail 和 head 分别是Netty自己实现的类,我们来看看类声明。
1 | final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler |
Wow,很精妙的实现,分别是一个 ChannelInboundHandler 和一个 ChannelOutboundHandler。
Pipeline 增加 Handler
我们知道我们实现具体的业务逻辑是在 Handler
中,我们在Demo中有一个声明
1 | .childHandler(new ChannelInitializer<SocketChannel>() { |
我们在这里添加一个 DiscardHandler
,我们探索下这个 addLast()
行为究竟做了什么。
1 | public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { |
在 ➊ 处我们发现最为重要的增加逻辑就是在 addLast0()
中
1 | private void addLast0(AbstractChannelHandlerContext newCtx) { |
这里是一个很标准的链表操作,看到这里我们就可以大致才出来addXXX()
一系列的行为都是类似,用一张图来表示下
我们一直操作的都是在 Head
和 Tail
之间,我们可以任意的在之间增加或者删除具体的 Handler
Pipeline的调用顺序
万物的开始我们知道从接受数据开始在io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
中有一行
1 | allocHandle.incMessagesRead(1); |
➊ 就是万物的开始,从此开始进入Pipeline,然我又在io.netty.channel.DefaultChannelPipeline#fireChannelRead
中获得
1 | public final ChannelPipeline fireChannelRead(Object msg) { |
➋ 处我们可以看到我们第一次阅读数据是从HeadContext开始的
在 io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
1 |
|
➌ 处查找InBoundHandler,这个函数还蛮有意思
1 | private AbstractChannelHandlerContext findContextInbound() { |
是一个标准的循环,我们一直去找到下一个Inbound的Handler去处理。
小结1
在Head的调用中有点特殊有点复杂,我们使用时序图来理解下。
通过 findContextInbound() 就获得我们装载的第一个InboundHandler。
通过分析我们得知Pipeline的下一次的调用依赖的是 io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
这个函数。所以在自己编写的 ChannelInboundHandlerAdapter
实现类中,我们在实现 channelRead
方法的时候,如果我们需要调用下一层,我们需要手动 ctx.fireChannelRead(msg);
延伸 Pipeline 的 Write操作
从Head的Read操作中,我们可以猜测出,Write也就是Tail的Write操作,我们去验证下自己的思路。
在io.netty.channel.DefaultChannelPipeline#writeAndFlush(java.lang.Object)
处我们发现
1 |
|
➊ 处证明处理的流程是从 Tail开始的
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
中
1 | private void write(Object msg, boolean flush, ChannelPromise promise) { |
➋ 和FindInBound多么相似,也就是找个下一个OutBound
最终的写入处是在io.netty.channel.DefaultChannelPipeline.HeadContext#write
1 |
|
我们看到熟悉的unsafe,根据我们这么久以来和Netty打交道的经验,这里应该就是Java原生的Nio部分。在具体的实现部分
1 | public final void write(Object msg, ChannelPromise promise) { |
我们将需要写入的数据放入了 outboundBuffer
中。
具体的写入逻辑在 flush
中,最终会调用 io.netty.channel.socket.nio.NioSocketChannel#doWrite
这部分就留给读者自行阅读了。这个时候我们再回顾下 大家会发现Netty的Pipeline实现的是如此的优雅,核心的逻辑清晰明了。
知识点
- ServerChannel 如何监听事件
- BossEventLoopGroup 如何将事件转让给 WorkerEventLoopGroup (ServerBootstrapAcceptor用途)
附录:ChannelPipeline事件
- Inbound 事件:
- ChannelHandlerContext.fireChannelRegistered()
- ChannelHandlerContext.fireChannelActive()
- ChannelHandlerContext.fireChannelRead(Object)
- ChannelHandlerContext.fireChannelReadComplete()
- ChannelHandlerContext.fireExceptionCaught(Throwable)
- ChannelHandlerContext.fireUserEventTriggered(Object)
- ChannelHandlerContext.fireChannelWritabilityChanged()
- ChannelHandlerContext.fireChannelInactive()
- ChannelHandlerContext.fireChannelUnregistered()
- Outbound 事件:
- ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
- ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
- ChannelHandlerContext.write(Object, ChannelPromise)
- ChannelHandlerContext.flush()
- ChannelHandlerContext.read()
- ChannelHandlerContext.disconnect(ChannelPromise)
- ChannelHandlerContext.close(ChannelPromise)
- ChannelHandlerContext.deregister(ChannelPromise)