天天看點

高性能網絡通訊架構Netty預熱—NIO模型

作者:愛做夢的程式員
高性能網絡通訊架構Netty預熱—NIO模型

Netty是一款高性能的網絡通信架構,其應用也很廣泛,比如常用的消息隊列RocketMQ,RPC架構Dubbo在底層都有使用到Netty。在學習Netty之前,我們需要對IO模型要有一定的了解,其中最重要的就是NIO,是以今天打算先對NIO進行一些簡單的梳理。

IO模型

常見IO模型分為幾種:

  • BIO :Blocking IO, 即同步阻塞式IO。Client和Server的每建立一次連接配接,都會建立一個線程,在Client等待Server響應的期間,會處于阻塞狀态。
  • NIO :Non-Blocking IO,即同步非阻塞式IO。NIO是基于Reactor模式,面向緩沖區并結合通道的IO模型。用戶端發送的連接配接請求都會注冊到多路複用器上,多路複用器輪詢到連接配接有IO請求就進行處理。
  • AIO : Asynchronous IO,即異步非阻塞,采用了 Proactor 模式,特點是先由作業系統完成後才通知服務端程式啟動線程去處理,一般适用于連接配接數較多且連接配接時間較長的應用。

既然BIO和NIO都是以同步的方式工作的,那麼這裡就先拿BIO與NIO做個簡單的對比,比較兩者的差異具體在哪些地方。

BIO

上面提到了Client和Server的每建立一次連接配接,都會建立一個線程并且會發生阻塞,那麼我們就來簡單的驗證一下。驗證方式也比較簡單,在編輯器中建立一個ServerSocket作為服務端并給定一個端口号用于用戶端連接配接,使用telnet作為用戶端來連接配接服務端并實作消息發送和接收,通過代碼來分析BIO會在那些地方會阻塞。

java複制代碼public static void main(String[] args) throws IOException {

    // 1. 建立一個BIO服務端 端口号為9999
    ServerSocket serverSocket = new ServerSocket(9998);
    System.out.println("=====等待用戶端連接配接......");

    // 2. 等待用戶端連接配接 , 會阻塞
    Socket socket = serverSocket.accept();
    System.out.println("=====用戶端已連接配接......");

    // 3. 擷取用戶端發送的内容,如果用戶端沒有發送内容,也會阻塞
    System.out.println("=====等待用戶端發送資料......");
    InputStream inputStream = socket.getInputStream();

    while (true) {
        byte[] bytes = new byte[2048];
        int read = inputStream.read(bytes);
        if (read != -1) {
            System.out.println((Thread.currentThread().getName() + " " + new String(bytes, 0, read)));
        } else {
            break;
        }
    }
    inputStream.close();
    socket.close();

}
           

簡單編碼完成後,啟動服務,如果在沒有用戶端連接配接的情況下,accept()方法會阻塞,直到有用戶端進行了連接配接。

高性能網絡通訊架構Netty預熱—NIO模型

現在可以打開cmd使用telnet指令來進行連接配接。連接配接成功後結合'Ctrl + ]'快捷鍵進去Telent Client,使用send指令發送資料内容。

yaml複制代碼telnet 127.0.0.1 9998
           
高性能網絡通訊架構Netty預熱—NIO模型

這裡可以看到,用戶端雖然連接配接成功了,但是在調用getInputStream()方法時,線程又被阻塞了,那麼進行Telnet Client來發送資料。

高性能網絡通訊架構Netty預熱—NIO模型

一切OK,服務端收到了消息。既然說了,BIO一次連接配接就是一個線程,那麼再發起一個用戶端連接配接,來看看main線程到底還能不能擷取到消息。

高性能網絡通訊架構Netty預熱—NIO模型

經過驗證,第二個連接配接發送的消息,控制台确實沒有收到,那就證明了一次連接配接就是一個線程。那有些人就會有疑問,既然控制台沒列印消息,那怎麼確定這條消息就被服務端接收了呢?這豈不是很簡單,加個線程池,搞成僞異步不就搞定了。那麼就基于上面的代碼,按照下圖流程方式來簡單的改造改造。

高性能網絡通訊架構Netty預熱—NIO模型
java複制代碼// 建立線程池
ExecutorService executorService = Executors.newFixedThreadPool(5);

// 1. 建立一個BIO服務端 端口号為9998
ServerSocket serverSocket = new ServerSocket(9998);

