我們一直說Redis的性能很快,那為什麼快?Redis為了達到性能最大化,做了哪些方面的優化呢?
在深度解析Redis的資料結構
這篇文章中,其實從資料結構上分析了Redis性能高的一方面原因。
在目前的k-v資料庫的技術選型中,Redis幾乎是首選的用來實作高性能緩存的方案,它的性能有多快呢?
根據官方的基準測試資料,一台普通硬體配置的Linux機器上運作單個Redis執行個體,處理簡單指令(O(n)或者O(logn)),QPS可以達到8W,如果使用pipeline批處理功能,QPS最高可以達到10W。
Redis 為什麼那麼快
Redis的高性能主要依賴于幾個方面。
- C語言實作,C語言在一定程度上還是比Java語言性能要高一些,因為C語言不需要經過JVM進行翻譯。
- 純記憶體I/O,記憶體I/O比磁盤I/O性能更快
- I/O多路複用,基于epoll的I/O多路複用技術,實作高吞吐網絡I/O
- 單線程模型,單線程無法利用到多核CPU,但是在Redis中,性能瓶頸并不是在計算上,而是在I/O能力,是以單線程能夠滿足高并發的要求。 從另一個層面來說,單線程可以避免多線程的頻繁上下文切換以及同步鎖機制帶來的性能開銷。
下面我們分别從上述幾個方面進行展開說明,先來看網絡I/O的多路複用模型。
從請求處理開始分析
當我們在用戶端向Redis Server發送一條指令,并且得到Redis回複的整個過程中,Redis做了什麼呢?
圖4-1
要處理指令,則redis必須完整地接收用戶端的請求,并将指令解析出來,再将結果讀出來,通過網絡回寫到用戶端。整個工序分為以下幾個部分:
- 接收,通過TCP接收到指令,可能會曆經多次TCP包、ack、IO操作
- 解析,将指令取出來
- 執行,到對應的地方将value讀出來
- 傳回,将value通過TCP傳回給用戶端,如果value較大,則IO負荷會更重
其中解析和執行是純cpu/記憶體操作,而接收和傳回主要是IO操作,首先我們先來看通信的過程。
網絡IO的通信原理
同樣,我也畫了一幅圖來描述網絡資料的傳輸流程
首先,對于TCP通信來說,每個TCP Socket的核心中都有一個發送緩沖區和一個接收緩沖區
接收緩沖區把資料緩存到核心,若應用程序一直沒有調用Socket的read方法進行讀取,那麼該資料會一直被緩存在接收緩沖區内。不管程序是否讀取Socket,對端發來的資料都會經過核心接收并緩存到Socket的核心接收緩沖區。
read所要做的工作,就是把核心接收緩沖區中的資料複制到應用層使用者的Buffer裡。
程序調用Socket的send發送資料的時候,一般情況下是将資料從應用層使用者的Buffer裡複制到Socket的核心發送緩沖區,然後send就會在上層傳回。換句話說,send傳回時,資料不一定會被發送到對端。
網卡中的緩沖區既不屬于核心空間,也不屬于使用者空間。它屬于硬體緩沖,允許網卡與作業系統之間有個緩沖;
核心緩沖區在核心空間,在記憶體中,用于核心程式,做為讀自或寫往硬體的資料緩沖區;
使用者緩沖區在使用者空間,在記憶體中,用于使用者程式,做為讀自或寫往硬體的資料緩沖區
網卡晶片收到網絡資料會以中斷的方式通知CPU,我有資料了,存在我的硬體緩沖裡了,來讀我啊。
CPU收到這個中斷信号後,會調用相應的驅動接口函數從網卡的硬體緩沖裡把資料讀到核心緩沖區,正常情況下會向上傳遞給TCP/IP子產品一層一層的處理。
NIO多路複用機制
Redis的通信采用的是多路複用機制,什麼是多路複用機制呢?
由于Redis是C語言實作,為了簡化大家的了解,我們采用Java語言來描述這個過程。
在了解多路複用之前,我們先來了解一下BIO。
BIO模型
在Java中,如果要實作網絡通信,我們會采用Socket套接字來完成。
Socket這不是一個協定,而是一個通信模型。其實它最初是BSD發明的,主要用來一台電腦的兩個程序間通信,然後把它用到了兩台電腦的程序間通信。是以,可以把它簡單了解為程序間通信,不是什麼進階的東西。主要做的事情不就是:
- A發包:發請求包給某個已經綁定的端口(是以我們經常會通路這樣的位址182.13.15.16:1235,1235就是端口);收到B的允許;然後正式發送;發送完了,告訴B要斷開連結;收到斷開允許,馬上斷開,然後發送已經斷開資訊給B。
- B收包:綁定端口和IP;然後在這個端口監聽;接收到A的請求,發允許給A,并做好接收準備,主要就是清理緩存等待接收新資料;然後正式接收;接受到斷開請求,允許斷開;确認斷開後,繼續監聽其它請求。
可見,Socket其實就是I/O操作,Socket并不僅限于網絡通信,在網絡通信中,它涵蓋了網絡層、傳輸層、會話層、表示層、應用層——其實這都不需要記,因為Socket通信時候用到了IP和端口,僅這兩個就表明了它用到了網絡層和傳輸層;而且它無視多台電腦通信的系統差别,是以它涉及了表示層;一般Socket都是基于一個應用程式的,是以會涉及到會話層和應用層。
建構基礎的BIO通信模型
BIOServerSocket
public class BIOServerSocket {
//先定義一個端口号,這個端口的值是可以自己調整的。
static final int DEFAULT_PORT=8080;
public static void main(String[] args) throws IOException {
//先定義一個端口号,這個端口的值是可以自己調整的。
//在伺服器端,我們需要使用ServerSocket,是以我們先聲明一個ServerSocket變量
ServerSocket serverSocket=null;
//接下來,我們需要綁定監聽端口, 那我們怎麼做呢?隻需要建立使用serverSocket執行個體
//ServerSocket有很多構造重載,在這裡,我們把前邊定義的端口傳入,表示目前
//ServerSocket監聽的端口是8080
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("啟動服務,監聽端口:"+DEFAULT_PORT);
//回顧一下前面我們講的内容,接下來我們就需要開始等待用戶端的連接配接了。
//是以我們要使用的是accept這個函數,并且當accept方法獲得一個用戶端請求時,會傳回
//一個socket對象, 這個socket對象讓伺服器可以用來和用戶端通信的一個端點。
//開始等待用戶端連接配接,如果沒有用戶端連接配接,就會一直阻塞在這個位置
Socket socket=serverSocket.accept();
//很可能有多個用戶端來發起連接配接,為了區分用戶端,咱們可以輸出用戶端的端口号
System.out.println("用戶端:"+socket.getPort()+"已連接配接");
//一旦有用戶端連接配接過來,我們就可以用到IO來獲得用戶端傳過來的資料。
//使用InputStream來獲得用戶端的輸入資料
//bufferedReader大家還記得吧,他維護了一個緩沖區可以減少資料源讀取的頻率
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr=bufferedReader.readLine(); //讀取一行資訊
System.out.println("用戶端發了一段消息:"+clientStr);
//服務端收到資料以後,可以給到用戶端一個回複。這裡咱們用到BufferedWriter
BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我已經收到你的消息了\n");
bufferedWriter.flush(); //清空緩沖區觸發消息發送
}
}
BIOClientSocket
public class BIOClientSocket {
static final int DEFAULT_PORT=8080;
public static void main(String[] args) throws IOException {
//在用戶端這邊,咱們使用socket來連接配接到指定的ip和端口
Socket socket=new Socket("localhost",8080);
//使用BufferedWriter,像伺服器端寫入一個消息
BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我是用戶端Client-01\n");
bufferedWriter.flush();
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
String serverStr=bufferedReader.readLine(); //通過bufferedReader讀取服務端傳回的消息
System.out.println("服務端傳回的消息:"+serverStr);
}
}
上述代碼建構了一個簡單的BIO通信模型,也就是服務端建立一個監聽,用戶端向服務端發送一個消息,實作簡單的網絡通信,那BIO有什麼弊端呢?
我們通過對BIOServerSocket進行改造,關注case1和case2部分。
- case1: 增加了while循環,實作重複監聽
- case2: 當服務端收到用戶端的請求後,不直接傳回,而是等待20s。
public class BIOServerSocket {
//先定義一個端口号,這個端口的值是可以自己調整的。
static final int DEFAULT_PORT=8080;
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocket serverSocket=null;
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("啟動服務,監聽端口:"+DEFAULT_PORT);
while(true) { //case1: 增加循環,允許循環接收請求
Socket socket = serverSocket.accept();
System.out.println("用戶端:" + socket.getPort() + "已連接配接");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = bufferedReader.readLine(); //讀取一行資訊
System.out.println("用戶端發了一段消息:" + clientStr);
Thread.sleep(20000); //case2: 修改:增加等待時間
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我已經收到你的消息了\n");
bufferedWriter.flush(); //清空緩沖區觸發消息發送
}
}
}
接着,把BIOClientSocket複制兩份(client1、client2),同時向BIOServerSocket發起請求。
運作後看到的現象應該是: client1先發送請求到Server端,由于Server端等待20s才傳回,導緻client2的請求一直被阻塞。
這個情況會導緻一個問題,如果服務端在同一個時刻隻能處理一個用戶端的連接配接,而如果一個網站同時有1000個使用者通路,那麼剩下的999個使用者都需要等待,而這個等待的耗時取決于前面的請求的處理時長,如圖4-2所示。
圖4-2
基于多線程優化BIO
為了讓服務端能夠同時處理更多的用戶端連接配接,避免因為某個用戶端連接配接阻塞導緻後續請求被阻塞,于是引入多線程技術,代碼如下。
ServerSocket
public static void main(String[] args) throws IOException, InterruptedException {
final int DEFAULT_PORT=8080;
ServerSocket serverSocket=null;
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("啟動服務,監聽端口:"+DEFAULT_PORT);
ExecutorService executorService= Executors.newFixedThreadPool(5);
while(true) {
Socket socket = serverSocket.accept();
executorService.submit(new SocketThread(socket));
}
}
SocketThread
public class SocketThread implements Runnable{
Socket socket;
public SocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
System.out.println("用戶端:" + socket.getPort() + "已連接配接");
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = null; //讀取一行資訊
clientStr = bufferedReader.readLine();
System.out.println("用戶端發了一段消息:" + clientStr);
Thread.sleep(20000);
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我已經收到你的消息了\n");
bufferedWriter.flush(); //清空緩沖區觸發消息發送
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如圖4-3所示,當引入了多線程之後,每個用戶端的連結(Socket),我們可以直接給到線程池去執行,而由于這個過程是異步的,是以并不會同步阻塞影響後續連結的監聽,是以在一定程度上可以提升服務端連結的處理數量。
圖4-3
NIO非阻塞IO
使用多線程的方式來解決這個問題,仍然有一個缺點,線程的數量取決于硬體配置,是以線程數量是有限的,如果請求量比較大的時候,線程本身會收到限制進而并發量也不會太高。那怎麼辦呢,我們可以采用非阻塞IO。
NIO 從JDK1.4 提出的,本意是New IO,它的出現為了彌補原本IO的不足,提供了更高效的方式,提出一個通道(channel)的概念,在IO中它始終以流的形式對資料的傳輸和接受,下面我們示範一下NIO的使用。
NioServerSocket
public class NioServerSocket {
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
//讀取資料
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);
System.out.println(new String(buffer.array()));
//寫出資料
Thread.sleep(10000); //阻塞一段時間
//當資料讀取到緩沖區之後,接下來就需要把緩沖區的資料寫出到通道,而在寫出之前必須要調用flip方法,實際上就是重置一個有效位元組範圍,然後把這個資料接觸到通道。
buffer.flip();
socketChannel.write(buffer);//寫出資料
} else {
Thread.sleep(1000);
System.out.println("連接配接未就緒");
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
NioClientSocket
public class NioClientSocket {
public static void main(String[] args) {
try {
SocketChannel socketChannel= SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost",8080));
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
}
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
byteBuffer.put("Hello I'M SocketChannel Client".getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
//讀取服務端資料
byteBuffer.clear();
while(true) {
int i = socketChannel.read(byteBuffer);
if (i > 0) {
System.out.println("收到服務端的資料:" + new String(byteBuffer.array()));
} else {
System.out.println("服務端資料未準備好");
Thread.sleep(1000);
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
所謂的NIO(非阻塞IO),其實就是取消了IO阻塞和連接配接阻塞,當服務端不存在阻塞的時候,就可以不斷輪詢處理用戶端的請求,如圖4-4所示,表示NIO下的運作流程。
圖4-4
上述這種NIO的使用方式,仍然存在一個問題,就是用戶端或者服務端需要通過一個線程不斷輪詢才能獲得結果,而這個輪詢過程中會浪費線程資源。
多路複用IO
大家站在全局的角度再思考一下整個過程,有哪些地方可以優化呢?
我們回到NIOClientSocket中下面這段代碼,當用戶端通過
read
方法去讀取服務端傳回的資料時,如果此時服務端資料未準備好,對于用戶端來說就是一次無效的輪詢。
我們能不能夠設計成,當用戶端調用
read
方法之後,不僅僅不阻塞,同時也不需要輪詢。而是等到服務端的資料就緒之後, 告訴用戶端。然後用戶端再去讀取服務端傳回的資料呢?
就像點外賣一樣,我們在網上下單之後,繼續做其他事情,等到外賣到了公司,外賣小哥主動打電話告訴你,你直接去前台取餐即可。
while(true) {
int i = socketChannel.read(byteBuffer);
if (i > 0) {
System.out.println("收到服務端的資料:" + new String(byteBuffer.array()));
} else {
System.out.println("服務端資料未準備好");
Thread.sleep(1000);
}
}
是以為了優化這個問題,引入了多路複用機制。
I/O多路複用的本質是通過一種機制(系統核心緩沖I/O資料),讓單個程序可以監視多個檔案描述符,一旦某個描述符就緒(一般是讀就緒或寫就緒),能夠通知程式進行相應的讀寫操作
什麼是fd:在linux中,核心把所有的外部裝置都當成是一個檔案來操作,對一個檔案的讀寫會調用核心提供的系統指令,傳回一個fd(檔案描述符)。而對于一個socket的讀寫也會有相應的檔案描述符,成為socketfd。
常見的IO多路複用方式有【select、poll、epoll】,都是Linux API提供的IO複用方式,那麼接下來重點講一下select、和epoll這兩個模型
- select:程序可以通過把一個或者多個fd傳遞給select系統調用,程序會阻塞在select操作上,這樣select可以幫我們檢測多個fd是否處于就緒狀态,這個模式有兩個缺點
- 由于他能夠同時監聽多個檔案描述符,假如說有1000個,這個時候如果其中一個fd 處于就緒狀态了,那麼目前程序需要線性輪詢所有的fd,也就是監聽的fd越多,性能開銷越大。
- 同時,select在單個程序中能打開的fd是有限制的,預設是1024,對于那些需要支援單機上萬的TCP連接配接來說确實有點少
- epoll:linux還提供了epoll的系統調用,epoll是基于事件驅動方式來代替順序掃描,是以性能相對來說更高,主要原理是,當被監聽的fd中,有fd就緒時,會告知目前程序具體哪一個fd就緒,那麼目前程序隻需要去從指定的fd上讀取資料即可,另外,epoll所能支援的fd上線是作業系統的最大檔案句柄,這個數字要遠遠大于1024
【由于epoll能夠通過事件告知應用程序哪個fd是可讀的,是以我們也稱這種IO為異步非阻塞IO,當然它是僞異步的,因為它還需要去把資料從核心同步複制到使用者空間中,真正的異步非阻塞,應該是資料已經完全準備好了,我隻需要從使用者空間讀就行】
I/O多路複用的好處是可以通過把多個I/O的阻塞複用到同一個select的阻塞上,進而使得系統在單線程的情況下可以同時處理多個用戶端請求。它的最大優勢是系統開銷小,并且不需要建立新的程序或者線程,降低了系統的資源開銷,它的整體實作思想如圖4-5所示。
用戶端請求到服務端後,此時用戶端在傳輸資料過程中,為了避免Server端在read用戶端資料過程中阻塞,服務端會把該請求注冊到Selector複路器上,服務端此時不需要等待,隻需要啟動一個線程,通過selector.select()阻塞輪詢複路器上就緒的channel即可,也就是說,如果某個用戶端連接配接資料傳輸完成,那麼select()方法會傳回就緒的channel,然後執行相關的處理即可。
圖4-5
NIOServer的實作如下
測試通路的時候,直接在cmd中通過telnet連接配接NIOServer,便可發送資訊。
public class NIOServer implements Runnable{
Selector selector;
ServerSocketChannel serverSocketChannel;
public NIOServer(int port) throws IOException {
selector=Selector.open(); //多路複用器
serverSocketChannel=ServerSocketChannel.open();
//綁定監聽端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);//非阻塞配置
//針對serverSocketChannel注冊一個ACCEPT連接配接監聽事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while(!Thread.interrupted()){
try {
selector.select(); //阻塞等待事件就緒
Set selected=selector.selectedKeys(); //得到事件清單
Iterator it=selected.iterator();
while(it.hasNext()){
dispatch((SelectionKey) it.next()); //分發事件
it.remove(); //移除目前時間
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey key) throws IOException {
if(key.isAcceptable()){ //如果是用戶端的連接配接事件,則需要針對該連接配接注冊讀寫事件
register(key);
}else if(key.isReadable()){
read(key);
}else if(key.isWritable()){
write(key);
}
}
private void register(SelectionKey key) throws IOException {
//得到事件對應的連接配接
ServerSocketChannel server=(ServerSocketChannel)key.channel();
SocketChannel channel=server.accept(); //獲得用戶端的連結
channel.configureBlocking(false);
//把目前用戶端連接配接注冊到selector上,注冊事件為READ,
// 也就是目前channel可讀時,就會觸發事件,然後讀取用戶端的資料
channel.register(this.selector,SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
channel.read(byteBuffer); //把資料從channel讀取到緩沖區
System.out.println("server receive msg:"+new String(byteBuffer.array()));
}
private void write(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
//寫一個資訊給到用戶端
channel.write(ByteBuffer.wrap("hello Client,I'm NIO Server\r\n".getBytes()));
}
public static void main(String[] args) throws IOException {
NIOServer server=new NIOServer(8888);
new Thread(server).start();
}
}
事實上NIO已經解決了上述BIO暴露的下面兩個問題:
- 同步阻塞IO,讀寫阻塞,線程等待時間過長。
- 在制定線程政策的時候,隻能根據CPU的數目來限定可用線程資源,不能根據連接配接并發數目來制定,也就是連接配接有限制。否則很難保證對用戶端請求的高效和公平。
到這裡為止,通過NIO的多路複用機制,解決了IO阻塞導緻用戶端連接配接處理受限的問題,服務端隻需要一個線程就可以維護多個用戶端,并且用戶端的某個連接配接如果準備就緒時,會通過事件機制告訴應用程式某個channel可用,應用程式通過select方法選出就緒的channel進行處理。
單線程Reactor 模型(高性能I/O設計模式)
了解了NIO多路複用後,就有必要再和大家說一下Reactor多路複用高性能I/O設計模式,Reactor本質上就是基于NIO多路複用機制提出的一個高性能IO設計模式,它的核心思想是把響應IO事件和業務處理進行分離,通過一個或者多個線程來處理IO事件,然後将就緒得到事件分發到業務處理handlers線程去異步非阻塞處理,如圖4-6所示。
Reactor模型有三個重要的元件:
- Reactor :将I/O事件發派給對應的Handler
- Acceptor :處理用戶端連接配接請求
- Handlers :執行非阻塞讀/寫
圖4-6
下面示範一個單線程的Reactor模型。
Reactor
Reactor 負責響應IO事件,一旦發生,廣播發送給相應的Handler去處理。
public class Reactor implements Runnable{
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException {
//建立選擇器
selector= Selector.open();
//建立NIO-Server
serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey key=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 綁定一個附加對象
key.attach(new Acceptor(selector,serverSocketChannel));
}
@Override
public void run() {
while(!Thread.interrupted()){
try {
selector.select(); //阻塞等待就緒事件
Set selectionKeys=selector.selectedKeys();
Iterator it=selectionKeys.iterator();
while(it.hasNext()){
dispatch((SelectionKey) it.next());
it.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void dispatch(SelectionKey key){
//調用之前注冊時附加的對象,也就是attach附加的acceptor
Runnable r=(Runnable)key.attachment();
if(r!=null){
r.run();
}
}
public static void main(String[] args) throws IOException {
new Thread(new Reactor(8888)).start();
}
}
Acceptor
public class Acceptor implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
SocketChannel channel;
try {
channel=serverSocketChannel.accept();
System.out.println(channel.getRemoteAddress()+": 收到一個用戶端連接配接");
channel.configureBlocking(false);
//當channel連接配接中資料就緒時,調用DispatchHandler來處理channel
//巧妙使用了SocketChannel的attach功能,将Hanlder和可能會發生事件的channel連結在一起,當發生事件時,可以立即觸發相應連結的Handler。
channel.register(selector, SelectionKey.OP_READ,new DispatchHandler(channel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
Handler
public class DispatchHandler implements Runnable{
private SocketChannel channel;
public DispatchHandler(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"---handler"); //case: 列印目前線程名稱,證明I/O是同一個線程來處理。
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len=0,total=0;
String msg="";
try {
do {
len = channel.read(buffer);
if (len > 0) {
total += len;
msg += new String(buffer.array());
}
buffer.clear();
} while (len > buffer.capacity());
System.out.println(channel.getRemoteAddress()+":Server Receive msg:"+msg);
}catch (Exception e){
e.printStackTrace();
if(channel!=null){
try {
channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
}
示範方式,通過window的cmd視窗,使用telnet 192.168.1.102 8888 連接配接到Server端進行資料通信;也可以通過下面這樣一個用戶端程式來通路。
ReactorClient
public class ReactorClient {
private static Selector selector;
public static void main(String[] args) throws IOException {
selector=Selector.open();
//建立一個連接配接通道連接配接指定的server
SocketChannel socketChannel= SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("192.168.1.102",8888));
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while(true){
selector.select();
Set<SelectionKey> selectionKeys=selector.selectedKeys();
Iterator<SelectionKey> iterator=selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key=iterator.next();
iterator.remove();
if(key.isConnectable()){
handleConnection(key);
}else if(key.isReadable()){
handleRead(key);
}
}
}
}
private static void handleConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel=(SocketChannel)key.channel();
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
}
socketChannel.configureBlocking(false);
while(true) {
Scanner in = new Scanner(System.in);
String msg = in.nextLine();
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
socketChannel.register(selector,SelectionKey.OP_READ);
}
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
channel.read(byteBuffer);
System.out.println("client receive msg:"+new String(byteBuffer.array()));
}
}
這是最基本的單Reactor單線程模型(整體的I/O操作是由同一個線程完成的)。
其中Reactor線程,負責多路分離套接字,有新連接配接到來觸發connect 事件之後,交由Acceptor進行處理,有IO讀寫事件之後交給hanlder 處理。
Acceptor主要任務就是建構handler ,在擷取到和client相關的SocketChannel之後 ,綁定到相應的hanlder上,對應的SocketChannel有讀寫事件之後,基于racotor 分發,hanlder就可以處理了(所有的IO事件都綁定到selector上,有Reactor分發)
Reactor 模式本質上指的是使用 I/O 多路複用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)的模式。
多線程單Reactor模型
單線程Reactor這種實作方式有存在着缺點,從執行個體代碼中可以看出,handler的執行是串行的,如果其中一個handler處理線程阻塞将導緻其他的業務處理阻塞。由于handler和reactor在同一個線程中的執行,這也将導緻新的無法接收新的請求,我們做一個小實驗:
- 在上述Reactor代碼的DispatchHandler的run方法中,增加一個Thread.sleep()。
- 打開多個用戶端視窗連接配接到Reactor Server端,其中一個視窗發送一個資訊後被阻塞,另外一個視窗再發資訊時由于前面的請求阻塞導緻後續請求無法被處理。
為了解決這種問題,有人提出使用多線程的方式來處理業務,也就是在業務處理的地方加入線程池異步處理,将reactor和handler在不同的線程來執行,如圖4-7所示。
圖4-7
多線程改造-MultiDispatchHandler
我們直接将4.2.5小節中的Reactor單線程模型改成多線程,其實我們就是把IO阻塞的問題通過異步的方式做了優化,代碼如下,
public class MultiDispatchHandler implements Runnable{
private SocketChannel channel;
public MultiDispatchHandler(SocketChannel channel) {
this.channel = channel;
}
private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() << 1);
@Override
public void run() {
processor();
}
private void processor(){
executor.execute(new ReaderHandler(channel));
}
public static class ReaderHandler implements Runnable{
private SocketChannel channel;
public ReaderHandler(SocketChannel socketChannel) {
this.channel = socketChannel;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"---handler"); //case: 列印目前線程名稱,證明I/O是同一個線程來處理。
ByteBuffer buffer= ByteBuffer.allocate(1024);
int len=0;
String msg="";
try {
do {
len = channel.read(buffer);
if (len > 0) {
msg += new String(buffer.array());
}
buffer.clear();
} while (len > buffer.capacity());
if(len>0) {
System.out.println(channel.getRemoteAddress() + ":Server Receive msg:" + msg);
}
}catch (Exception e){
e.printStackTrace();
if(channel!=null){
try {
channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
}
}
public class Acceptor implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
SocketChannel channel;
try {
channel=serverSocketChannel.accept();
System.out.println(channel.getRemoteAddress()+": 收到一個用戶端連接配接");
channel.configureBlocking(false);
//當channel連接配接中資料就緒時,調用DispatchHandler來處理channel
//巧妙使用了SocketChannel的attach功能,将Hanlder和可能會發生事件的channel連結在一起,當發生事件時,可以立即觸發相應連結的Handler。
channel.register(selector, SelectionKey.OP_READ,new MultiDispatchHandler(channel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
多線程Reactor總結
在多線程Reactor模型中,添加了一個工作者線程池,并将非I/O操作從Reactor線程中移出轉交給工作者線程池來執行。這樣能夠提高Reactor線程的I/O響應,不至于因為一些耗時的業務邏輯而延遲對後面I/O請求的處理。
多Reactor多線程模式(主從多Reactor模型)
在多線程單Reactor模型中,我們發現所有的I/O操作是由一個Reactor來完成,而Reactor運作在單個線程中,它需要處理包括
Accept()
/
read()
write
connect
操作,對于小容量的場景,影響不大。但是對于高負載、大并發或大資料量的應用場景時,容易成為瓶頸,主要原因如下:
- 一個NIO線程同時處理成百上千的鍊路,性能上無法支撐,即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的讀取和發送;
- 當NIO線程負載過重之後,處理速度将變慢,這會導緻大量用戶端連接配接逾時,逾時之後往往會進行重發,這更加重了NIO線程的負載,最終會導緻大量消息積壓和處理逾時,成為系統的性能瓶頸;
是以,我們還可以更進一步優化,引入多Reactor多線程模式,如圖4-8所示,Main Reactor負責接收用戶端的連接配接請求,然後把接收到的請求傳遞給SubReactor(其中subReactor可以有多個),具體的業務IO處理由SubReactor完成。
Multiple Reactors 模式通常也可以等同于 Master-Workers 模式,比如 Nginx 和 Memcached 等就是采用這種多線程模型,雖然不同的項目實作細節略有差別,但總體來說模式是一緻的。
圖4-8
- Acceptor,請求接收者,在實踐時其職責類似伺服器,并不真正負責連接配接請求的建立,而隻将其請求委托 Main Reactor 線程池來實作,起到一個轉發的作用。
- Main Reactor,主 Reactor 線程組,主要負責連接配接事件,并将IO讀寫請求轉發到 SubReactor 線程池。
- Sub Reactor,Main Reactor 通常監聽用戶端連接配接後會将通道的讀寫轉發到 Sub Reactor 線程池中一個線程(負載均衡),負責資料的讀寫。在 NIO 中 通常注冊通道的讀(OP_READ)、寫事件(OP_WRITE)。
MultiplyReactor
public class MultiplyReactor {
public static void main(String[] args) throws IOException {
MultiplyReactor mr = new MultiplyReactor(8888);
mr.start();
}
private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
// Reactor(Selector) 線程池,其中一個線程被 mainReactor 使用,剩餘線程都被 subReactor 使用
static Executor mainReactorExecutor = Executors.newFixedThreadPool(POOL_SIZE);
// 主 Reactor,接收連接配接,把 SocketChannel 注冊到從 Reactor 上
private Reactor mainReactor;
private int port;
public MultiplyReactor(int port) {
try {
this.port = port;
mainReactor = new Reactor();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 啟動主從 Reactor,初始化并注冊 Acceptor 到主 Reactor
*/
public void start() throws IOException {
new Acceptor(mainReactor.getSelector(), port); // 将 ServerSocketChannel 注冊到 mainReactor
mainReactorExecutor.execute(mainReactor); //使用線程池來處理main Reactor的連接配接請求
}
}
public class Reactor implements Runnable{
private ConcurrentLinkedQueue<AsyncHandler> events=new ConcurrentLinkedQueue<>();
private final Selector selector;
public Reactor() throws IOException {
this.selector = Selector.open();
}
public Selector getSelector(){
return selector;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
AsyncHandler handler;
while ((handler = events.poll()) != null) {
handler.getChannel().configureBlocking(false);
SelectionKey sk=handler.getChannel().register(selector, SelectionKey.OP_READ);
sk.attach(handler);
handler.setSk(sk);
}
selector.select(); //阻塞
Set<SelectionKey> selectionKeys=selector.selectedKeys();
Iterator<SelectionKey> it=selectionKeys.iterator();
while(it.hasNext()){
SelectionKey key=it.next();
//擷取attach方法傳入的附加對象
Runnable runnable=(Runnable)key.attachment();
if(runnable!=null){
runnable.run();
}
it.remove();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
public void register(AsyncHandler asyncHandler){
events.offer(asyncHandler);
selector.wakeup();
}
}
public class Acceptor implements Runnable{
final Selector sel;
final ServerSocketChannel serverSocket;
int handleNext = 0;
private final int POOL_SIZE=Runtime.getRuntime().availableProcessors();
private Executor subReactorExecutor= Executors.newFixedThreadPool(POOL_SIZE);
private Reactor[] subReactors=new Reactor[POOL_SIZE-1];
public Acceptor(Selector sel, int port) throws IOException {
this.sel = sel;
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port)); // 綁定端口
// 設定成非阻塞模式
serverSocket.configureBlocking(false);
// 注冊到 選擇器 并設定處理 socket 連接配接事件
serverSocket.register(sel, SelectionKey.OP_ACCEPT,this);
init();
System.out.println("mainReactor-" + "Acceptor: Listening on port: " + port);
}
public void init() throws IOException {
for (int i = 0; i < subReactors.length; i++) {
subReactors[i]=new Reactor();
subReactorExecutor.execute(subReactors[i]);
}
}
@Override
public synchronized void run() {
try {
// 接收連接配接,非阻塞模式下,沒有連接配接直接傳回 null
SocketChannel sc = serverSocket.accept();
if (sc != null) {
// 把提示發到界面
sc.write(ByteBuffer.wrap("Multiply Reactor Pattern Example\r\nreactor> ".getBytes()));
System.out.println(Thread.currentThread().getName()+":Main-Reactor-Acceptor: " + sc.socket().getLocalSocketAddress() +" 注冊到 subReactor-" + handleNext);
// 如何解決呢,直接調用 wakeup,有可能還沒有注冊成功又阻塞了。這是一個多線程同步的問題,可以借助隊列進行處理
Reactor subReactor = subReactors[handleNext];
subReactor.register(new AsyncHandler(sc));
if(++handleNext == subReactors.length) {
handleNext = 0;
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
AsyncHandler
public class AsyncHandler implements Runnable{
private SocketChannel channel;
private SelectionKey sk;
ByteBuffer inputBuffer=ByteBuffer.allocate(1024);
ByteBuffer outputBuffer=ByteBuffer.allocate(1024);
StringBuilder builder=new StringBuilder(); //存儲用戶端的完整消息
public AsyncHandler(SocketChannel channel){
this.channel=channel;
}
public SocketChannel getChannel() {
return channel;
}
public void setSk(SelectionKey sk) {
this.sk = sk;
}
@Override
public void run() {
try {
if (sk.isReadable()) {
read();
} else if (sk.isWritable()) {
write();
}
}catch (Exception e){
try {
this.sk.channel().close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
protected void read() throws IOException {
inputBuffer.clear();
int n=channel.read(inputBuffer);
if(inputBufferComplete(n)){
System.out.println(Thread.currentThread().getName()+":Server端收到用戶端的請求消息:"+builder.toString());
outputBuffer.put(builder.toString().getBytes(StandardCharsets.UTF_8));
this.sk.interestOps(SelectionKey.OP_WRITE); //更改服務的邏輯狀态以及處理的事件類型
}
}
private boolean inputBufferComplete(int bytes) throws EOFException {
if(bytes>0){
inputBuffer.flip(); //轉化成讀取模式
while(inputBuffer.hasRemaining()){ //判斷緩沖區中是否還有元素
byte ch=inputBuffer.get(); //得到輸入的字元
if(ch==3){ //表示Ctrl+c 關閉連接配接
throw new EOFException();
}else if(ch=='\r'||ch=='\n'){ //表示換行符
return true;
}else{
builder.append((char)ch); //拼接讀取到的資料
}
}
}else if(bytes==-1){
throw new EOFException(); //用戶端關閉了連接配接
}
return false;
}
private void write() throws IOException {
int written=-1;
outputBuffer.flip(); //轉化為讀模式,判斷是否有資料需要發送
if(outputBuffer.hasRemaining()){
written=channel.write(outputBuffer); //把資料寫回用戶端
}
outputBuffer.clear();
builder.delete(0,builder.length());
if(written<=0){ //表示用戶端沒有輸資訊
this.sk.channel().close();
}else{
channel.write(ByteBuffer.wrap("\r\nreactor>".getBytes()));
this.sk.interestOps(SelectionKey.OP_READ);
}
}
}
關注[跟着Mic學架構]公衆号,擷取更多精品原創