天天看點

Java NIO——Selector選擇器

一、簡介

  • 1.1、Java 的 NIO,用非阻塞的 IO 方式。可以用一個線程,處理多個的用戶端連接配接,就會使用到Selector(選擇器)
  • 1.2、Selector 能夠檢測多個注冊的通道上是否有事件發生(注意:多個Channel以事件的方式可以注冊到同一個Selector),如果有事件發生,便擷取事件然後針對每個事件進行相應的處理。這樣就可以隻用一個單線程去管理多個通道,也就是管理多個連接配接和請求。
  • 1.3、隻有在 連接配接/通道 真正有讀寫事件發生時,才會進行讀寫,就大大地減少了系統開銷,并且不必為每個連接配接都建立一個線程,不用去維護多個線程。
  • 1.4、避免了多線程之間的上下文切換導緻的開銷。
Java NIO——Selector選擇器

Selector一般稱為選擇器 ,也稱多路複用器,多條channel複用selector。channe通過注冊到selector ,使selector對channel進行監聽,實作盡可能少的線程管理多個連接配接。減少了 線程的使用,降低了因為線程的切換引起的不必要額資源浪費和多餘的開銷。

也是網絡傳輸非堵塞的核心元件。

二、特點

  • 2.1、Netty 的 IO 線程 NioEventLoop 聚合了 Selector(選擇器,也叫多路複用器),可以同時并發處理成百上千個用戶端連接配接。
  • 2.2、當線程從某用戶端 Socket 通道進行讀寫資料時,若沒有資料可用時,該線程可以進行其他任務。
  • 2.3、線程通常将非阻塞 IO 的空閑時間用于在其他通道上執行 IO 操作,是以單獨的線程可以管理多個輸入和輸出通道。
  • 2.4、由于讀寫操作都是非阻塞的,這就可以充分提升 IO 線程的運作效率,避免由于頻繁 I/O 阻塞導緻的線程挂起。
  • 2.5、一個 I/O 線程可以并發處理 N 個用戶端連接配接和讀寫操作,這從根本上解決了傳統同步阻塞 I/O 一連接配接一線程模型,架構的性能、彈性伸縮能力和可靠性都得到了極大的提升。

三、Selector的作用

選擇器提供選擇執行已經就緒的任務的能力。從底層來看,Selector提供了詢問通道是否已經準備好執行每個I/O操作的能力。Selector 允許單線程處理多個Channel。僅用單個線程來處理多個Channels的好處是,隻需要更少的線程來處理通道。事實上,可以隻用一個線程處理所有的通道,這樣會大量的減少線程之間上下文切換的開銷。

四、Selector類相關方法

Selector 類是一個抽象類, 常用方法和說明如下:

public abstract class Selector implements Closeable {

	public static Selector open();//得到一個選擇器對象
	
	public abstract int select(long timeout);//監控所有注冊的通道,當其中有 IO 操作可以進行時,将
											 //對應的 SelectionKey 加入到内部集合中并傳回,參數用來設定逾時時間
	public abstract Set<SelectionKey> selectedKeys();//從内部集合中得到所有的 SelectionKey
}
           

注意事項:

  • 1、NIO中的 ServerSocketChannel功能類似ServerSocket,SocketChannel功能類似Socket。
  • 2、selector select()方法詳解

    select()方法,可以查詢出已經就緒的通道操作,這些就緒的狀态集合,包存在一個元素是SelectionKey對象的Set集合中。

    • selector.select()//阻塞
    • selector.select(1000);//阻塞1000毫秒,在1000毫秒後傳回
    • selector.selectNow();//不阻塞,立馬返還
    • selector.wakeup();//喚醒selector

select()方法傳回的int值,表示有多少通道已經就緒,更準确的說,是自前一次select方法以來到這一次select方法之間的時間段上,有多少通道變成就緒狀态。

NIO 非阻塞 網絡程式設計原理分析圖

NIO 非阻塞 網絡程式設計相關的(Selector、SelectionKey、ServerScoketChannel和SocketChannel) 關系梳理圖

Java NIO——Selector選擇器
  • 1、當用戶端連接配接時,會通過ServerSocketChannel 得到 SocketChannel。
  • 2、Selector 進行監聽 select 方法, 傳回有事件發生的通道的個數。
  • 3、将socketChannel注冊到Selector上,register(Selector sel,int ops),一個selector上可以注冊多個SocketChannel。
  • 4、注冊後傳回一個 SelectionKey, 會和該Selector 關聯(集合)。
  • 5、進一步得到各個 SelectionKey (有事件發生)。
  • 6、在通過 SelectionKey 反向擷取 SocketChannel,方法 channel()。
  • 7、可以通過 得到的 channel,完成業務處理。

