Netty實戰
Netty是什麼?
是一款用于建立高性能網絡應用程式的進階架構
異步的
事件驅動的
主要構件
Channel
Java NIO的基本構造
資料傳入或傳出的載體
可以打開、關閉
可以連接配接、斷開連接配接
Netty内部為每個Channel配置設定了一個EventLoop
回調
一個方法
這個方法的引用提供給另一個方法,以便後者在适當的時機調用前者
作用
通常在操作完成後通知相關方
Netty使用回調處理事件
Future
同樣可用作通知
方法1調用方法2,方法2将耗時操作交給線程處理,并立刻傳回一個Future占位符,方法1可以繼續幹活,并在将來使用Future得到方法2交給線程處理後的真正執行結果
弊端
get()之前需要手動檢測是否完成
或者直接get(),這将會一直阻塞到真正的操作完成
Netty的改進
ChannelFuture
增加了監聽器,在真正操作完成時,自動獲得通知
ChannelFutureListener
operationComplete()
Future和回調相輔相成,通常一起使用
事件和ChannelHandler
事件可觸發的動作
記錄日志
資料轉換
流控制
應用程式邏輯
......
入站事件
連接配接激活或失活
資料讀取
使用者事件
錯誤事件
......
出站事件
将資料寫到或者沖刷套接字
打開或關閉遠端節點的連接配接
......
ChannelHandler可看作是為相應特定事件而被執行的回調
一組ChannelHandler事件處理器構成一條鍊
事件->處理器->事件->處理器->->->->
Netty提供了大量預定義的ChannelHandler實作
HTTP
SSL/TLS
......
Netty的傳輸方式
NIO
基于java.nio.channels包,基于選擇器的方式
零拷貝
資料直接從檔案系統移動到網絡接口
而不需要從核心空間複制到使用者空間
Epoll
僅Linux支援
比NIO傳輸更快,完全非阻塞
支援零拷貝
OIO
基于java.net (http://java.net)包,使用阻塞流
SO_TIMEOUT逾時抛出異常,進而繼續處理循環
Local
在VM内部通過管道進行通信的本地傳輸
不需要暴露網絡
Embedded
常用于單元測試ChannelHandler
不需要真實的基于網絡的傳輸,可用于測試ChannelHandler
可以将一組ChannelHandler嵌入到其他的ChannelHandler内部,進而擴充一個CH的功能,而不需要修改其内部代碼
ByteBuf
是什麼?
Netty的資料容器
作為JavaNIO提供的ByteBuffer替代品
功能更多
使用更加簡單友善
優點
可被使用者自定義的緩沖區類型擴充
通過内置的複合緩沖區類型實作了透明的零拷貝
容量可按需增長
讀和寫切換不需要調用ByteBuffer的flip()方法
讀和寫使用不同的索引
試圖讀超出寫入的長度資料,将會溢出異常
可以指定ByteBuf的最大容量,預設時Integer.MAX_VALUE
寫索引超出也會溢出異常
支援方法的鍊式調用
支援引用計數
支援池化
使用模式
堆緩沖區(支撐數組)
适合有遺留資料需要處理的情況
存儲在JVM的堆空間中
提供快速的配置設定和釋放
直接緩沖區
不被GC回收
配置設定和釋放較為昂貴
多一次複制到堆
複合緩沖區
CompositeByteBuf
提供一個聚合視圖,友善管理
位元組級操作
随機通路索引
getByte(i)
順序通路索引
可丢棄位元組
0~readIndex
read後即可丢棄
write會增加writeIndex
get不會增加readIndex
set不會增加writeIndex
丢棄後,readIndex變為0,writeIndex也會變少
可讀位元組
buffer.isReadable()
可寫位元組
buffer.writableBytes()
索引管理
mark
reset
clear
同時清零
查找操作
int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);
FIND_CR
FIND_NULL
查找值的索引
派生緩沖區
傳回新的ByteBuf執行個體
但共享了原始ByteBuffer
是以建立成本低廉
但改變了派生緩沖區,也将改變原始緩沖區
派生方法
duplicate()
slice()
slice(int,int)
Unpooled.unmodifiableBuffer(...)
order(ByteOrder)
readSlice(int)
非派生方法
copy()
産生了獨立的真實緩沖區的副本
讀/寫操作
write\read
改變index
set\get
不改變index
更多的操作
capacity()
傳回目前可容納的位元組數
可擴充
maxCapacity()
傳回最大可容納的位元組數
ByteBufHolder
用于緩沖區池化,從池中借用ByteBuf
在需要時自動釋放
三個主要方法
content()
傳回該ByteBufHolder所持有的ByteBuf
copy()
傳回這個ByteBufHolder的一個深拷貝,包括一個其所包含的ByteBuf的非共享拷貝
duplicate()
傳回這個ByteBufHolder的一個淺拷貝,包括一個其所包含的ByteBuf的共享拷貝
ByteBuf配置設定
ByteBufAllocator
池化的
按需配置設定
支援操作
buffer()
傳回一個基于堆或者直接記憶體存儲的ByteBuf
heapBuffer()
傳回一個基于堆記憶體存儲的ByteBuf
directBuffer()
傳回一個基于直接記憶體存儲的ByteBuf
compositeBuffer()
compositeHeapBuffer()
compositeDirectBuffer()
ioBuffer()
傳回一個用于套接字的I/O操作的 ByteBuf
Unpooled緩沖區
未池化的
支援操作
buffer()
未池化的基于堆
directBuffer()
未池化的基于直接記憶體
wrappedBuffer()
包裝了給定資料的
copiedBuffer()
複制了給定資料的
ByteBufUtil
最常用方法
hexdump()
十六進制列印ByteBuf
equals()
比較ByteBuf執行個體
引用計數
對象的活動引用的數量
引用數量為0的執行個體會被釋放
池化執行個體釋放的依據
buffer.release()
傳回該對象是否被釋放
ChannelHandler和ChannelPipeline
Channel狀态
已建立,未注冊到EventLoop
ChannelUnregistered
已注冊到EventLoop
ChannelRegistered
處于活動狀态(已連接配接到它的遠端節點),可收發資料了
ChannelActive
沒有連接配接到遠端節點
ChannelInactive
ChannelHandler的生命周期
被添加到ChannelPipeLine中
handlerAdded
從ChannelPipeLine中移除
handlerRemoved
在ChannelPipeLine中發生異常
exceptionCaught
ChannelInboundHandler接口
channelRead
從channel讀資料時調用
channelComplite
讀完成時調用
channelWritability
channel可寫狀态改變時調用
ChannelOutboundHandler接口
bind()
将channel綁定到本地位址
connect()
将channel連接配接到遠端節點
disconnect()
請求将channel從遠端節點中斷開
close()
請求關閉channel時調用
deregister()
請求将channel從EventLoop中登出時調用
read()
讀資料時
flush()
通過channel将入隊資料沖刷到遠端節點
write()
通過channel将資料寫到遠端節點
ChannelHandler擴充卡
ChannelInboundHandlerAdapter
SimpleChannelInboundHandler<T>
ChannelOutboundHandlerAdapter
資源洩漏保護
ResourceLeakDetector
洩漏檢測級别
DISABLED
SIMPLE
ADVANCED
PARANOID
消耗大,隻在調試階段使用
ChannelPipeline
管理操作
add*()
入站在頭部
入站時經過
ChannelInboundHandler
1->2->3->...
而不會經過ChannelOutboundHandler
出站在尾部
出站時經過
ChannelOutboundHandler
6->5->4->...
而不會經過ChannelInboundHandler
remove()
replace()
通路操作
get()
通過名稱或者類型傳回ChannelHandler
context()
傳回和 ChannelHandler綁定的ChannelHandlerContext
names()
傳回所有ChannelHandler的名稱
觸發事件
入站操作
fire*()
出站操作
writeAndFlush()
......
ChannelHandlerContext
每向pipline中添加一個channelhandler就會建立一個context
調用方法時,隻會傳播給此context目前關聯的ChannelHandler的下一個處理該事件的ChannelHandler
而Channel和pipeline是會沿着整個pipeline傳播
具有豐富的處理事件和執行IO操作的API
可讓ChannelHandler與它的pipeline以及其他ChannelHandler互動
操作
pipeline()
writeAndFlush()
name()
handler()
......
意義
減少将事件傳經對它不感興趣的ChannelHandler所帶來的開銷
避免将事件傳經那些可能會對它感興趣的ChannelHandler
進階應用
動态切換協定
異常處理
處理入站異常
exceptionCaught
預設轉發給pipeline的下一個handler,直到最後沒被處理的話會被标記為未處理
可重寫處理異常,如不往下抛異常等
處理出站異常
基于通知機制
每個出站操作都将傳回一個ChannelFuture
ChannelPromise是ChannelFuture的一個子類
幾乎每一個出站方法都傳入了一個ChannelPromise
可被配置設定用于異步通知的監聽器
也可以提供立即通知的可寫方法
setSuccess()
setFailure(Throwable cause)
addListener(ChannelFutureListener)的時機
方法1
在每次write後傳回的future中添加
優點
可共享調用者的屬性和方法等
方法2
在ChannelHandler的write方法中,統一添加
優點
一次編寫,到處可用
EvenLoop和線程模型
線程模型概述
使用線程池
從池中得到一個空閑的Thread去執行一個Runnable執行個體任務
用完後放回池中
減少線程建立和銷毀開銷
EventLoop
while(!terminated)
List<Runnable> readyEvents = blockUntilEventsReady();
阻塞,直到有事件已經就緒可被運作
for (Runnable ev: readyEvents) {
ev.run();
}
循環周遊,并處理所有的事件
一個EventLoop可用于服務多個Channel
優化
任務排程
ScheduledExecutorService
延遲之後運作或者周期性地執行任務
schedule()
scheduleAtFixedRate()
取消任務
future.cancel()
線程池方式
單線程方式
實作細節
如果是EventLoop中的事件,則立即執行
否則加入隊列,送出到線程池執行
EventLoop線程配置設定
非阻塞
一個EventLoop占用一個線程
一個線程管理多個Channel
阻塞
一個EventLoop占用一個線程
一個線程管理一個Cahnnel
Bootstrap引導
是什麼?
如何對應用程式進行配置,将各個部分組織起來,讓應用程式運作起來的過程
兩種引導
BootStrap
為用戶端和使用無連接配接協定的應用程式建立Channel
bind()用于無連接配接協定(UDP)
DatagramChannel
用于收發資料包
無法得知發送的資料是否被接收成功
ServerBootStrap
方法
handler()
此方法添加的ChannelHandler由ServerChannel(此為用于接受子Channel的Channel)處理
bind()建立了ServerChannel
ServerChannel管理了很多子連接配接的Channel
childHandler()
此方法添加的ChannelHandler由已被接受的子Channel處理
更常用
option()
用于配置Channel-Option,通過bind()方法設定到Channel
如配置5秒連接配接逾時
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
SO_TIMEOUT 隻對于 OIO有效,對于NIO 沒有作用,用于逾時控制
bind()後修改option無效
childOption()
更常用
attr()
用于指定ServerChannel上的屬性,友善使用
childAttr()
更常用
一些常見應用
在子Channel中引導BootStrap用戶端
通過共享Channel的EventLoop來避免新增EventLoop線程産生資源浪費
bootstrap.group(channel.eventLoop())
在引導過程中添加多個ChannelHandler
ChannelInitializerImpl extends ChannelInitializer
重寫initChannel()
在initChannel()中添加ChannelHandler
channel.pipeline().addLast(channel handler)
serverbootstrap.childHandler(new ChannelInitializerImpl())
優雅的關閉應用程式
釋放所有的資源,并且關閉所有的目前正在使用中的Channel
group.shutdownGracefully().sync();
EventLoop和Chanel不能混用NIO和OIO
網絡協定
WebSocket
新版浏覽器基本都支援WebSocket,WS是HTML5的一部分
可通過HTTP協定進行WebSocket協定握手
在協定更新後即可通過WebSocket傳輸消息
特點
雙向
異步
實時
應用
WEB版聊天室
......
TCP
優點
有序
安全
缺點
較UDP慢
UDP
缺點
無序,可能丢包
不安全
優點
傳輸快
無需握手确認
無需回報
支援多點傳播
傳輸到一個預定義的主機組
支援廣播
傳輸到網絡(或子網)上的所有主機
成員
廣播者
Broadcaster
監聽者
Monitor
應用
日志廣播
視訊傳輸
......