天天看点

Netty源码解析一 -- ByteBuf

JDK NIO之ByteBuffer的局限性如下:
(1)长度固定,一旦分配完成,它的容量将不能动态扩展和收缩,而需要编码的POJO对象大雨ByteBuffer的容量时,会发生索引越界异常;
(2)只有一个标识位置的指针position,读写的是偶需要搜公条用flip()和rewind()等,使用着必须小心的处理这些API,否则很容易导致程序越界异常;
(3)ByteBuffer的API功能有限,一些高级和实用扽特性不支持,需要使用者自己编程实现、
为了弥补这些不足,Netty提供了自己的缓冲区实现ByteBuf。
ByteBuf与ByteBuf一样维护了一个byte数组,提供以下几类基本功能;
* 7中java基础类型,byte数组,ByteBuffer等的读写;
* 缓冲区自身的copy和slice等;
* 设置网络字节序;
* 构造缓冲区实例;
* 操作位置指针等方法;
ByteBuf通过两个位置指针来协助缓冲的读写操作:读指针:readerIndex和写指针writerIndex;
因为netty中ByteBuf的读写索引比较简单,这里对于读写索引的关系及相关的API不做详细的介绍,感兴趣的读者可以去相关的API参考。
一、ByteBuf与ByteBuffer的相互转换:
ByteBuf与ByteBuffer的相互转换:
           
@Override
    public ByteBuffer nioBuffer() {
        return nioBuffer(readerIndex, readableBytes());
    }
           

nioBuffer的具体实现这里使用PooledHeapByteBuf中的实现来看:

@Override
    public ByteBuffer nioBuffer(int index, int length) {
        checkIndex(index, length);
        index = idx(index);
        ByteBuffer buf =  ByteBuffer.wrap(memory, index, length);
        return buf.slice();
    }
           

一、ByteBuf的继承结构:

Netty源码解析一 -- ByteBuf

ByteBuf可以分为两类:

(1)对内存:HeapByteBuf自己缓冲区,特点是内存的分配和回收速度快,可以被JVM自动回收,,缺点是如果使用Socket的IO读写,需要额外做一次内存复制,将堆内存对应的额缓冲区复制到内核Channel中,性能会有一定的下降。

(2)直接内存。DirectByteBuf字节缓冲区也可以叫做直接缓冲区,非堆内存。它在堆外进行内存分配,相比于堆内存,它的分配和回收速度会慢一些。但是将它写入或者从SocketChannel中读取时,由于少了一次内存复制。速度比堆内存要快。

因此Netty提供了多种ByteBuf 的实现共开发者选择。在长期的开发实践中,表明,在IO通信线程的读写缓冲区使用DirectByteBuf, 后端业务消息的编解码模块使用HeapByteBuf,这样组合可以达到性能最优。

从内存回收的角度看,ByteBuf也分为两类:基于对象池的ByteBuf和普通ByteBuf。两者的主要区别就是基于对象池的ByteBuf可以重用ByteBuf对象,它自己创建了一个内存池,可以循环利用创建的额ByteBuf,提升内存的使用效率,降低由于高负载导致的频繁GC。测试表明使用内存池后的Netty在高负载,大并发冲击下的内存和GC更加平稳。

二、AbstractByteBuf部分源码介绍:

一、读操作:
 @Override
    public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
        checkReadableBytes(length);
        getBytes(readerIndex, dst, dstIndex, length);
        readerIndex += length;
        return this;
    }

    @Override
    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
        checkDstIndex(index, length, dstIndex, dst.length);
        System.arraycopy(memory, idx(index), dst, dstIndex, length);
        return this;
    }
           
二、写操作:
    @Override
    public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
        ensureAccessible();
        ensureWritable(length);
        setBytes(writerIndex, src, srcIndex, length);
        writerIndex += length;
        return this;
    }

    @Override
    public ByteBuf ensureWritable(int minWritableBytes) {
        if (minWritableBytes < 0) {
            throw new IllegalArgumentException(String.format(
                    "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
        }

        if (minWritableBytes <= writableBytes()) {
            return this;
        }

        if (minWritableBytes > maxCapacity - writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }

        // Normalize the current capacity to the power of 2.
        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

        // Adjust to the new capacity.
        capacity(newCapacity);
        return this;
    }

// 扩容机制:首先设置阈值为4M.当需要的新容量正好等于阈值,则使用阈值作为新的缓冲区容量。
// 如果新申请的内存空间大于阈值,不能采用倍增的方式(防止内存膨胀和浪费)扩张内存。
// 采用每次步进4M的方式进行内存扩张。扩张的时候需要对扩张后的那次u你和最大内存进行比较,
// 如果大于缓冲区的最大长度,则用maxCapacity作为扩容的缓冲区容量。如果扩容后的新容量小于阈值,则以64为基础进行倍增。
// 直到倍增后的结果大于或等于需要的容量值。采用倍增或步进算法的原因是:如果以minNewCapacity作为目标容量,
// 则本次扩容后的科协字节数刚好够本次写入使用。吸入完成后,他的可写字节数会变成0,下次需要写入的时候,
// 需要再次进行动态扩张,由于动态扩张需要进行内存复制。频繁的内存复制会导致性能下降。
    @Override
    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity < 0) {
            throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
        }
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        final int threshold = 1048576 * 4; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.
        if (minNewCapacity > threshold) {
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
    }
           

三、AbstractReferenceCountedByteBuf源码分析:

从类的名字九可以看出该类主要是对引用进行技术,类似与JVM中的对象引用计数器,用于跟踪对象的引用和销毁,做自动内存回收。

CAS :compareAndSet,获取当前变量的值,根据变量计算出一个新值,如果这时变量的值没有变化,就用新值更新变量,如果有变化则不更新。 伪代码: for(;;;) {     获取变量值A     计算变量的新值保存在B     如果变量A的值未变化,用B的值更新A,退出循环     如果变量A的值有变化,继续这个循环,直至更新成功才退出循环 }

1.成员变量:
// 通过原子的方式对成员变量进行更新等操作,以实现线程安全,消除锁。
// public static AtomicIntegerFieldUpdaternewUpdater(Class tclass, 
// String fieldName) 这里就不详细分析他的源码了,其实很简单,他让tclass的成员fieldName具有了原子性,是不是很简单~
 private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;

    static {
        AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
                PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
        if (updater == null) {
            updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
        }
        refCntUpdater = updater;
    }
    
    // 用于跟踪对象的引用次数
    private volatile int refCnt = 1;
           
@Override
    public ByteBuf retain() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, 1);
            }
            if (refCnt == Integer.MAX_VALUE) {
                throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
            }
            if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
                break;
            }
        }
        return this;
    }
           