五、可選擇通道(SelectableChannel)

并不是所有的Channel,都是可以被Selector 複用的。比方說,FileChannel就不能被選擇器複用。

判斷一個Channel 能被Selector 複用,有一個前提:判斷他是否繼承了一個抽象類SelectableChannel。如果繼承了SelectableChannel,則可以被複用,否則不能。

SelectableChannel類提供了實作通道的可選擇性所需要的公共方法。它是所有支援就緒檢查的通道類的父類。所有socket通道,都繼承了SelectableChannel類都是可選擇的,包括從管道(Pipe)對象的中獲得的通道。而FileChannel類,沒有繼承SelectableChannel,是以是不是可選通道。

通道和選擇器注冊之後,他們是綁定的關系嗎?

不是一對一的關系。一個通道可以被注冊到多個選擇器上,但對每個選擇器而言隻能被注冊一次。

通道和選擇器之間的關系,使用注冊的方式完成。SelectableChannel可以被注冊到Selector對象上,在注冊的時候,需要指定通道的哪些操作,是Selector感興趣的。

Channel注冊到Selector

使用Channel.register(Selector sel,int ops)方法,将一個通道注冊到一個選擇器時。第一個參數,指定通道要注冊的選擇器是誰。第二個參數指定選擇器需要查詢的通道操作。

可以供選擇器查詢的通道操作,從類型來分,包括以下四種:

  • 1、可讀 : SelectionKey.OP_READ
  • 2、可寫 : SelectionKey.OP_WRITE
  • 3、連接配接 : SelectionKey.OP_CONNECT
  • 4、接收 : SelectionKey.OP_ACCEPT

如果Selector對通道的多操作類型感興趣,可以用“位或”操作符來實作:int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE ;

注意,操作一詞,是一個是使用非常泛濫,也是一個容易混淆的詞。特别提醒的是,選擇器查詢的不是通道的操作,而是通道的某個操作的一種就緒狀态。

一旦通道具備完成某個操作的條件,表示該通道的某個操作已經就緒,就可以被Selector查詢到,程式可以對通道進行對應的操作。比方說,某個SocketChannel通道可以連接配接到一個伺服器,則處于“連接配接就緒”(OP_CONNECT)。再比方說,一個ServerSocketChannel伺服器通道準備好接收新進入的連接配接,則處于“接收就緒”(OP_ACCEPT)狀态。還比方說,一個有資料可讀的通道,可以說是“讀就緒”(OP_READ)。一個等待寫資料的通道可以說是“寫就緒”(OP_WRITE)。

六、選擇鍵(SelectionKey)

Channel和Selector的關系确定好後,并且一旦通道處于某種就緒的狀态,就可以被選擇器查詢到。這個工作,使用選擇器Selector的select()方法完成。select方法的作用,對感興趣的通道操作,進行就緒狀态的查詢。

Selector可以不斷的查詢Channel中發生的操作的就緒狀态。并且挑選感興趣的操作就緒狀态。一旦通道有操作的就緒狀态達成,并且是Selector感興趣的操作,就會被Selector選中,放入選擇鍵集合中。

一個選擇鍵,首先是包含了注冊在Selector的通道操作的類型,比方說SelectionKey.OP_READ。也包含了特定的通道與特定的選擇器之間的注冊關系。

開發應用程式是,選擇鍵是程式設計的關鍵。NIO的程式設計,就是根據對應的選擇鍵,進行不同的業務邏輯處理。

選擇鍵的概念,有點兒像事件的概念。

一個選擇鍵有點兒像監聽器模式裡邊的一個事件,但是又不是。由于Selector不是事件觸發的模式,而是主動去查詢的模式,是以不叫事件Event,而是叫SelectionKey選擇鍵。

七、Selector的使用流程

7.1、建立Selector

Selector對象是通過調用靜态工廠方法open()來執行個體化的,如下:

// 1、擷取Selector選擇器
Selector selector = Selector.open();
           

Selector的類方法open()内部是向SPI送出請求,通過預設的SelectorProvider對象擷取一個新的執行個體。

7.2、将Channel注冊到Selector

要實作Selector管理Channel,需要将channel注冊到相應的Selector上,如下:

// 2、擷取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 3.設定為非阻塞
serverSocketChannel.configureBlocking(false);

// 4、綁定連接配接
serverSocketChannel.bind(new InetSocketAddress(SystemConfig.SOCKET_SERVER_PORT));

// 5、将通道注冊到選擇器上,并制定監聽事件為:“接收”事件
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
           

上面通過調用通道的register()方法會将它注冊到一個選擇器上。

