Netty源码分析-(伪)-(4)-ByteBuf

ByteBuf

ByteBuf 是 Netty 底层支撑。缓冲区是不同的通道之间传递数据的中介,JDK中的ByteBuffer操作复杂,而且没有经过优化,所以在netty中实现了一个更加强大的缓冲区 ByteBuf 用于表示字节序列。ByteBuf在netty中是通过Channel传输数据的,新的设计解决了JDK中ByteBuffer中的一些问题。

  • 可扩展到用户定义的buffer类型中
  • 通过内置的复合buffer类型实现透明的零拷贝(zero-copy)
  • 容量可以根据需要扩展
  • 切换读写模式不需要调用ByteBuffer.flip()方法
  • 读写采用不同的索引
  • 支持方法链接调用
  • 支持引用计数
  • 支持池技术(比如:线程池、数据库连接池)

原理

ByteBuf维护两个不同的索引:读索引和写索引。当你从ByteBuf中读,它的readerIndex增加了读取的字节数;同理,当你向ByteBuf中写,writerIndex增加。下图显示了一个空的ByteBuf的布局和状态:
empty

自动扩容

Java Niojava.nio.HeapByteBuffer#put(byte) 中我们可以看java 的 ByteBuffer

1
2
3
4
5
6
7
8
9
10
public ByteBuffer put(byte x) {
hb[ix(nextPutIndex())] = x;
return this;
}

final int nextPutIndex() {
if (position >= limit)
throw new BufferOverflowException(); ➊
return position++;
}

Java ByteBuffer 达到上限的时候需要自己手动创建一个新的 ByteBuffer 再存入。而对比 ByteBuf

io.netty.buffer.AbstractByteBuf#writeByte

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public ByteBuf writeByte(int value) {
ensureWritable0(1);
_setByte(writerIndex++, value);
return this;
}

final void ensureWritable0(int minWritableBytes) {
ensureAccessible();
if (minWritableBytes <= writableBytes()) {
return;
}
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

// Adjust to the new capacity.
capacity(newCapacity);
}

在 ➋ Netty的ByteBuf 会进行自动的扩容。拓展的倍数是 2

池化技术

传统的 ByteBuffer 的生命周期是委托给 JVM 管理的,Netty分为2块

  • 基于对象池的ByteBuf: PooledByteBuf和它的子类PoolDirectByteBuf、PoolUnsafeDirectByteBuf、PooledHeapByteBuf
  • 普通的ByteBuf: UnPoolDirectByteBuf、UnPoolUnsafeDirectByteBuf、UnPoolHeapByteBuf

基于对象池的ByteBuf 的核心逻辑就在 PoolArena 对象,关于 PoolArena 的原理可以参看

Netty的零拷贝

  • DirectByteBuf: ByteBuf可以分为HeapByteBuf和DirectByteBuf,当使用DirectByteBuf可以实现零拷贝
  • DefaultFileRegion: DefaultFileRegion是Netty的文件传输类,它通过transferTo方法将文件直接发送到目标Channel,而不需要循环拷贝的方式,提升了传输性能

这块都设计到java的 native 代码故不展开讲。

Netty的内存回收管理

Netty会通过 引用计数法 及时申请释放不再被引用的对象 ,实现上是通过 AbstractReferenceCountedByteBuf来实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// increment
private ByteBuf retain0(final int increment) {
int oldRef = refCntUpdater.getAndAdd(this, increment);
if (oldRef <= 0 || oldRef + increment < oldRef) {
// Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
refCntUpdater.getAndAdd(this, -increment);
throw new IllegalReferenceCountException(oldRef, increment);
}
return this;
}

// decrement
private boolean release0(int decrement) {
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
if (oldRef == decrement) {
deallocate();
return true;
} else if (oldRef < decrement || oldRef - decrement > oldRef) {
// Ensure we don't over-release, and avoid underflow.
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, -decrement);
}
return false;
}

有同学会问,为什么需要计数引用,因为Netty采用了池化的技术,GC是无法回收这些数据的,需要自己实现一个简单的GC,技术引用是最简单的方式。