每调用一次retain方法,计数器加一 

compareAndSet方法用来获取自己的值和期望的值进行比较,如果其间被其他线程修改了,那么比对失败,进行自旋操作,重新获得计数器重新比较 

compareAndSet这个方法是CAS操作,由操作系统层面提供。

    @Override
    public final boolean release() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, -1);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                if (refCnt == 1) {
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }
           

需要注意的是:党refCnt == 1时,意味着申请和释放相等,说明对象引用已经不可达, 该对象需要被释放和垃圾回收掉, 则通过调用deallocate方法来释放ByteBuf对象

四、PooledByteBuf内存池原理分析:

1.PoolArena:Arena本身是指一开区域,在内存管理中,MemoryArena是指内存中的一大块连续的区域,PoolA人啊就是Netty的内存池实现类。为了集中管理内存的分配和释放,同时提高分配的释放内存时候的性能,很多框架和应用都会预先申请一大块内存,然后通过提供相应的分配和释放接口来使用内存,这样一来, 对内存的管理就被集中到几个类或函数中,由于不在频繁使用系统条用来申请和释放内存,应用或者系统的性能也会大大提高,在这种涉及思路下,预先申请的一大块内存就被成为Memory Arena。

不同的框架,Memory Arena的实现不同,Netty的PoolArena是由多个Chunk组成的大块内存区域,而每个Chunk则由一个或多个Page组成,因此,对内存的组织和管理也就主要集中在如何管理和组织Chunk和Page了, PoolArena中的内存Chunk定义如下所示。

abstract class PoolArena<T> {

    static final int numTinySubpagePools = 512 >>> 4;

    final PooledByteBufAllocator parent;

    private final int maxOrder;
    final int pageSize;
    final int pageShifts;
    final int chunkSize;
    final int subpageOverflowMask;
    final int numSmallSubpagePools;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;
           
  • qInit:存储剩余内存0-25%的chunk
  • q000:存储剩余内存1-50%的chunk
  • q025:存储剩余内存25-75%的chunk
  • q050:存储剩余内存50-100%个chunk
  • q075:存储剩余内存75-100%个chunk
  • q100:存储剩余内存100%chunk
Netty源码解析一 -- ByteBuf

CHunk主要用来组织和管理多个Page的内存分配和释放,在Netty中,Chunk中的Page被分配成一个二叉树,假设一个Chunk由8个Page组成,那么这些Page将会被按照如下图方式组织:

                                                        Chunk

                                                            |

                                                            |

                       page                                                         page

                           |                                                                |

                           |                                                                |

         page                        page                           page                page

             |                                |                                |                        |

             |                                |                                |                        |

   page    page            page        page       page        page    page    page   

page的大小是8个字节,Chunk的大小是64个字节,真棵树有4层,第一层(野子节点所在的层)用来分配page的内存,第三层用来分配两个page的内存,一次类推。每个节点都记录了自己咋整个Memory Arena中的偏移地址,党一个节点代表的内存区域被分配出去之后,这个节点就会被标记为已分配,自这个节点一下的所有节点在后面的内存分配请求都会被忽略,举例来说,当我们请求一个16字节的内存时,上面这个树中第二层的4个节点中的一个就会被标记为已分配,这就表示整个MemoryArena中有16个字节被分配出去了,新的分配请求只能从剩下的三个节点及其子树中去寻找合适的节点。对树的遍历采用深度优先的算法,但是在选择哪个子节点继续遍历时则是随即的,并不像通常的深度优先算法中那样总是访问左边的子节点。

PoolSubpage

对于小于一个page的内存,Netty在Page中完成分配。每个page会被切分成大小相同的多个存储块。存储快的大小都由第一次申请的内存块大小决定,嘉定一个page是8个字节,如果第一次申请的是4个字节,则这个page就包含两个数据块,如果第一次申请的是8个自己,那么这个page就包含一个数据块。

一个page只能用户分配与第一次申请时大小相同的内存,比如:一个4字节的page,如果第一次分配了1字节的内存,那么后面这个page只能继续分配1字节的内存,如果有一个申请了2字节内存的请求,就需要在一个新的page中中进行分配;

Page中存储区域的使用状态通过一个long数组进行维护,数组中每个long的每一位表示一个块存储区域的占用情况:0表示未占用,1表示已占用。对于一饿4字节的page来说,如果这个page用来分配一个字节的存储区域,那么long’数组中就只有一个long类型的元素。这个熟知的低4位用来指示各个存储区域的占用情况。对于一个128字节的Page来说,如果这个Page也是用来分配1个字节的存储区域。那么long数组中就会包含2个元素,总共128位,每一位代表一个区域的占用情况。

无论是Chunk还是Page,都是通过状态位来标识内存是否可用。不同之处是CHunk通过在二叉树上对节点进行标识实现。Page是通过维护块的使用状态标识来实现。