首先需要注意的是:

  • 與Selector一起使用時,Channel必須處于非阻塞模式下,否則将抛出異常IllegalBlockingModeException。這意味着,FileChannel不能與Selector一起使用,因為FileChannel不能切換到非阻塞模式,而套接字相關的所有的通道都可以。

另外,還需要注意的是:

  • 一個通道,并沒有一定要支援所有的四種操作。比如伺服器通道ServerSocketChannel支援Accept 接受操作,而SocketChannel用戶端通道則不支援。可以通過通道上的validOps()方法,來擷取特定通道下所有支援的操作集合。

7.3、輪詢查詢就緒操作

通過Selector的select()方法,可以查詢出已經就緒的通道操作,這些就緒的狀态集合,包存在一個元素是SelectionKey對象的Set集合中。

下面是Selector幾個重載的查詢select()方法:

  • 1、select():阻塞到至少有一個通道在你注冊的事件上就緒了。
  • 2、select(long timeout):和select()一樣,但最長阻塞事件為timeout毫秒。
  • 3、selectNow():非阻塞,隻要有通道就緒就立刻傳回。

select()方法傳回的int值,表示有多少通道已經就緒,更準确的說,是自前一次select方法以來到這一次select方法之間的時間段上,有多少通道變成就緒狀态。

一旦調用select()方法,并且傳回值不為0時,通過調用Selector的selectedKeys()方法來通路已選擇鍵集合,然後疊代集合的每一個選擇鍵元素,根據就緒操作的類型,完成對應的操作:

Set selectedKeys = selector.selectedKeys();

Iterator keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {

        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {

        // a connection was established with a remote server.

    } else if (key.isReadable()) {

        // a channel is ready for reading

    } else if (key.isWritable()) {

        // a channel is ready for writing

    }

    keyIterator.remove();

}
           

處理完成後,直接将選擇鍵,從這個集合中移除,防止下一次循環的時候,被重複的處理。鍵可以但不能添加。試圖向已選擇的鍵的集合中添加元素将抛出java.lang.UnsupportedOperationException。

八、一個NIO 程式設計的簡單執行個體

8.1、服務端:

/**
 * @Description: 服務端接收用戶端傳來的資料
 */
public class NIOServer {

    public static void main(String[] args) throws IOException {
        //建立ServerSocketChannel -> ServerSocket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //得到一個Selecor對象
        Selector selector = Selector.open();

        //綁定一個端口6666, 在伺服器端監聽
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));

        //設定為非阻塞
        serverSocketChannel.configureBlocking(false);

        //把 serverSocketChannel 注冊到  selector 關心 事件為 OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 用輪詢的方式,查詢擷取“準備就緒”的注冊過的操作
        while (true){
            //這裡我們等待1秒,如果沒有事件發生, 傳回
            if(selector.select(1000) == 0) { //沒有事件發生
                System.out.println("伺服器等待了1秒,無連接配接");
                continue;
            }

            //如果傳回的>0, 就擷取到相關的 selectionKey集合
            //1.如果傳回的>0, 表示已經擷取到關注的事件
            //2. selector.selectedKeys() 傳回關注事件的集合
            // 通過 selectionKeys 反向擷取通道
            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            //周遊 Set<SelectionKey>, 使用疊代器周遊
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while(iterator.hasNext()){
                //擷取到SelectionKey
                SelectionKey key = iterator.next();
				
				//處理key時,需要從selectionKeys集合中删除,否則下次處理就會有問題
				iterator.remove();
				
                //根據key 對應的通道發生的事件做相應處理
                if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的用戶端連接配接
                    //該該用戶端生成一個 SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("用戶端連接配接成功 生成了一個 socketChannel " + socketChannel.hashCode());
                    //将  SocketChannel 設定為非阻塞
                    socketChannel.configureBlocking(false);
                    //将socketChannel 注冊到selector, 關注事件為 OP_READ, 同時給socketChannel
                    //關聯一個Buffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

                    System.out.println("用戶端連接配接後 ,注冊的selectionkey 數量=" + selector.keys().size()); //2,3,4..
                }
                if(key.isReadable()) {  //發生 OP_READ
                    try {
                        //通過key 反向擷取到對應channel
                        SocketChannel channel = (SocketChannel)key.channel();

                        //擷取到該channel關聯的buffer
                        ByteBuffer buffer = (ByteBuffer)key.attachment();
                        //如果是正常斷開,read方法傳回值是-1
                        int read = channel.read(buffer);
                        if(read == -1){
                            key.cancel();
                        }else {
                            System.out.println("form 用戶端 " + new String(buffer.array()));
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        //因為用戶端斷開了,是以需要将key取消(從selector的keys集合中真正删除key)
                        key.cancel();
                    }
                }
            }
        }
    }
}
           

8.2、用戶端:

public class NIOClient2 {

