天天看點

Netty源碼分析:PooledByteBufAllocatorNetty源碼分析:PooledByteBufAllocator

Netty源碼分析:PooledByteBufAllocator

無論是我們使用語句

ByteBuf byteBuf = Unpooled.buffer(256);

來配置設定buf,還是使用如下的語句來配置設定Buf:

PooledByteBufAllocator allocator = new PooledByteBufAllocator(false);
        ByteBuf byteBuf = allocator.heapBuffer();
           

都是使用了 PooledByteBufAllocator 這個類類配置設定Buf。是以就來分析下這個類。

1、常量的說明

public class PooledByteBufAllocator extends AbstractByteBufAllocator {

        private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
        //預設的PoolArena個數,堆記憶體類型
        private static final int DEFAULT_NUM_HEAP_ARENA;
        //預設的PoolArena個數,直接記憶體類型
        private static final int DEFAULT_NUM_DIRECT_ARENA;
        //預設的Page的個數,最小為4K,預設為8K
        private static final int DEFAULT_PAGE_SIZE;
        /*
        由于每個chunk中的page是用平衡二叉樹映射管理每個PoolSubpage是否被配置設定,
        maxOrder為樹的深度,深度為maxOrder層的節點數量為 1 << maxOrder。
        */
        private static final int DEFAULT_MAX_ORDER; //預設為 11        //預設的tiny cache 的大小
        private static final int DEFAULT_TINY_CACHE_SIZE; //512
        //預設的small cache的大小
        private static final int DEFAULT_SMALL_CACHE_SIZE;// 256
        //預設的normal cache的大小
        private static final int DEFAULT_NORMAL_CACHE_SIZE;//64
        private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
        private static final int DEFAULT_CACHE_TRIM_INTERVAL;
        //page容量的最小值,為4K。
        private static final int MIN_PAGE_SIZE = ;
        //最大chunk的大小,等于2的30次方,即1G。
        private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + ) / );
           

以上這些常量除了最後兩個都是在如下的static塊中進行初始化。

static {
        int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", );
        Throwable pageSizeFallbackCause = null;
        try {
            validateAndCalculatePageShifts(defaultPageSize);
        } catch (Throwable t) {
            pageSizeFallbackCause = t;
            defaultPageSize = ;
        }
        DEFAULT_PAGE_SIZE = defaultPageSize;

        int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", );
        Throwable maxOrderFallbackCause = null;
        try {
            validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
        } catch (Throwable t) {
            maxOrderFallbackCause = t;
            defaultMaxOrder = ;
        }
        DEFAULT_MAX_ORDER = defaultMaxOrder;

        // Determine reasonable default for nHeapArena and nDirectArena.
        // Assuming each arena has  chunks, the pool should not consume more than % of max memory.
        final Runtime runtime = Runtime.getRuntime();
        final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
        DEFAULT_NUM_HEAP_ARENA = Math.max(,
                SystemPropertyUtil.getInt(
                        "io.netty.allocator.numHeapArenas",
                        (int) Math.min(
                                runtime.availableProcessors(),
                                Runtime.getRuntime().maxMemory() / defaultChunkSize /  / )));
        DEFAULT_NUM_DIRECT_ARENA = Math.max(,
                SystemPropertyUtil.getInt(
                        "io.netty.allocator.numDirectArenas",
                        (int) Math.min(
                                runtime.availableProcessors(),
                                PlatformDependent.maxDirectMemory() / defaultChunkSize /  / )));

        // cache sizes
        DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", );
        DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", );
        DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", );

        //  kb is the default maximum capacity of the cached buffer. Similar to what is explained in
        // 'Scalable memory allocation using jemalloc'
        DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
                "io.netty.allocator.maxCachedBufferCapacity",  * );

        // the number of threshold of allocations when cached entries will be freed up if not frequently used
        DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
                "io.netty.allocator.cacheTrimInterval", );
                //省略了部分日志輸出代碼
    }
           

從中可以得到如下的資訊

1、首先是對DEFAULT_PAGE_SIZE進行初始化,預設是8K,使用者可以通過設定io.netty.allocator.pageSize來設定。

2、validateAndCalculatePageShifts函數用來檢查pageSize是否大于

MIN_PAGE_SIZE

(4K)且是2的幂次方。

3、對樹的深度DEFAULT_MAX_ORDER進行初始化,預設是11,使用者可以通過io.netty.allocator.maxOrder來進行設定。

