JDK NIO之ByteBuffer的局限性如下:
(1)長度固定,一旦配置設定完成,它的容量将不能動态擴充和收縮,而需要編碼的POJO對象大雨ByteBuffer的容量時,會發生索引越界異常;
(2)隻有一個辨別位置的指針position,讀寫的是偶需要搜公條用flip()和rewind()等,使用着必須小心的處理這些API,否則很容易導緻程式越界異常;
(3)ByteBuffer的API功能有限,一些進階和實用扽特性不支援,需要使用者自己程式設計實作、
為了彌補這些不足,Netty提供了自己的緩沖區實作ByteBuf。
ByteBuf與ByteBuf一樣維護了一個byte數組,提供以下幾類基本功能;
* 7中java基礎類型,byte數組,ByteBuffer等的讀寫;
* 緩沖區自身的copy和slice等;
* 設定網絡位元組序;
* 構造緩沖區執行個體;
* 操作位置指針等方法;
ByteBuf通過兩個位置指針來協助緩沖的讀寫操作:讀指針:readerIndex和寫指針writerIndex;
因為netty中ByteBuf的讀寫索引比較簡單,這裡對于讀寫索引的關系及相關的API不做詳細的介紹,感興趣的讀者可以去相關的API參考。
一、ByteBuf與ByteBuffer的互相轉換:
ByteBuf與ByteBuffer的互相轉換:
@Override
public ByteBuffer nioBuffer() {
return nioBuffer(readerIndex, readableBytes());
}
nioBuffer的具體實作這裡使用PooledHeapByteBuf中的實作來看:
@Override
public ByteBuffer nioBuffer(int index, int length) {
checkIndex(index, length);
index = idx(index);
ByteBuffer buf = ByteBuffer.wrap(memory, index, length);
return buf.slice();
}
一、ByteBuf的繼承結構:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNvwVZ2x2bzNXak9CX90TQNNkRrFlQKBTSvwFbslmZvwFMwQzLcVmepNHdu9mZvwFVywUNMZTY18CX052bm9CX9kkeOl3ZE10dnpXTmJEViZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39jNwUjMyQDN1EDNyIDM4EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
ByteBuf可以分為兩類:
(1)對記憶體:HeapByteBuf自己緩沖區,特點是記憶體的配置設定和回收速度快,可以被JVM自動回收,,缺點是如果使用Socket的IO讀寫,需要額外做一次記憶體複制,将堆記憶體對應的額緩沖區複制到核心Channel中,性能會有一定的下降。
(2)直接記憶體。DirectByteBuf位元組緩沖區也可以叫做直接緩沖區,非堆記憶體。它在堆外進行記憶體配置設定,相比于堆記憶體,它的配置設定和回收速度會慢一些。但是将它寫入或者從SocketChannel中讀取時,由于少了一次記憶體複制。速度比堆記憶體要快。
是以Netty提供了多種ByteBuf 的實作共開發者選擇。在長期的開發實踐中,表明,在IO通信線程的讀寫緩沖區使用DirectByteBuf, 後端業務消息的編解碼子產品使用HeapByteBuf,這樣組合可以達到性能最優。
從記憶體回收的角度看,ByteBuf也分為兩類:基于對象池的ByteBuf和普通ByteBuf。兩者的主要差別就是基于對象池的ByteBuf可以重用ByteBuf對象,它自己建立了一個記憶體池,可以循環利用建立的額ByteBuf,提升記憶體的使用效率,降低由于高負載導緻的頻繁GC。測試表明使用記憶體池後的Netty在高負載,大并發沖擊下的記憶體和GC更加平穩。
二、AbstractByteBuf部分源碼介紹:
一、讀操作:
@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
}
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
checkDstIndex(index, length, dstIndex, dst.length);
System.arraycopy(memory, idx(index), dst, dstIndex, length);
return this;
}
二、寫操作:
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureAccessible();
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// Adjust to the new capacity.
capacity(newCapacity);
return this;
}
// 擴容機制:首先設定門檻值為4M.當需要的新容量正好等于門檻值,則使用門檻值作為新的緩沖區容量。
// 如果新申請的記憶體空間大于門檻值,不能采用倍增的方式(防止記憶體膨脹和浪費)擴張記憶體。
// 采用每次步進4M的方式進行記憶體擴張。擴張的時候需要對擴張後的那次u你和最大記憶體進行比較,
// 如果大于緩沖區的最大長度,則用maxCapacity作為擴容的緩沖區容量。如果擴容後的新容量小于門檻值,則以64為基礎進行倍增。
// 直到倍增後的結果大于或等于需要的容量值。采用倍增或步進算法的原因是:如果以minNewCapacity作為目标容量,
// 則本次擴容後的科協位元組數剛好夠本次寫入使用。吸入完成後,他的可寫位元組數會變成0,下次需要寫入的時候,
// 需要再次進行動态擴張,由于動态擴張需要進行記憶體複制。頻繁的記憶體複制會導緻性能下降。
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
if (minNewCapacity < 0) {
throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
}
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
三、AbstractReferenceCountedByteBuf源碼分析:
從類的名字九可以看出該類主要是對引用進行技術,類似與JVM中的對象引用計數器,用于跟蹤對象的引用和銷毀,做自動記憶體回收。
CAS :compareAndSet,擷取目前變量的值,根據變量計算出一個新值,如果這時變量的值沒有變化,就用新值更新變量,如果有變化則不更新。 僞代碼: for(;;;) { 擷取變量值A 計算變量的新值儲存在B 如果變量A的值未變化,用B的值更新A,退出循環 如果變量A的值有變化,繼續這個循環,直至更新成功才退出循環 }
1.成員變量:
// 通過原子的方式對成員變量進行更新等操作,以實作線程安全,消除鎖。
// public static AtomicIntegerFieldUpdaternewUpdater(Class tclass,
// String fieldName) 這裡就不詳細分析他的源碼了,其實很簡單,他讓tclass的成員fieldName具有了原子性,是不是很簡單~
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
static {
AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
}
refCntUpdater = updater;
}
// 用于跟蹤對象的引用次數
private volatile int refCnt = 1;
@Override
public ByteBuf retain() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, 1);
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
break;
}
}
return this;
}
每調用一次retain方法,計數器加一
compareAndSet方法用來擷取自己的值和期望的值進行比較,如果其間被其他線程修改了,那麼比對失敗,進行自旋操作,重新獲得計數器重新比較
compareAndSet這個方法是CAS操作,由作業系統層面提供。
@Override
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
需要注意的是:黨refCnt == 1時,意味着申請和釋放相等,說明對象引用已經不可達, 該對象需要被釋放和垃圾回收掉, 則通過調用deallocate方法來釋放ByteBuf對象
四、PooledByteBuf記憶體池原理分析:
1.PoolArena:Arena本身是指一開區域,在記憶體管理中,MemoryArena是指記憶體中的一大塊連續的區域,PoolA人啊就是Netty的記憶體池實作類。為了集中管理記憶體的配置設定和釋放,同時提高配置設定的釋放記憶體時候的性能,很多架構和應用都會預先申請一大塊記憶體,然後通過提供相應的配置設定和釋放接口來使用記憶體,這樣一來, 對記憶體的管理就被集中到幾個類或函數中,由于不在頻繁使用系統條用來申請和釋放記憶體,應用或者系統的性能也會大大提高,在這種涉及思路下,預先申請的一大塊記憶體就被成為Memory Arena。
不同的架構,Memory Arena的實作不同,Netty的PoolArena是由多個Chunk組成的大塊記憶體區域,而每個Chunk則由一個或多個Page組成,是以,對記憶體的組織和管理也就主要集中在如何管理群組織Chunk和Page了, PoolArena中的記憶體Chunk定義如下所示。
abstract class PoolArena<T> {
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
private final int maxOrder;
final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
- qInit:存儲剩餘記憶體0-25%的chunk
- q000:存儲剩餘記憶體1-50%的chunk
- q025:存儲剩餘記憶體25-75%的chunk
- q050:存儲剩餘記憶體50-100%個chunk
- q075:存儲剩餘記憶體75-100%個chunk
- q100:存儲剩餘記憶體100%chunk
CHunk主要用來組織和管理多個Page的記憶體配置設定和釋放,在Netty中,Chunk中的Page被配置設定成一個二叉樹,假設一個Chunk由8個Page組成,那麼這些Page将會被按照如下圖方式組織:
Chunk
|
|
page page
| |
| |
page page page page
| | | |
| | | |
page page page page page page page page
page的大小是8個位元組,Chunk的大小是64個位元組,真棵樹有4層,第一層(野子節點所在的層)用來配置設定page的記憶體,第三層用來配置設定兩個page的記憶體,一次類推。每個節點都記錄了自己咋整個Memory Arena中的偏移位址,黨一個節點代表的記憶體區域被配置設定出去之後,這個節點就會被标記為已配置設定,自這個節點一下的所有節點在後面的記憶體配置設定請求都會被忽略,舉例來說,當我們請求一個16位元組的記憶體時,上面這個樹中第二層的4個節點中的一個就會被标記為已配置設定,這就表示整個MemoryArena中有16個位元組被配置設定出去了,新的配置設定請求隻能從剩下的三個節點及其子樹中去尋找合适的節點。對樹的周遊采用深度優先的算法,但是在選擇哪個子節點繼續周遊時則是随即的,并不像通常的深度優先算法中那樣總是通路左邊的子節點。
PoolSubpage
對于小于一個page的記憶體,Netty在Page中完成配置設定。每個page會被切分成大小相同的多個存儲塊。存儲快的大小都由第一次申請的記憶體塊大小決定,嘉定一個page是8個位元組,如果第一次申請的是4個位元組,則這個page就包含兩個資料塊,如果第一次申請的是8個自己,那麼這個page就包含一個資料塊。
一個page隻能使用者配置設定與第一次申請時大小相同的記憶體,比如:一個4位元組的page,如果第一次配置設定了1位元組的記憶體,那麼後面這個page隻能繼續配置設定1位元組的記憶體,如果有一個申請了2位元組記憶體的請求,就需要在一個新的page中中進行配置設定;
Page中存儲區域的使用狀态通過一個long數組進行維護,數組中每個long的每一位表示一個塊存儲區域的占用情況:0表示未占用,1表示已占用。對于一餓4位元組的page來說,如果這個page用來配置設定一個位元組的存儲區域,那麼long’數組中就隻有一個long類型的元素。這個熟知的低4位用來訓示各個存儲區域的占用情況。對于一個128位元組的Page來說,如果這個Page也是用來配置設定1個位元組的存儲區域。那麼long數組中就會包含2個元素,總共128位,每一位代表一個區域的占用情況。
無論是Chunk還是Page,都是通過狀态位來辨別記憶體是否可用。不同之處是CHunk通過在二叉樹上對節點進行辨別實作。Page是通過維護塊的使用狀态辨別來實作。