    public static void main(String[] args) throws IOException {
        //得到一個網絡通道
        SocketChannel socketChannel = SocketChannel.open();

        //設定非阻塞
        socketChannel.configureBlocking(false);
        //提供伺服器端的ip 和 端口
        InetSocketAddress inetSocketAddress = new InetSocketAddress("217.0.0.1", 6666);

        //連接配接伺服器
        //socketChannel.connect(inetSocketAddress);
        //連接配接伺服器
        if (!socketChannel.connect(inetSocketAddress)) {

            while (!socketChannel.finishConnect()) {
                System.out.println("因為連接配接需要時間,用戶端不會阻塞,可以做其它工作..");
            }
        }

        //...如果連接配接成功,就發送資料
        String str = "hello, world";
        //Wraps a byte array into a buffer
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        //發送資料,将 buffer 資料寫入 channel
        socketChannel.write(buffer);
        //完畢時,清除緩沖區内容
        buffer.clear();

        //關閉相關流
        socketChannel.close();
    }
}
           

九、SelectionKey

9.1、SelectionKey,表示 Selector 和網絡通道的注冊關系, 共四種:

  • int OP_ACCEPT:有新的網絡連接配接可以 accept,值為 16
  • int OP_CONNECT:代表連接配接已經建立,值為 8
  • int OP_READ:代表讀操作,值為 1
  • int OP_WRITE:代表寫操作,值為 4

源碼中:

public static final int OP_READ = 1 << 0; 
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
           

9.2、SelectionKey相關方法

public abstract class SelectionKey {

	public abstract Selector selector();//得到與之關聯的 Selector 對象
   
	public abstract SelectableChannel channel();//得到與之關聯的通道

	public final Object attachment();//得到與之關聯的共享資料

	public abstract SelectionKey interestOps(int ops);//設定或改變監聽事件

	public final boolean isAcceptable();//是否可以 accept

	public final boolean isReadable();//是否可以讀

	public final boolean isWritable();//是否可以寫

}
           

十、ServerSocketChannel

10.1、ServerSocketChannel 在伺服器端監聽新的用戶端 Socket 連接配接

10.2、相關方法如下

public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel{

	public static ServerSocketChannel open();得到一個 ServerSocketChannel 通道
	
	public final ServerSocketChannel bind(SocketAddress local);設定伺服器端端口号
	
	public final SelectableChannel configureBlocking(boolean block);設定阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
	
	public SocketChannel accept();接受一個連接配接,傳回代表這個連接配接的通道對象
	
	public final SelectionKey register(Selector sel, int ops);注冊一個選擇器并設定監聽事件
	
}
           

十一、SocketChannel

  • 10.1、SocketChannel,網絡 IO 通道,具體負責進行讀寫操作。NIO 把緩沖區的資料寫入通道,或者把通道裡的資料讀到緩沖區。
  • 10.2、相關方法如下
public abstract class SocketChannel extends AbstractSelectableChannel 
	implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{
	
	public static SocketChannel open();//得到一個 SocketChannel 通道
	
	public final SelectableChannel configureBlocking(boolean block);//設定阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
	
	public boolean connect(SocketAddress remote);//連接配接伺服器
	
	public boolean finishConnect();//如果上面的方法連接配接失敗,接下來就要通過該方法完成連接配接操作
	
	public int write(ByteBuffer src);//往通道裡寫資料
	
	public int read(ByteBuffer dst);//從通道裡讀資料
	
	public final SelectionKey register(Selector sel, int ops, Object att);//注冊一個選擇器并設定監聽事件,最後一個參數可以設定共享資料
	
	public final void close();//關閉通道
	
}
           

NIO程式設計小結

NIO程式設計的難度比同步阻塞BIO大很多。

請注意以上的代碼中并沒有考慮“半包讀”和“半包寫”,如果加上這些,代碼将會更加複雜。

  • 1、用戶端發起的連接配接操作是異步的,可以通過在多路複用器注冊OP_CONNECT等待後續結果,不需要像之前的用戶端那樣被同步阻塞。
  • 2、SocketChannel的讀寫操作都是異步的,如果沒有可讀寫的資料它不會同步等待,直接傳回,這樣I/O通信線程就可以處理其他的鍊路,不需要同步等待這個鍊路可用。
  • 3、線程模型的優化:由于JDK的Selector在Linux等主流作業系統上通過epoll實作,它沒有連接配接句柄數的限制(隻受限于作業系統的最大句柄數或者對單個程序的句柄限制),這意味着一個Selector線程可以同時處理成千上萬個用戶端連接配接,而且性能不會随着用戶端的增加而線性下降。是以,它非常适合做高性能、高負載的網絡伺服器。

參考:

https://www.cnblogs.com/crazymakercircle/p/9826906.html