while(true){
    // 2. 等待用戶端連接配接 , 會阻塞
    Socket socket = serverSocket.accept();
    executorService.execute(() -> {

        try {
            System.out.println(Thread.currentThread().getName() + " 用戶端已連接配接...");
            // 3. 擷取用戶端的ip資訊
            InetAddress address = socket.getInetAddress();

            System.out.println(Thread.currentThread().getName() + " " + address.getHostName() + " , " + address.getHostAddress());

            // 4. 擷取用戶端發送的内容,如果用戶端沒有發送内容,也會阻塞
            InputStream inputStream = socket.getInputStream();

            while (true) {
                byte[] bytes = new byte[2048];
                int read = inputStream.read(bytes);
                if (read != -1) {
                    System.out.println((Thread.currentThread().getName() + " " + new String(bytes, 0, read)));
                } else {
                    break;
                }
            }
            inputStream.close();
        } catch (Exception e) {

        } finally {
            try {
                socket.close();
                System.out.println(" socket 關閉連接配接 ");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    });
}
           

經過一頓亂敲,代碼改造完成,運作一下看看效果,嘗試多個用戶端連接配接,結果符合預期。(一次連接配接一個線程是不是更明顯了)

高性能網絡通訊架構Netty預熱—NIO模型

NIO

經過對BIO的測試發現,BIO對并發支援不好,如果有大批量的用戶端連接配接的服務端,那麼服務端就會不斷的建立線程,直到撐爆伺服器。是以,在JDK1.4+版本,官方提供了另一種IO模型-NIO模型。NIO即非阻塞式IO,目前應用在很多架構或中間件的底層,它基于Reactor模式,面向緩沖區,不面向流。其核心元件有三種,分别為Buffer緩沖區,Channel通道和Selector選擇器。

當程式需要與服務端進行資料互動時,并不會向BIO那樣,直接發送到服務端,而是将資料發送到Buffer緩沖區,而Buffer緩沖區與Channel通道之間會進行資料互動,Selector會對應一個線程并且會根據事件驅動(Event)來選擇哪一個Channel進行處理。當某個Channel上某個請求的事件完全就緒的時候,選擇器Selector才會将該任務配置設定給服務端的一個或多個線程,其他情況伺服器的線程可以做其他事情。

高性能網絡通訊架構Netty預熱—NIO模型

Buffer

Buffer可以簡單的了解為一個數組,程式可以向Buffer中寫入或者讀取資料,資料存儲依賴于緩存,在NIO中主要應用在和通道之間進行資料互動。Buffer的實作子類有很多,比如IntBuffer,CharBuffer,DoubleBuffer等,不過使用經常使用的還是ByteBuffer。寫一個簡單的小demo,來展現一下Buffer如何進行存儲和讀取的。

java複制代碼public static void main(String[] args) {

    // 1. 建立一個容量為100的ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(100);
    // 2. 向Buffer中寫入資料
    buffer.put("byte".getBytes());
    buffer.put("byte02".getBytes());

    System.out.println("limit = "+buffer.limit()+ " , position = "+buffer.position());
    // 3. 切換為讀模式
    buffer.flip();
    System.out.println("limit = "+buffer.limit()+ " , position = "+buffer.position());

    // 4. 判斷是否還有元素,有則讀取
    while(buffer.hasRemaining()){
        System.out.println(new String(new byte[]{buffer.get()}));
    }
}
           

需要注意的是,如果需要讀取緩沖區的資料時,一定要先調用flip()方法,這是因為在源碼中有三個重要參數

java複制代碼// 表示目前寫入或讀取的位置,每當寫入或讀取時,該值會進行+1操作
private int position = 0;  
// 緩沖區裡的資料的總數,代表了目前緩沖區中一共有多少資料
private int limit;
// 緩沖區能夠容納的資料元素的最大數量
private int capacity;
           

在建立緩沖區時,會指定capacity的大小,此時limit等于capacity,随着不斷的寫入資料position的值不斷的增加,如果position大于capacity時,則會抛出異常。在不調用flip()方法,進行讀取資料時,源碼中會根據目前的position位置繼續向下讀,那麼讀出的資料就會是一個空值。

java複制代碼/**
* 讀取資料時,擷取索引下标
*/
final int nextGetIndex() {  // package-private
    int p = position;
    if (p >= limit)
        throw new BufferUnderflowException();
    position = p + 1;
    return p;
}
           

flip方法中所做的事情就是将position指派給limit,并将自身清零。那麼在讀取資料時,position就會從0開始讀,一直讀到limit為止

高性能網絡通訊架構Netty預熱—NIO模型

是以demo程式的輸出結果為

java複制代碼limit = 100 , position = 10
limit = 10 , position = 0
           

緩沖區的資料是存儲在記憶體中的,這個記憶體可以是JVM的堆記憶體,也可以是堆外的記憶體(堆外記憶體)。堆外記憶體的方式可以通過allcateDirect方法進行建立,傳回的是DirectByteBuffer對象(直接緩沖區),不受GC影響,使用的是作業系統的實體記憶體,适合大檔案傳輸等操作。堆記憶體的方式可以通過allocate方法建立,傳回的是HeapByteBuffer對象(非直接緩沖區),會受GC影響。

Channel

Channel是源程式和目标程式之間資料傳輸的通道,可以通過這個通道進行資料讀取或寫入,當然資料的讀取和寫入需要配合Buffer來一起完成。與普通的流相比,Channel是一個雙向的通道,而流隻能進行單向傳輸。在NIO中,Channel的實作分為四種,分别為FileChannel, SocketChannel, ServerSocketChannel, DatagramChannel。常用的方法有read(Buffer buffer),write(Buffer buffer),transferFrom(Channel channel,long position,long count)分别表示将Channel中的資料讀取到Buffer中,将Buffer中的資料寫入到Channel中以及從通道中拷貝資料。

高性能網絡通訊架構Netty預熱—NIO模型
  • FileChannel

在實作類中FileChannel常被使用,FileChannel即檔案通道,用于檔案讀取,其主要的實作類是FileChannelImpl,但是在使用的過程中是無法直接通過new來建立,可以通過輸入流InputStream,輸出流OutputStream,RandomAccessFile或者FileChannel提供的open()方法中來擷取執行個體,那麼就通過FileChannel來寫個檔案拷貝的例子。

java複制代碼public static void main(String[] args) {
    // 1. 需要複制的檔案
    File file = new File("file-channel.txt");
    // 2. 建立輸入流
    FileInputStream inputStream = new FileInputStream(file);
    FileChannel inputStreamChannel = inputStream.getChannel();

    // 3. 建立輸出流
    FileOutputStream outputStream = new FileOutputStream("file-channel-copy.txt");
    FileChannel outputStreamChannel = outputStream.getChannel();

    // 4. 建立buffer緩沖
    ByteBuffer buffer = ByteBuffer.allocate((int) file.length());

    // 5. 将通道資料讀取到緩沖區
    inputStreamChannel.read(buffer);

    buffer.flip();

    // 6. 将緩沖區寫入到通道
    outputStreamChannel.write(buffer);

    /**
    * 除了上面使用的read和write方法,也可以使用transferFrom方法直接copy通道中的資料
    * outputStreamChannel.transferFrom(inputStreamChannel,0,inputStreamChannel.size());
    */

    outputStreamChannel.close();
    inputStreamChannel.close();
}
           
  • ServerSocketChannel

ServerSocketChannel與BIO中的ServerSocket類似,可以綁定端口并監聽TCP連接配接。在等待用戶端連接配接的過程中,可以通過使用configureBlocking()方法來設定阻塞或非阻塞,如果設定了非阻塞,那麼在調用accept()方法時可能會出現NULL值,是以需要注意一下。那麼同樣搞個demo,來實作雙端通信的效果。

java複制代碼public static void main(String[] args) {
    // 1. 打開通道
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open():
    // 2.綁定9999端口
    serverSocketChannel.bind(new InetSocketAddress(9999));
    // 3.設定非阻塞
    serverSocketChannel.configureBlocking(false) ;
    while (true) {
    // 4.用戶端連接配接,因為設定了非阻塞,是以這裡可能會為空
    SocketChannel channel = serverSocketChannel.accept():
    if (channel == null) {
        System.out.println("沒有用戶端連接配接......");
        continue:
    }
    //5.建立緩沖區
    ByteBuffer buffer = ByteBuffer.allocate(1024) ;
    // 6.将用戶端發來的消息,讀取到緩沖區
    int read = channel.read(buffer) ;
    System.out.println(" client message : " + new String(buffer.array() ,  0, read, StandardCharsets.UTF_8):
    // 7.回複消息給用戶端
    channel.write(ByteBuffer.wrap(" server received message".getBytes(StandardCharsets.UTF_8)));
    channel.close();
    break;
    }
}
           
  • SocketChannel

上面服務端代碼已經搞定,現在可以編寫用戶端代碼。用戶端使用SocketChannel去連接配接服務端,類似BIO中的Socket。一頓亂敲後,先啟動Server,在啟動Client就可以實作雙端通信了。

java複制代碼public static void main(String[] args) throws IOException{
    // 1. 建立通道
    SocketChannel channel = SocketChannel.open();
    // 2. 通過ip和端口連接配接server
    channel.connect(new InetSocketAddress("127.0.0.1",9999));
    // 3. 像server發送資料
    channel.write(ByteBuffer.wrap("hello server".getBytes(StandardCharsets.UTF_8)));
    // 4. 建立buffer 用于接收server消息
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    // 5. 将server消息讀入buffer
    int read = channel.read(buffer);
    System.out.println(" server message : " + new String(buffer.array() , 0,read, StandardCharsets.UTF_8));
    channel.close();
}
           

Selector

Selector是NIO中的選擇器,主要工作就是通道注冊,事件監聽,事件選擇切換,一個選擇器可以注冊多個通道。 ServerSocketChannel和SocketChannel都可以注冊到選擇器中,選擇器中通過調用select方法擷取通道中所發生的事件,并且根據不同的事件切換到不同的通道。選擇器的事件有四種,分别為OP_READ,OP_WRITE,OP_ACCEPT以及OP_CONNECT。

高性能網絡通訊架構Netty預熱—NIO模型

NIO中,一般是一個單線程處理一個選擇器,一個選擇器可以監控很多通道。是以,通過選擇器,一個單線程可以處理上百個、上千個甚至更多的通道,這樣可以減少線程之間上下文的切換。通道和選擇器之間通過使用register()方法進行注冊,通過 selectedKeys()方法擷取通道發生的事件。那先現在就來改造一下上面的代碼,通過注冊選擇器的方式實作雙端通信。

  • 服務端
java複制代碼public static void main(String[] args) throws Exception {

    // 1. 建立ServerSocketChannel
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    // 2. 建立選擇器Selector
    Selector selector = Selector.open();
    // 3. 綁定9999端口
    serverSocketChannel.bind(new InetSocketAddress(9999));
    // 4. 設定非阻塞
    serverSocketChannel.configureBlocking(false);
    // 5. ServerSocketChannel注冊到選擇器,并監聽連接配接事件
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
        // 6. 沒有監聽到任何事件
        if (selector.selectNow() == 0) {
            continue;
        }
        // 7. 監聽到事件
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
        while (keyIterator.hasNext()) {
            SelectionKey key = keyIterator.next();
            // 8. 如果是用戶端連接配接事件
            if (key.isAcceptable()) {
                // 8.1 建立SocketChannel,注冊到選擇器并監聽讀事件
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                serverSendMsg(socketChannel, ByteBuffer.wrap("hello client".getBytes(StandardCharsets.UTF_8)));
            } else if (key.isReadable()) { // 9. 如果用戶端是讀事件
                // 9.1 擷取事件通道,并讀取通道資料
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                int read = socketChannel.read(byteBuffer);
                System.out.println("[ client message ] : " + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
                serverSendMsg(socketChannel, ByteBuffer.wrap("receive client message".getBytes(StandardCharsets.UTF_8)));
            }
            keyIterator.remove();
        }
    }
}
/** 
* 發送消息給用戶端
*/
public static void serverSendMsg(SocketChannel socketChannel, ByteBuffer byteBuffer) throws Exception {
    socketChannel.write(byteBuffer);
}
           
  • 用戶端
java複制代碼public static void main(String[] args) throws Exception {
    SocketChannel channel = SocketChannel.open();
    channel.configureBlocking(false);
    boolean connect = channel.connect(new InetSocketAddress("127.0.0.1", 9999));
    if (!connect) {
        while (!channel.finishConnect()) {
            System.out.println("服務連接配接中.....");
        }
    }
    receiveServerMsg(channel);
    // 發送消息給服務端
    channel.write(ByteBuffer.wrap("hello server".getBytes(StandardCharsets.UTF_8)));
    receiveServerMsg(channel);
    new CountDownLatch(1).await();

}
/**
* 接收服務響應的消息
*/
public static void receiveServerMsg(SocketChannel channel) throws Exception {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int read = channel.read(buffer);
    System.out.println("[ server message ]: " + new String(buffer.array(), 0, read, StandardCharsets.UTF_8));
}
           
  • 運作結果

總結

以上部分簡單的介紹了BIO與NIO,其中着重描述了NIO的特性(為了後續的Netty)。BIO基于位元組流和字元流進行操作的,而NIO基于Channel(通道)和Buffer(緩沖區)進行操作的,資料總是從通道讀取到緩沖區中,或者從緩沖區寫入到通道中。Selector(選擇器)用于監聽多個通道事件,是以使用單個線程就可以監聽多個用戶端通道。下面表格是兩種IO方式式的對比。

阻塞狀态 實作方式程度 效率 資料處理
BIO 阻塞 簡單 面向流
NIO 非阻塞 相對複雜 面向緩沖區
原文連結:https://juejin.cn/post/7245567741691445285

繼續閱讀