前言
網絡資料的基本機關永遠是 byte(位元組)。Java NIO 提供 ByteBuffer 作為位元組的容器,但這個類是過于複雜,有點難以使用,切換讀寫狀态需要flip()等。
Netty 中 ByteBuffer 的替代是 ByteBuf,一個強大的實作,解決 JDK 的 API 的限制,以及為網絡應用程式開發者一個更好的工具。 但 ByteBuf 并不僅僅暴露操作一個位元組序列的方法;這也是專門的 Netty 的 ChannelPipeline 的語義設計。
在本文中,我們會說明相比于 JDK 的 API,ByteBuf 所提供的卓越的功能和靈活性。這也将使我們能夠更好地了解了 Netty 的資料處理。
1、Buffer API
主要包括
- ByteBuf
- ByteBufHolder
Netty 使用 reference-counting (引用計數) 來判斷何時可以釋放 ByteBuf 或 ByteBufHolder 和其他相關資源,進而可以利用池和其他技巧來提高性能和降低記憶體的消耗。這一點上不需要開發人員做任何事情,但是在開發 Netty 應用程式時,尤其是使用 ByteBuf 和 ByteBufHolder 時,你應該盡可能早地釋放池資源。
引用計數:學習過 JVM 的小夥伴應該知道垃圾回收有引用計數法和可達性分析這兩種算法判斷對象是否存活,Netty 就使用了 引用計數法來優化記憶體的使用。引用計數確定了當對象的引用計數大于 1 時,對象就不會被釋放,當計數減少至 0 時, 對象就會被釋放,如果程式通路一個已被釋放的引用計數對象,那麼将會導緻一個 IllegalReferenceCountException 異常。 在 Netty 中,ByteBuf 和 ByteBufHolder 都實作了 ReferenceCounted 接口。
1.1 ByteBuf特點概覽
- 使用者可以自定義緩沖區類型對其擴充
- 通過内置的符合緩沖區類型實作了透明的零拷貝
- 容量可以按需增長(類似
)StringBuilder
- 切換讀寫模式不用調用
方法flip()
- 讀寫使用各自的索引
- 支援方法的鍊式調用
- 支援引用計數
- 支援池化
2、ByteBuf類介紹
2.1工作模式
ByteBuf
維護了兩個指針,一個用于讀取(
readerIndex
),一個用于寫入(
writerIndex
).
使用ByteBuf的API中的
read*
方法讀取資料時,
readerIndex
會根據讀取位元組數向後移動,但是
get*
方法不會移動
readerIndex
;使用
write*
資料時,
writerIndex
會根據位元組數移動,但是
set*
方法不會移動
writerIndex
.(
read*
表示
read
開頭的方法,其餘意義相同)
讀取資料時,如果
readerIndex
超過了
writerIndex
會觸發
IndexOutOfBoundsException
.
可以指定
ByteBuf
容量最大值,
capacity(int)
或
ensureWritable(int)
,當超出容量時會抛出異常.
2.2 使用模式
2.2.1 HEAP BUFFER (堆緩沖區)
将ByteBuf存入JVM的堆空間,堆緩沖區可以在沒有使用池化的情況下快速配置設定和釋放,非常适合用來處理遺留資料的。
除此之外,ByteBuf的堆緩沖區還提供了一個後備數組(backing array),後備數組和ByteBuf中的資料是對應的,如果修改了backing array中的資料,ByteBuf中的資料是同步的。
它還提供了直接通路數組的方法,通過 ByteBuf.array() 來擷取 byte[] 資料。
public static void main(String[] args) {
ByteBuf heapBuf = Unpooled.buffer(1024);
if(heapBuf.hasArray()){
heapBuf.writeBytes("Hello,heapBuf".getBytes());
System.out.println("數組第一個位元組在緩沖區中的偏移量:"+heapBuf.arrayOffset());
System.out.println("緩沖區中的readerIndex:"+heapBuf.readerIndex());
System.out.println("writerIndex:"+heapBuf.writerIndex());
System.out.println("緩沖區中的可讀位元組數:"+heapBuf.readableBytes());//等于writerIndex-readerIndex
byte[] array = heapBuf.array();
for(int i = 0;i < heapBuf.readableBytes();i++){
System.out.print((char) array[i]);
if(i==5){
array[i] = (int)'.';
}
}
//不會修改readerIndex位置
System.out.println("\n讀取資料後的readerIndex:"+heapBuf.readerIndex());
//讀取緩沖區的資料,檢視是否将逗号改成了句号
while (heapBuf.isReadable()){
System.out.print((char) heapBuf.readByte());
}
}
}
輸出:
數組第一個位元組在緩沖區中的偏移量:0
緩沖區中的readerIndex:0
writerIndex:13
緩沖區中的可讀位元組數:13
Hello,heapBuf
讀取資料後的readerIndex:0
Hello.heapBuf
如果hasArray()傳回false,嘗試通路backing array會報錯
2.2.2 DIRECT BUFFER (直接緩沖區)
在 Java 中,我們建立的對象大部分都是存儲在堆區之中的,但這不是絕對的。
在 NIO 的 API 中, 允許 Buffer 配置設定直接記憶體,即作業系統的記憶體。
直接緩沖區存儲于JVM堆外的記憶體空間,這樣做有一個好處,當你想把JVM中的資料寫給socket,需要将資料複制到直接緩沖區(JVM堆外記憶體)再交給socket,如果使用直接緩沖區,将減少複制這一過程。
但是直接緩沖區也是有不足的,與JVM堆的緩沖區相比,他們的配置設定和釋放是比較昂貴的。而且還有一個缺點,面對遺留代碼的時候,可能不确定ByteBuf使用的是直接緩沖區還是堆緩沖區,你可能需要進行一次額外的複制,與自帶後備數組的堆緩沖區來講,這要多做一些工作。是以,如果确定容器中的資料會被作為數組來通路,你可能更願意使用堆記憶體。
//實際上你不知道從哪獲得的引用,這可能是一個直接緩沖區的ByteBuf
//忽略Unpooled.buffer方法,當做不知道從哪獲得的directBuf
ByteBuf directBuf = Unpooled.buffer(1024);
//如果想要從數組中通路資料,需要将直接緩沖區中的資料手動複制到數組中
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
directBuf.getBytes(directBuf.readerIndex(), array);
handleArray(array, 0, length);
}
2.2.3 COMPOSITE BUFFER (複合緩沖區)
複合緩沖區,我們可以建立多個不同的 ByteBuf,然後提供一個這些 ByteBuf 組合的視圖 CompositeByteBuf。我們可以動态的向 CompositeByteBuf 中添加和删除其中的 ByteBuf 執行個體,JDK 的 ByteBuffer 沒有這樣的功能。
警告:CompositeByteBuf.hasArray() 總是傳回 false,因為它可能既包含堆緩沖區,也包含直接緩沖區。
聚合緩沖區是個非常好用的東西,是多個ByteBuf的聚合視圖,可以添加或删除ByteBuf執行個體。
CompositeByteBuf中的ByteBuf執行個體可能同僚包含直接記憶體配置設定和非直接記憶體配置設定。如果其中隻有一個執行個體,那麼調用CompositeByteBuf中的hasArray()方法将傳回該元件上的hasArray()方法的值,否則傳回false
多個ByteBuf組成一個完整的消息是很常見的,比如header和body組成的HTTP協定傳輸的消息。消息中的body有時候可能能重用,我們不想每次都建立重複的body,我們可以通過CompositeByteBuf來複用body。
對比一下JDK中的ByteBuffer實作複合緩沖區和Netty中的CompositeByteBuf.
//JDK版本實作複合緩沖區
public static void byteBufferComposite(ByteBuffer header, ByteBuffer body) {
//使用一個數組來儲存消息的各個部分
ByteBuffer[] message = new ByteBuffer[]{ header, body };
// 建立一個新的ByteBuffer來複制合并header和body
ByteBuffer message2 =
ByteBuffer.allocate(header.remaining() + body.remaining());
message2.put(header);
message2.put(body);
message2.flip();
}
//Netty中的CompositeByteBuf
public static void byteBufComposite() {
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = Unpooled.buffer(1024); // 可能是直接緩存也可能是堆緩存中的
ByteBuf bodyBuf = Unpooled.buffer(1024); // 可能是直接緩存也可能是堆緩存中的
messageBuf.addComponents(headerBuf, bodyBuf);
//...
messageBuf.removeComponent(0); // remove the header
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
}
CompositeByteBuf不支援通路其後備數組,是以通路CompositeByteBuf中的資料類似于通路直接緩沖區
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
int length = compBuf.readableBytes();
byte[] array = new byte[length];
//将CompositeByteBuf中的資料複制到數組中
compBuf.getBytes(compBuf.readerIndex(), array);
//處理一下數組中的資料
handleArray(array, 0, array.length);
Netty使用CompositeByteBuf來優化socket的IO操作,避免了JDK緩沖區實作所導緻的性能和記憶體使用率的缺陷.記憶體使用率的缺陷是指對可複用對象大量的複制,Netty對其在内部做了優化,雖然沒有暴露出來,但是應該知道CompositeByteBuf的優勢和JDK自帶工具的弊端.
JDK的NIO包中提供了Scatter/Gather I/O技術,字面意思是打散和聚合,可以了解為把單個ByteBuffer切分成多個或者把多個ByteBuffer合并成一個.
3、位元組級操作
ByteBuf的索引從0開始,最後一個索引是
capacity()-1
.
周遊示範:
ByteBuf buffer = Unpooled.buffer(1024);
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);//這種方法不會移動readerIndex指針
System.out.println((char) b);
}
3.1 readerIndex和writerIndex
JDK中的
ByteBuffer
隻有一個索引,需要通過
flip()
來切換讀寫操作,Netty中的
ByteBuf
既有讀索引,也有寫索引,通過兩個索引把ByteBuf劃分了三部分.
可以調用discardReadBytes()方法可丢棄可丢棄位元組并回收空間.
調用discardReadBytes()方法之後
使用read或skip方法都會增加readerIndex.
移動readerIndex讀取可讀資料的方式
ByteBuf buffer = ...;
while (buffer.isReadable()) {
System.out.println(buffer.readByte());
}
write*方法寫入ByteBuf時會增加writerIndex,如果超過容量會抛出IndexOutOfBoundException.
writeableBytes()可以傳回可寫位元組數.
ByteBuf buffer = ...;
while (buffer.writableBytes() >= 4) {
buffer.writeInt(random.nextInt());
}
3.2 索引管理
JDK 的定義了
InputStream
和
mark(int readlimit)
reset()
方法,這些方法分别被用來将流中的目前位置标記為指定的值,以及将流重置到該位置。
同樣,可以通過調用
、
markReaderIndex()
、
markWriterIndex()
和
resetWriterIndex()
來标記和重置
resetReaderIndex()
的
ByteBuf
和
readerIndex
。這些和
writerIndex
上的調用類似,隻是沒有
InputStream
參數來指定标記什麼時候失效。
readlimit
如果将索引設定到一個無效位置會抛出
IndexOutOfBoundsException
.
可以通過
clear()
歸零索引,歸零索引不會清除資料.
3.3 查找
ByteBuf中很多方法可以确定值的索引,如
indexOf()
.
複雜查找可以通過那些需要一個
ByteBufProcessor
作為參數的方法完成.這個接口應該可以使用
lambda
表達式(但是我現在使用的Netty4.1.12已經廢棄了該接口,應該使用
ByteProcessor
).
ByteBuf buffer = ...;
int index = buffer.forEachByte(ByteProcessor.FIND_CR);
3.4 派生緩沖區
派生緩沖區就是,基于原緩沖區一頓操作生成新緩沖區.比如複制,切分等等.
duplicate()
;
slice()
;
slice(int, int)
;
Unpooled.unmodifiableBuffer(…)
;
order(ByteOrder)
;
readSlice(int)
.
每個這些方法都将傳回一個新的 ByteBuf 執行個體,它具有自己的讀索引、寫索引和标記索引。 其内部存儲和 JDK 的 ByteBuffer 一樣也是共享的。這使得派生緩沖區的建立成本是很低廉的,但是這也意味着,如果你修改了它的内容,也同時修改了其對應的源執行個體,是以要小心
//複制
public static void byteBufCopy() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(utf8));
buf.setByte(0, (byte)'J');
assert buf.getByte(0) != copy.getByte(0);
}
//切片
public static void byteBufSlice() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf sliced = buf.slice(0, 15);
System.out.println(sliced.toString(utf8));
buf.setByte(0, (byte)'J');
assert buf.getByte(0) == sliced.getByte(0);
}
還有一些讀寫操作的API,留在文末展示吧.
4、ByteBufHolder接口
從表面了解起來,ByteBufHolder是ByteBuf的持有者,的确沒有錯。
我們經常發現, 除了實際的資料負載之外, 我們還需要存儲各種屬性值。 HTTP 響應便是一個很好的例子, 除了表示為位元組的内容,還包括狀态碼、 cookie 等。
為了處理這種常見的用例, Netty 提供了 ByteBufHolder。 ByteBufHolder 也為 Netty 的進階特性提供了支援,如緩沖區池化,其中可以從池中借用 ByteBuf, 并且在需要時自動釋放。ByteBufHolder 隻有幾種用于通路底層資料和引用計數的方法。
5、ByteBuf的配置設定
前面介紹了ByteBuf的一些基本操作和原理,但卻并未說明如何配置設定一個ByteBuf,這裡将講解ByteBuf的配置設定方式。
5.1 ByteBufAllocator
為了減少配置設定和釋放記憶體的開銷,Netty通過 ByteBufAllocator 實作了ByteBuf的池化。以下是ByteBufAllocator 的常見方法。
- buffer: 傳回一個基于堆或直接記憶體的ByteBuf,具體取決于實作。
- heapBuffer: 傳回一個基于堆記憶體的ByteBuf。
- directBuffer: 傳回一個基于直接記憶體的ByteBuf。
- compositeBuffer: 傳回一個組合ByteBuf。
- ioBuffer: 傳回一個用于套接字的ByteBuf。
我們可以通過
ByteBufAllocator
來配置設定一個
ByteBuf
執行個體.
ByteBufAllocator
接口實作了ByteBuf的池化。
可以通過
Channel
(每個都可以有一個不同的
ByteBufAllocator
執行個體)或者綁定到
ChannelHandler
的
ChannelHandlerContext
擷取一個到
ByteBufAllocator
的引用。
//從Channel擷取一個ByteBufAllocator的引用
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
//從ChannelHandlerContext擷取ByteBufAllocator 的引用
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator2 = ctx.alloc();
Netty提供了兩種ByteBufAllocator的實作: PooledByteBufAllocator和UnpooledByteBufAllocator。PooledByteBufAllocator池化了ByteBuf的執行個體以提高性能并最大限度地減少記憶體碎片,此實作的配置設定記憶體的方法 是使用 jemalloc,此種 方法配置設定記憶體的效率非常高,已被大量現代作業系統采用。 UnpooledByteBufAllocator的實作不 池化ByteBuf執行個體, 并且在每次它被調用時都會傳回一個新的執行個體。Netty預設使用的是 PooledByteBufAllocator
。
5.2 Unpooled緩沖區
可能有時候拿不到ByteBufAllocator引用的話,可以使用Unpooled工具類來建立未持化ByteBuf執行個體.
5.3 ByteBufUtil類
ByteBufUtil 提供了用于操作 ByteBuf 的靜态的輔助方法。因為這個 API 是通用的, 并且和池化無關,是以這些方法已然在配置設定類的外部實作。
這些靜态方法中最有價值的可能就是 hexdump()方法, 它以十六進制的表示形式列印ByteBuf 的内容。這在各種情況下都很有用,例如, 出于調試的目的記錄 ByteBuf 的内容。十六進制的表示通常會提供一個比位元組值的直接表示形式更加有用的日志條目,此外,十六進制的版本還可以很容易地轉換回實際的位元組表示。
另一個有用的方法是 boolean equals(ByteBuf, ByteBuf), 它被用來判斷兩個 ByteBuf執行個體的相等性。如果你實作自己的 ByteBuf 子類,你可能會發現 ByteBufUtil 的其他有用方法。
6、引用計數
引用計數是一種通過在某個對象所持有的資源不再被其他對象引用時釋放該對象所持有的資源來優化記憶體使用和性能的技術。 它們都實作了 interface ReferenceCounted。 引用計數背後的想法并不是特别的複雜;它主要涉及跟蹤到某個特定對象的活動引用的數量。一個 ReferenceCounted 實作的執行個體将通常以活動的引用計數為 1 作為開始。隻要引用計數大于 0, 就能保證對象不會被釋放。當活動引用的數量減少到 0 時,該執行個體就會被釋放。注意,雖然釋放的确切語義可能是特定于實作的,但是至少已經釋放的對象應該不可再用了。
//從Channel擷取ByteBufAllocator
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
//從ByteBufAllocator配置設定一個ByteBuf
ByteBuf buffer = allocator.directBuffer();
assert buffer.refCnt() == 1;//引用計數是否為1
7、擴容
擴容是指在ByteBuf在寫入資料時,已經超過容量但沒有超過最大容量
在容量小于512時,會選擇16的整數倍進行擴容超過512時則會選擇2^n擴容
比如容量為12,寫入資料在13的位置,容量會擴容到16,寫出位置在17容量會擴容到32
寫入位置在514容量會擴容到1024,寫入位置在1026,容量會擴容到2048.......以此類推
8、記憶體釋放
ByteBuf的記憶體釋放是通過ReferenceCounted來實作的(采用的是引用計數的算法來回收記憶體)
io.netty.util.ReferenceCounted
package io.netty.util;
public interface ReferenceCounted {
/**
* Increases the reference count by {@code 1}.
* 此方法是将bytebuf的引用加1
*/
ReferenceCounted retain();
/**
* Decreases the reference count by {@code 1} and
deallocates this object if the reference count reachesat
* {@code 0}.
* 此方法是将bytebuf的引用-1
* @return {@code true} if and only if the reference count
became {@code 0} and this object has been deallocated
*/
boolean release();
}
當bytebuf的引用計數為0時就會回收調用改bytebuf(不同會有不同的回收算法池化的會入池,非池化的會執行回收記憶體(直接記憶體回收和堆記憶體回收))
9、零拷貝
ByteBuf的slice(int index,intlength);
他是将一個bytebuf按照指定的索引切,切多長的一個buf。内部使用的是同一個記憶體的buf。用新的指針來指向切出來的buf。
是以在改變一個buf的同時。另一個buf的内容也會改變
(注意,切出來的buf不允許往裡添加新的内容,釋放了原有的buf的内容後會影響切出來的buf)
duplicate()方法是直接将整個buf的讀寫指針複制一份(使用的還是同一塊記憶體)
想要真正的複制就使用copy相關的方法
compositeByteBuf
建立
CompositeByteBuf buf = ByteBufAllocator.DEFAULT.compositeBuffer();
//在邏輯上将幾個Bytebuf組合在一起
//帶上一個boolean參數的方法,自動計算讀寫指針,不然讀寫指針還是為0
buf.addComponents(true,buf1,buf2);