4、初始化預設chunk的大小,為PageSize * (2 的 maxOrder幂)。

defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER
           

5、 計算PoolAreana的個數,PoolArena預設為: cpu核心線程數 與 最大堆記憶體/2/(3*chunkSize) 這兩個數中的較小者。這裡的除以2是為了確定系統配置設定的所有PoolArena占用的記憶體不超過系統可用記憶體的一半,這裡的除以3是為了保證每個PoolArena至少可以由3個PoolChunk組成。

使用者如果想修改,則通過設定io.netty.allocator.numHeapArenas/numDirectArenas來進行修改。

6、對cache sizes進行了設定,如下:

DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", );
        DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", );
        DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", );
           

7、 還對其他常量也進行了設定。

2、構造函數

下面來看下PooledByteBufAllocator的構造函數。

public PooledByteBufAllocator(boolean preferDirect) {
        this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
    }

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
        this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
                DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
    }

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
        super(preferDirect);
        threadCache = new PoolThreadLocalCache();
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        //得到chunkSize,其值為:pageSize*2^maxOrder
        final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

        if (nHeapArena < ) {
            throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: >= 0)");
        }
        if (nDirectArena < ) {
            throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)");
        }

        int pageShifts = validateAndCalculatePageShifts(pageSize);

        if (nHeapArena > ) {
            heapArenas = newArenaArray(nHeapArena);
            for (int i = ; i < heapArenas.length; i ++) {
                heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
            }
        } else {
            heapArenas = null;
        }

        if (nDirectArena > ) {
            directArenas = newArenaArray(nDirectArena);
            for (int i = ; i < directArenas.length; i ++) {
                directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
            }
        } else {
            directArenas = null;
        }
    }
           

通過如上的構造函數可以看到,幹了如下幾件事:

1)使用了預設的值初始化了如下的字段:

private final int tinyCacheSize;
    private final int smallCacheSize;
    private final int normalCacheSize;
           

2)使用new PoolThreadLocalCache()執行個體化了threadCache 字段。

final PoolThreadLocalCache threadCache; 
           

3)重點:執行個體化了如下兩個數組。

private final PoolArena<byte[]>[] heapArenas;
    private final PoolArena<ByteBuffer>[] directArenas;
           

在執行個體化上面兩個PoolArena時,用到了如下的兩個參數

3.1)chunkSize

調用validateAndCalculateChunkSize函數求得,其值為:pageSize*2^maxOrder
           

3.2)pageShifts

調用如下的validateAndCalculatePageShifts求得,

private static int validateAndCalculatePageShifts(int pageSize) {
        if (pageSize < MIN_PAGE_SIZE) {
            throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + "+)");
        }

        if ((pageSize & pageSize - ) != ) {
            throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)");
        }

        // Logarithm base 2. At this point we know that pageSize is a power of two.
        return Integer.SIZE -  - Integer.numberOfLeadingZeros(pageSize);
    }
           

該函數首先檢查了pageSize是否大于4K且為2的幂次方,如果不是則抛異常。

如果是,則傳回Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize)的結果,結果是什麼呢?

假設pageSize為8192, 2的13次方的二進制碼為:(0000 0000 0000 0000 0010 0000 0000 0000),其補碼與原碼一樣,而Integer.numberOfLeadingZeros傳回pageSize的補碼最高位的1的左邊連續零的個數。而8192的二進制的高位有18個0,是以pageShifts為13。簡單來說pageShifts=log(pageSize)。

既然看到這裡,PoolArena.HeapArena和PoolArena.DirectArena中如下的構造函數.

heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
    directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
           

PoolArena.HeapArena構造函數如下;

HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
            super(parent, pageSize, maxOrder, pageShifts, chunkSize);
        }  
           

調用了父類PooledArena如下的構造函數

protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    //從PooledByteBufAllocator中傳送過來的相關字段值。
        this.parent = parent;
        this.pageSize = pageSize;
        this.maxOrder = maxOrder;
        this.pageShifts = pageShifts;
        this.chunkSize = chunkSize;
        subpageOverflowMask = ~(pageSize - );//該變量用于判斷申請的記憶體大小與page之間的關系,是大于,還是小于
        tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
        for (int i = ; i < tinySubpagePools.length; i ++) {
            tinySubpagePools[i] = newSubpagePoolHead(pageSize);
        }

        numSmallSubpagePools = pageShifts - ;
        smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
        for (int i = ; i < smallSubpagePools.length; i ++) {
            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
        }

        q100 = new PoolChunkList<T>(this, null, , Integer.MAX_VALUE);
        q075 = new PoolChunkList<T>(this, q100, , );
        q050 = new PoolChunkList<T>(this, q075, , );
        q025 = new PoolChunkList<T>(this, q050, , );
        q000 = new PoolChunkList<T>(this, q025, , );
        qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, );

        q100.prevList = q075;
        q075.prevList = q050;
        q050.prevList = q025;
        q025.prevList = q000;
        q000.prevList = null;
        qInit.prevList = qInit;
    } 
           

該構造函數主要幹了如下幾件事

1)初始化parent、pageSize、maxOrder、pageShifts等字段

2)執行個體化了如下兩個數組,這兩個數組相當重要,在博文Netty源碼分析:PoolArena中有詳細的介紹,這裡不再介紹。

private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;
           

3)建立了6個Chunk清單(PoolChunkList)來緩存用來配置設定給Normal(超過一頁)大小記憶體的PoolChunk,每個PoolChunkList中用head字段維護一個PoolChunk連結清單的頭部,每個PoolChunk中有prev,next字段。而PoolChunkList内部維護者一個PoolChunk連結清單頭部。

這6個PoolChunkList解釋如下:

qInit:存儲剩餘記憶體0-25%的chunk

q000:存儲剩餘記憶體1-50%的chunk

q025:存儲剩餘記憶體25-75%的chunk

q050:存儲剩餘記憶體50-100%個chunk

q075:存儲剩餘記憶體75-100%個chunk

q100:存儲剩餘記憶體100%chunk

這六個PoolChunkList也通過連結清單串聯,串聯關系是:qInit->q000->q025->q050->q075->q100.

3、ByteBuf byteBuf = allocator.heapBuffer(256)

接下來看下PooledByteBufAllocator類中heapBuffer方法(實際上是在其父類AbstractByteBufAllocator中定義的),代碼如下:

@Override
    public ByteBuf heapBuffer(int initialCapacity) {
        return heapBuffer(initialCapacity, Integer.MAX_VALUE);
    } 

    @Override
    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity ==  && maxCapacity == ) {
            return emptyBuf;
        }
        validate(initialCapacity, maxCapacity);//檢查參數是否正确
        return newHeapBuffer(initialCapacity, maxCapacity);
    }
           

繼續看newHeapBuffer方法

PooledByteBufAllocator

    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<byte[]> heapArena = cache.heapArena;

        ByteBuf buf;
        if (heapArena != null) {
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);//分析
        } else {
            buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    } 
           

從上面的方法可知,接着就時調用了PoolArena的子類的allocate來配置設定記憶體,這個方法在博文Netty源碼分析:PoolArena有詳細的介紹,這裡不再介紹。

小結

基于前面分析的PoolArena、PoolChunk、PoolSubage這個類之後,他們的關系總結如下:

Netty源碼分析:PooledByteBufAllocatorNetty源碼分析:PooledByteBufAllocator

圖中想表達以下幾點:

1、PooledByteBufAllocator包括2個數組:HeapArena數組和DirectArena數組。當利用PooledByteBufAllocator配置設定記憶體時,是利用Arena數組中的元素來完成。

2、HeapArena包括:6個PoolChunkList連結清單(連結清單中的元素為PoolChunk),和兩個數組:tinySubpagePools和smallSubpagePools。當利用Arena來進行配置設定記憶體時,根據申請記憶體的大小有不同的政策,例如:如果申請記憶體的大小小于512時,則首先在cache嘗試配置設定,如果配置設定不成功則會在tinySubpagePools嘗試配置設定,如果配置設定不成功,則會在PoolChunk重新找一個PoolSubpage來進行記憶體配置設定,配置設定之後将此PoolSubpage儲存到tinySubpagePools中。

3、PoolChunk中包括一大塊記憶體T memory,将其分成N份,每一份就是一個PoolSubpage。

4、PoolSubpage由M個“塊”構成,塊的大小由第一次申請記憶體大小決定。當配置設定一次記憶體之後此page會被加入到PoolArena的tinySubpagePools或smallSubpagePools中,下次配置設定時就如果“塊”大小相同,則尤其直接配置設定。

繼續閱讀