天天看點

Netty(六)源碼解析 之簡述NIO、用NIO實作Socket通信 和 群聊1. NIO 簡介2. NIO 通信 14-niosocket3. NIO 群聊 15-niochat

由于 Netty 的底層是 NIO,是以在讀 Netty 源碼之前,首先了解一下 NIO 網絡程式設計相關知識。

直接看官方的注釋,因為注釋是最權威的。

1. NIO 簡介

NIO,New IO(官方),Non-blocking IO(非官方),是 JDK1.4 中引入的一種新的 IO 标準,是一種同步非阻塞 IO。NIO 是以

為機關進行資料處理的,當然塊的大小是程式員自己指定的。其相對于 BIO 的以位元組/字元為機關所進行的阻塞式處理方式,大大提高了讀寫效率與并發度。

  • BIO:Blocking IO,同步阻塞 IO
  • NIO:Non-blocking IO,同步非阻塞 IO JDK1.4
  • AIO:異步非阻塞 IO,也稱為 NIO2.0 JDK1.7
  • BIO:一個用戶端對應一個Channel,一個Channel由一個專門線程負責處理,Channel和線程是1:1關系
  • NIO:一個用戶端對應一個Channel,一個線程可以處理多個Channel,即Channel和線程關系是n:1關系

    怎麼實作?由Selector(多路複用器)實作,多個Channel注冊到Selector上,一個Selector和線程是一對一關系,線程處理哪個Channel的使用者請求,由Selector決定,即哪個Channel準備就緒了,就讓線程處理哪個Channel的請求

SelectorProvider抽象類簡介

先看下類上面的注釋:

/**
 * Service-provider class for selectors and selectable channels.
 * 用于選擇器和可選擇通道的服務提供程式類。
 * 
 * 解釋:就是說SelectorProvider這個類專門用來擷取selectors和selectable channels的
 * 
 * <p> A selector provider is a concrete subclass of this class that has a
 * zero-argument constructor and implements the abstract methods specified
 * below.  A given invocation of the Java virtual machine maintains a single
 * system-wide default provider instance, which is returned by the {@link
 * #provider() provider} method.  The first invocation of that method will locate
 * the default provider as specified below.
 * 
 * 選擇器提供程式是這個類的一個具體子類,它有一個零參數的構造函數,并實作下面指定的抽象方法。
 * Java虛拟機的給定調用維護一個單例的系統範圍的預設Provider執行個體,該執行個體
 * 由{@link #provider() provider}方法傳回。該方法的第一次調用定位到下面指定的預設provider。
 * 簡單來說就是provider方法傳回的就是該類的一個執行個體,且是系統全局範圍的,單例的,
 * 下面一段話介紹了該執行個體會在哪些地方使用:
 * 
 * 解釋:就是說SelectorProvider.provider方法可以傳回一個預設執行個體,這個執行個體
 * 在虛拟機内是全局、單例的
 * 
 * <p> The system-wide default provider is used by the static <tt>open</tt>
 * methods of the {@link java.nio.channels.DatagramChannel#open
 * DatagramChannel}, {@link java.nio.channels.Pipe#open Pipe}, {@link
 * java.nio.channels.Selector#open Selector}, {@link
 * java.nio.channels.ServerSocketChannel#open ServerSocketChannel}, and {@link
 * java.nio.channels.SocketChannel#open SocketChannel} classes.  It is also
 * used by the {@link java.lang.System#inheritedChannel System.inheritedChannel()}
 * method. A program may make use of a provider other than the default provider
 * by instantiating that provider and then directly invoking the <tt>open</tt>
 * methods defined in this class.
 * 
 * 整個系統預設的provider執行個體會被{@link java.nio.channels.DatagramChannel#open
 * DatagramChannel}, {@link java.nio.channels.Pipe#open Pipe}, {@link
 * java.nio.channels.Selector#open Selector}, {@link
 * java.nio.channels.ServerSocketChannel#open ServerSocketChannel}, 和{@link
 * java.nio.channels.SocketChannel#open SocketChannel}這些類的靜态open方法使用。
 * 它也被{@link java.lang.System#inheritedChannel System.inheritedChannel()}方法使用
 * 程式除了使用預設的provider外,還可以使用通過執行個體化provider,然後直接調用該類中定義的open方法
 * 
 * 解釋:上面的provider方法擷取到的預設SelectorProvider執行個體會被上面描述的衆多類的方法中被使用
 * 也可以直接通過SelectorProvider的方法擷取到Selector和selectable channels
 * 
 * <p> All of the methods in this class are safe for use by multiple concurrent
 * threads.  </p>
 * 這個類中的所有方法對于多個并發線程都是安全的。
 *
 * @author Mark Reinhold
 * @author JSR-51 Expert Group
 * @since 1.4
 */

public abstract class SelectorProvider {
	...
}
           

我們可以看下上面描述的各種使用全局預設SelectorProvider執行個體的場景:

  • DatagramChannel
    public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, MulticastChannel{
    	...
        public static DatagramChannel open() throws IOException {
            return SelectorProvider.provider().openDatagramChannel();
        }
    }
               
    看到該方法傳回的DatagramChannel繼承AbstractSelectableChannel ,就是SelectableChannel
    public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, MulticastChannel{
    	...
    }
    
    public abstract class AbstractSelectableChannel extends SelectableChannel{
    	...
    }
               
  • Pipe的open靜态方法也是一樣的:
    public abstract class Pipe {
    	...
        public static Pipe open() throws IOException {
            return SelectorProvider.provider().openPipe();
        }
        //雖然傳回Pipe,自己,但是Pipe中包含了一個内部類SourceChannel,也是SelectableChannel
        public static abstract class SourceChannel
            extends AbstractSelectableChannel
            implements ReadableByteChannel, ScatteringByteChannel{
            ...
    	}
    }
               
  • Selector的open同理:
    public abstract class Selector implements Closeable {
    	...
        public static Selector open() throws IOException {
            return SelectorProvider.provider().openSelector();
        }
    }
               
  • ServerSocketChannel同理:
    //ServerSocketChannel就是SelectableChannel
    public abstract class ServerSocketChannel
        extends AbstractSelectableChannel
        implements NetworkChannel{
        ...
        public static ServerSocketChannel open() throws IOException {
            return SelectorProvider.provider().openServerSocketChannel();
        }
    }
               
  • SocketChannel同理:
    public abstract class SocketChannel
        extends AbstractSelectableChannel
        implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{
        ...
        public static SocketChannel open() throws IOException {
            return SelectorProvider.provider().openSocketChannel();
        }
    }
               
  • System.inheritedChannel:
    public final class System {
    	...
        public static Channel inheritedChannel() throws IOException {
            return SelectorProvider.provider().inheritedChannel();
        }
    }
               

Selector抽象類簡介

先看下類上面注釋,這個類的注釋很長,我都把中文和解釋寫在裡面了,這個類個人認為就是Nio的核心,也是多路複用的核心,注釋中把整個Nio執行流程講解的非常詳細,看完基本可以把Nio的執行流程弄的非常明白,建議耐心看完:

注釋中分了三大塊(注釋中用華麗的分割線分隔了~):

  • Selector中的三個鍵集
  • Selector的選擇流程
  • Selector的并發性
/**
 * A multiplexor of {@link SelectableChannel} objects.
 * {@link SelectableChannel}對象的多路複用器。
 * 
 * <p> A selector may be created by invoking the {@link #open open} method of
 * this class, which will use the system's default {@link
 * java.nio.channels.spi.SelectorProvider selector provider} to
 * create a new selector.  A selector may also be created by invoking the
 * {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector}
 * method of a custom selector provider.  A selector remains open until it is
 * closed via its {@link #close close} method.
 * 可以通過調用本類的{@link #open open}方法來建立selector,該方法将使用系統的
 * 預設{@link java.nio.channels.spi.SelectorProvider selector provider}來
 * 建立新的selector。
 * 
 * 還可以通過調用自定義的selector provider的
 * {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector}方法來
 * 建立selector。
 * 
 * 選擇器一直保持打開狀态,直到通過其{@link #close close}方法關閉。
 * 
 * <a name="ks"></a>
 *
 * <p> A selectable channel's registration with a selector is represented by a
 * {@link SelectionKey} object.  A selector maintains three sets of selection
 * keys:
 * 一個可選擇的通道在selector上的注冊是由一個{@link SelectionKey}對象表示的。
 * 一個selector維護三組SelectionKey的集合:
 * 
 * 解釋:不管一個通道在selector上注冊監聽幾個事件就隻會有一個SelectionKey代表這個通道注冊。
 * 而SelectionKey裡面有一個興趣集,表名目前channel注冊關注了哪些事件
 * 
 * <ul>
 *
 *   <li><p> The <i>key set</i> contains the keys representing the current
 *   channel registrations of this selector.  This set is returned by the
 *   {@link #keys() keys} method. </p></li>
 * 
 * 	 key set:鍵集,包含的SelectionKey表示此選擇器的目前通道注冊的鍵。
 *   這個集合可以由{@link #keys() keys}方法傳回。
 * 
 *   解釋:簡單來說就是隻要一個Channel在Selector上注冊了,在Selector裡就對應一個SelectionKey
 *   存放在key set裡,Selector注冊了幾個通道,這個集合就有幾個SelectionKey(跟channel注冊時
 *   關注的事件無關)
 * 
 *   <li><p> The <i>selected-key set</i> is the set of keys such that each
 *   key's channel was detected to be ready for at least one of the operations
 *   identified in the key's interest set during a prior selection operation.
 *   This set is returned by the {@link #selectedKeys() selectedKeys} method.
 *   The selected-key set is always a subset of the key set. </p></li>
 * 
 *   selected-key set:選擇鍵集,是這樣的集合,在之前一次選擇操作中,裡面每個key的channel
 *   都被檢測到在鍵的興趣集中辨別的操作裡至少有一個操作準備好。
 *   這個集合由{@link #selectedKeys() selectedKeys}方法傳回。
 *   選擇鍵集始終是鍵集的子集。
 * 
 *   解釋:channel在selector上注冊需要指定需要關注的事件,可以關注多個事件也可以
 *   一個事件都不關注,在一次selector.select的選擇期間,一個通道,隻要其注冊的通道感興趣的事件中有
 *   任何一個事件已經就緒,這個通道在selector上注冊代表的SelectionKey就會被放進selected-key
 * 
 *   <li><p> The <i>cancelled-key</i> set is the set of keys that have been
 *   cancelled but whose channels have not yet been deregistered.  This set is
 *   not directly accessible.  The cancelled-key set is always a subset of the
 *   key set. </p></li>
 *   cancelleed -key集合放的key是已取消但其通道尚未取消注冊的集合。
 *   這個集合不能直接通路。取消鍵集始終是鍵集的子集。
 * 
 *   解釋:在調用SelectionKey.cancle方法時候,就會将該Key放入cancelleed -key集合,
 *   此時雖然取消了,但是還沒有真正開始執行取消注冊的操作,并且該key在Key set集合裡依然存在,
 *   等下一次進行選擇的時候,會把cancelleed -key集合裡所有取消掉的key删除,并且Key set集合
 *   裡對應的key也會删除,此時才是真正的在Selector裡被删除
 * 
 * </ul>
 *
 * <p> All three sets are empty in a newly-created selector.
 * 在新建立的選擇器中,這三個集合都是空的。
 * 
 * <p> A key is added to a selector's key set as a side effect of registering a
 * channel via the channel's {@link SelectableChannel#register(Selector,int)
 * register} method.  Cancelled keys are removed from the key set during
 * selection operations.  The key set itself is not directly modifiable.
 * 通過通道的{@link SelectableChannel#register(selector,int) register}方法注冊通道時,
 * 其中的一個副作用就是将一個SelectionKey鍵添加到selector的key set。
 * 在選擇操作期間(調用selector.select方法時),将取消的鍵從取消鍵集中删除。
 * 這個取消鍵集本身不能直接修改。
 * 
 * <p> A key is added to its selector's cancelled-key set when it is cancelled,
 * whether by closing its channel or by invoking its {@link SelectionKey#cancel
 * cancel} method.  Cancelling a key will cause its channel to be deregistered
 * during the next selection operation, at which time the key will removed from
 * all of the selector's key sets.
 * 當一個鍵被取消時,它會被添加到selector的cancelled-key鍵集中,無論是通過關閉它的通道
 * 還是通過調用它的{@link SelectionKey#cancel cancel}方法。取消一個鍵将導緻它的通道
 * 在下一次選擇操作期間取消注冊,此時該鍵将從選擇器的所有鍵集中删除。
 *
 * <a name="sks"></a><p> Keys are added to the selected-key set by selection
 * operations.  A key may be removed directly from the selected-key set by
 * invoking the set's {@link java.util.Set#remove(java.lang.Object) remove}
 * method or by invoking the {@link java.util.Iterator#remove() remove} method
 * of an {@link java.util.Iterator iterator} obtained from the
 * set.  Keys are never removed from the selected-key set in any other way;
 * they are not, in particular, removed as a side effect of selection
 * operations.  Keys may not be added directly to the selected-key set. </p>
 * SelectionKey添加到選擇鍵集是通過選擇操作(即執行selector.select方法期間)的。
 * 可以通過調用集合的remove方法或調用集合疊代器的remove方法從選擇鍵集中直接删除一個鍵。
 * 鍵不會以任何其他方式從選擇鍵集合中移除;特别注意的是,它們并沒有作為選擇操作的副作用被移除。
 * SelectionKey不能直接添加到選擇鍵集。
 * 
 * 解釋:這段話就說明了兩點
 * 第一,選擇鍵集即selected-key set中的元素,隻能在selector.selcet方法執行的時候,即
 * 選擇操作期間添加,在選擇操作期間該key對應的channel關注的事件中有任何一個事件就緒都會将
 * 其添加到selected-key set
 * 第二,選擇鍵集中的元素不會通過任何方式被移除,隻能通過調用selected-key set的remove方法
 * 或者通過集合疊代器的remove方法移除key,也就是為什麼一般在處理完SelectionKey對應的事件
 * 後,必須手動主動将key移除選擇鍵集的原因
 * 
 * 
 * ---------------------    華麗的分割線    ---------------------
 * 
 * <a name="selop"></a>
 * <h2>Selection</h2>     選擇
 *
 * <p> During each selection operation, keys may be added to and removed from a
 * selector's selected-key set and may be removed from its key and
 * cancelled-key sets.  Selection is performed by the {@link #select()}, {@link
 * #select(long)}, and {@link #selectNow()} methods, and involves three steps:
 * 每次選擇操作期間,鍵可能會被添加到選擇器的選擇鍵集 或 從選擇器的選擇鍵集移除,也可能
 * 通過它的鍵和取消的鍵集删除。選擇由{@link #select()}、{@link #select(long)}和
 * {@link #selectNow()}方法執行,包括三個步驟:
 * 
 * </p>
 * 
 * 下面介紹的三步就是select方法的邏輯:
 * <ol>
 *
 *   <li><p> Each key in the cancelled-key set is removed from each key set of
 *   which it is a member, and its channel is deregistered.  This step leaves
 *   the cancelled-key set empty. </p></li>
 *   1:取消鍵集中的每個key都将從它所屬的每個鍵集中删除,其通道也将取消注冊。
 *   此步驟将取消鍵集變為為空集合。
 * 
 *   <li><p> The underlying operating system is queried for an update as to the
 *   readiness of each remaining channel to perform any of the operations
 *   identified by its key's interest set as of the moment that the selection
 *   operation began.  For a channel that is ready for at least one such
 *   operation, one of the following two actions is performed: </p>
 *   2:在選擇操作開始時,會查詢底層作業系統的更新,以了解每個剩餘通道是否準備好
 *   執行由其鍵的興趣集辨別的任何操作。對于為至少一個這樣的操作準備好的通道,
 *   執行以下兩個操作中的一個:
 *   <ol>
 *
 *     <li><p> If the channel's key is not already in the selected-key set then
 *     it is added to that set and its ready-operation set is modified to
 *     identify exactly those operations for which the channel is now reported
 *     to be ready.  Any readiness information previously recorded in the ready
 *     set is discarded.  </p></li>
 *     2.1:如果通道的鍵不在選擇鍵集中,則将其添加到該集中,并修改其ready-operation集,
 *     以準确地識别channel現在報告已準備就緒的操作。丢棄之前記錄在準備集中的任何準備資訊。
 *    
 *     解釋:從這個步驟可以看出,如果SelectionKey是新加入選擇鍵集的話,其SelectionKey
 *     中的準備集中的資訊是不會保留上一次的。
 * 
 *     <li><p> Otherwise the channel's key is already in the selected-key set,
 *     so its ready-operation set is modified to identify any new operations
 *     for which the channel is reported to be ready.  Any readiness
 *     information previously recorded in the ready set is preserved; in other
 *     words, the ready set returned by the underlying system is
 *     bitwise-disjoined into the key's current ready set. </p></li>
 *     2.2:否則,通道的鍵已經在選擇鍵集中,是以它的準備集将被修改,以辨別通道被報告為已就緒的
 *     任何新操作。保留之前記錄在準備集中的任何準備資訊;換句話說,底層系統傳回的準備集被
 *     按位分解為鍵的目前準備集。
 *     
 *     解釋:這個步驟描述的情況是channel的鍵已經在選舉鍵的情況,什麼時候會發生這種情況?
 *     首先要了解,selector.select選擇的時候,對于我們調用者來說,感覺隻是執行了一個同步方法,
 *     調用了一次,但底層其實是多次輪詢監聽所有的channel,輪詢的過程中調用線程會被阻塞,是以
 *     我們感覺是同步的,底層在到達某一個條件會傳回結果,然後喚醒調用線程。
 *     是以同個channel可能在多次輪詢中都會發現其某個感興趣的事件已經就緒,而隻有第一次發現
 *     的時候其SelectionKey是不在選擇鍵集中的,這個時候會添加進去,之後輪詢到同一個channel
 *     會發現選擇鍵集中已經存在,這個時候上一次輪詢标記的就緒事件肯定是不能丢棄的,因為還沒處理。
 *     
 *     還有一種情況,就是我們已經處理了SelectionKey的業務邏輯,但是沒有将其從選擇鍵集中移除,
 *     是以下一次選擇操作,這個SelectionKey還是會存在在選擇鍵集中,并被我們重複處理,同時這個
 *     SelectionKey的準備集中的準備資訊,上一次記錄的資訊還是被保留的。
 *     
 *     由此可以推斷出,底層系統傳回的通道準備資訊,應該是按照二進制位,1代表就緒,0代表未就緒,
 *     每一位代表一個事件,傳回這樣的二進制數字(雖然readyOps方法傳回的是int,進制之間可以随意轉換),
 *     而NIO處理的時候,如果SelectionKey不在選擇鍵集中,則直接替換原有的準備集的數字,如果已經存在,
 *     則取原來的準備集數字和現在系統傳回的數字進行與預算
 *   </ol>
 *
 *   If all of the keys in the key set at the start of this step have empty
 *   interest sets then neither the selected-key set nor any of the keys'
 *   ready-operation sets will be updated.
 *   2:如果此步驟開始時的鍵集中的所有鍵的興趣集都是空的(0也是空,代表不關注任何事件),
 *   那麼選擇鍵集 和 鍵的任何準備集 都不會被更新。
 *
 *   <li><p> If any keys were added to the cancelled-key set while step (2) was
 *   in progress then they are processed as in step (1). </p></li>
 *   3:如果在步驟(2)進行時向取消的鍵集添加了任何鍵,則按照步驟(1)處理它們。
 * </ol>
 *
 * <p> Whether or not a selection operation blocks to wait for one or more
 * channels to become ready, and if so for how long, is the only essential
 * difference between the three selection methods. </p>
 * 選擇操作是否阻塞以等待一個或多個通道就緒,如果是,等待多長時間,是三種選擇方法之間唯一的本質差別。
 * 
 * 解釋:Selector中有三個選擇操作的方法,select()/select(long timeout)/selectNow()
 * 這一段描述其實就是這三個方法差別
 * 
 * 
 * ---------------------    華麗的分割線    ---------------------
 * 
 * 
 * <h2>Concurrency</h2>    并發性
 *
 * <p> Selectors are themselves safe for use by multiple concurrent threads;
 * their key sets, however, are not.
 * 選擇器本身對于多個并發線程來說是安全的;然而,它們的鍵集卻不是。
 * 
 * 解釋:簡單來首就是我們直接調用Selector各個api是線程安全的,但是如果我們直接取Selector裡
 * 的鍵集自己修改是線程不安全的
 * 
 * <p> The selection operations synchronize on the selector itself, on the key
 * set, and on the selected-key set, in that order.  They also synchronize on
 * the cancelled-key set during steps (1) and (3) above.
 * 選擇操作按順序同步選擇器本身、鍵集和選擇鍵集。它們還在上面的步驟(1)和(3)中同步已取消的鍵集。
 * 
 * <p> Changes made to the interest sets of a selector's keys while a
 * selection operation is in progress have no effect upon that operation; they
 * will be seen by the next selection operation.
 * 在進行選擇操作時,對選擇器鍵的興趣集所做的更改不會影響該操作;它們将被下一個選擇操作看到。
 * 
 * <p> Keys may be cancelled and channels may be closed at any time.  Hence the
 * presence of a key in one or more of a selector's key sets does not imply
 * that the key is valid or that its channel is open.  Application code should
 * be careful to synchronize and check these conditions as necessary if there
 * is any possibility that another thread will cancel a key or close a channel.
 * SelectionKey可能在任何時候被取消,通道也可能在任何時候被關閉。是以,在一個或多個選擇器
 * 的鍵集中出現一個鍵并不意味着該鍵是有效的,或者它的通道是打開的。應用程式代碼應該小心同步,
 * 并在必要時檢查這些條件,看是否有其他線程取消鍵或關閉通道的可能性。
 * 
 * <p> A thread blocked in one of the {@link #select()} or {@link
 * #select(long)} methods may be interrupted by some other thread in one of
 * three ways:
 * 在Selector.select()和Selector.select(long)方法中阻塞的線程可能會被其他線程以
 * 以下三種方式中斷:
 *
 * <ul>
 *
 *   <li><p> By invoking the selector's {@link #wakeup wakeup} method,
 *   </p></li>
 *   通過調用selector的wakeup方法
 *
 *   <li><p> By invoking the selector's {@link #close close} method, or
 *   </p></li>
 *   通過調用selector的{close方法,或者
 * 
 *   <li><p> By invoking the blocked thread's {@link
 *   java.lang.Thread#interrupt() interrupt} method, in which case its
 *   interrupt status will be set and the selector's {@link #wakeup wakeup}
 *   method will be invoked. </p></li>
 *   通過調用被阻塞線程的link java.lang.Thread#interrupt()方法,該方法會将此線程設定為
 *   中斷狀态(設定中斷狀态并不會中斷線程),線程執行過程發現自己是中斷狀态會執行
 *   selector的wakeup方法
 * </ul>
 *
 * <p> The {@link #close close} method synchronizes on the selector and all
 * three key sets in the same order as in a selection operation.
 * close方法以與選擇操作相同的順序同步選擇器和所有三個鍵集。
 * 
 * <a name="ksc"></a>
 *
 * <p> A selector's key and selected-key sets are not, in general, safe for use
 * by multiple concurrent threads.  If such a thread might modify one of these
 * sets directly then access should be controlled by synchronizing on the set
 * itself.  The iterators returned by these sets' {@link
 * java.util.Set#iterator() iterator} methods are <i>fail-fast:</i> If the set
 * is modified after the iterator is created, in any way except by invoking the
 * iterator's own {@link java.util.Iterator#remove() remove} method, then a
 * {@link java.util.ConcurrentModificationException} will be thrown. </p>
 * 通常,對于多個并發線程來說,選擇器的鍵集和選擇鍵集并不安全。
 * 如果有一個這樣的線程可能直接修改這些集合中的一個,那麼應該通過同步該集合本身來控制通路。
 * 這些集合的{@link java.util.Set#iterator()}方法傳回的疊代器是快速失敗的:
 * 如果在建立疊代器之後修改了集合,除了調用疊代器自己的remove方法外,使用任何方式進行修改,
 * 都會抛出并發修改異常(第一個例子中會示範這個情況!)
 * 
 * @author Mark Reinhold
 * @author JSR-51 Expert Group
 * @since 1.4
 *
 * @see SelectableChannel
 * @see SelectionKey
 */

public abstract class Selector implements Closeable {
	...
}
           

我們稍微關注一下Selector 的三個選擇操作方法:

  • public abstract int select() throws IOException;
    /**
     * Selects a set of keys whose corresponding channels are ready for I/O
     * operations.
     * 選擇一組鍵,它們對應的通道已經為I/O操作準備好了。
     * 
     * <p> This method performs a blocking <a href="#selop" target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow" >selection
     * operation</a>.  It returns only after at least one channel is selected,
     * this selector's {@link #wakeup wakeup} method is invoked, or the current
     * thread is interrupted, whichever comes first.  </p>
     * 這個方法執行一個阻塞選擇操作。!!!
     * 它隻在至少一個通道被選中、此選擇器的{@link #wakeup wakeup}方法被調用或目前線
     * 程被中斷(以最先出現的方式)後傳回。
     * 
     * @return  The number of keys, possibly zero,
     *          whose ready-operation sets were updated
     *
     * @throws  IOException
     *          If an I/O error occurs
     *
     * @throws  ClosedSelectorException
     *          If this selector is closed
     */
    public abstract int select() throws IOException;
               
  • public abstract int select(long timeout) throws IOException;
    /**
     * Selects a set of keys whose corresponding channels are ready for I/O
     * operations.
     * 選擇一組鍵,它們對應的通道已經為I/O操作準備好了。
     * 
     * <p> This method performs a blocking <a href="#selop" target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow" >selection
     * operation</a>.  It returns only after at least one channel is selected,
     * this selector's {@link #wakeup wakeup} method is invoked, the current
     * thread is interrupted, or the given timeout period expires, whichever
     * comes first.
     * 這個方法執行一個阻塞選擇操作。它隻在至少一個通道被選中、此選擇器的
     * {@link #wakeup wakeup}方法被調用、目前線程被中斷或給定的逾時時
     * 間過期(以最先出現的方式)之後傳回。
     * 
     * <p> This method does not offer real-time guarantees: It schedules the
     * timeout as if by invoking the {@link Object#wait(long)} method. </p>
     * 這個方法不提供實時保證:它通過調用{@link Object#wait(long)}方法來排程逾時。
     * 
     * @param  timeout  If positive, block for up to <tt>timeout</tt>
     *                  milliseconds, more or less, while waiting for a
     *                  channel to become ready; if zero, block indefinitely;
     *                  must not be negative
     *
     * @return  The number of keys, possibly zero,
     *          whose ready-operation sets were updated
     *
     * @throws  IOException
     *          If an I/O error occurs
     *
     * @throws  ClosedSelectorException
     *          If this selector is closed
     *
     * @throws  IllegalArgumentException
     *          If the value of the timeout argument is negative
     */
    public abstract int select(long timeout) throws IOException;
               
  • public abstract int selectNow() throws IOException;
    /**
     * Selects a set of keys whose corresponding channels are ready for I/O
     * operations.
     * 選擇一組鍵,它們對應的通道已經為I/O操作準備好了。
     * 
     * <p> This method performs a non-blocking <a href="#selop" target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow" >selection
     * operation</a>.  If no channels have become selectable since the previous
     * selection operation then this method immediately returns zero.
     * 該方法執行一個非阻塞選擇操作。如果自上一個選擇操作以來沒有通道成為可選擇的,
     * 那麼此方法将立即傳回零。
     * 
     * <p> Invoking this method clears the effect of any previous invocations
     * of the {@link #wakeup wakeup} method.  </p>
     * 調用此方法将清除以前調用{@link #wakeup wakeup}方法的效果。
     * 
     * @return  The number of keys, possibly zero, whose ready-operation sets
     *          were updated by the selection operation
     *
     * @throws  IOException
     *          If an I/O error occurs
     *
     * @throws  ClosedSelectorException
     *          If this selector is closed
     */
    public abstract int selectNow() throws IOException;
               

總結:三個方法差別就是阻塞還是不阻塞,阻塞是一直阻塞,還是阻塞指定的最長時間

IO多路複用的機制差別,簡單說明,後期會專門開一篇介紹:
  • select/poll:在使用者态維護的連接配接連結清單,每次select需要把所有連接配接複制到核心态,并輪詢所有的連接配接,在将結果傳回到使用者态,并再次周遊所有連接配接,找到就緒的連接配接
    • select用的是數組維護連接配接句柄,poll用的是連結清單結構維護連接配接句柄,是以select有大小限制
  • epoll(linux支援):連接配接連結清單維護在核心态,隻收集發生事件的連接配接,将發生事件的連接配接傳回到使用者态去處理

Netty用的是poll方式

SelectionKey抽象類簡介

看下類介紹,有些概念已經在Selector中介紹過了:

/**
 * A token representing the registration of a {@link SelectableChannel} with a
 * {@link Selector}.
 * 一個表示{@link SelectableChannel}注冊到{@link Selector}的标記(令牌)。
 * 
 * <p> A selection key is created each time a channel is registered with a
 * selector.  A key remains valid until it is <i>cancelled</i> by invoking its
 * {@link #cancel cancel} method, by closing its channel, or by closing its
 * selector.  Cancelling a key does not immediately remove it from its
 * selector; it is instead added to the selector's <a
 * href="Selector.html#ks" target="_blank" rel="external nofollow" ><i>cancelled-key set</i></a> for removal during the
 * next selection operation.  The validity of a key may be tested by invoking
 * its {@link #isValid isValid} method.
 * 每次向selector注冊通道時,都會建立選擇鍵。一個鍵保持有效,直到它被取消了,取消可以通過調用它的
 * {@link #cancel cancel}方法,或者關閉它的通道,或者關閉它的選擇器。
 * 取消一個鍵不會立即将其從選擇器中移除;取代的是它被添加到選擇器的 cancelleled -key set
 * 用于在下一次選擇操作中删除。
 * 一個key的有效性可以通過調用它的{@link #isValid isValid}方法來測試。
 * 
 * <a name="opsets"></a>
 *
 * <p> A selection key contains two <i>operation sets</i> represented as
 * integer values.  Each bit of an operation set denotes a category of
 * selectable operations that are supported by the key's channel.
 * 一個選擇鍵包含兩個用整數值表示的操作集。操作集的每個二進制位表示鍵的通道支援的可選擇操作的類别。
 * 
 * <ul>
 *
 *   <li><p> The <i>interest set</i> determines which operation categories will
 *   be tested for readiness the next time one of the selector's selection
 *   methods is invoked.  The interest set is initialized with the value given
 *   when the key is created; it may later be changed via the {@link
 *   #interestOps(int)} method. </p></li>
 *   興趣集決定了在下一次調用選擇器的選擇方法時将測試哪些操作類别是否準備就緒。
 *   在建立選擇鍵的時候,用給定的值初始化興趣集;之後可以通過{@link #interestOps(int)}方法對其
 *   進行更改。
 * 
 *   解釋:channel注冊到Selector的時候會用channel.register(Selector sel, int ops)方法,
 *   ops參數就是興趣集,是一個整數,轉換成二進制後,每個二進制位代表一種操作類型,1代表關注
 *   注冊完以後,之後可以通過調用該channel對應的SelectionKey的interestOps進行查詢或修改
 *   
 *   <li><p> The <i>ready set</i> identifies the operation categories for which
 *   the key's channel has been detected to be ready by the key's selector.
 *   The ready set is initialized to zero when the key is created; it may later
 *   be updated by the selector during a selection operation, but it cannot be
 *   updated directly. </p></li>
 *   準備集辨別了該鍵的通道已被鍵的selector檢測為就緒的所有操作類别。當選擇鍵被建立時,
 *   準備集被初始化為零;它可能會在之後的選擇操作期間由選擇器更新,但不能直接更新。
 * </ul>
 *
 * <p> That a selection key's ready set indicates that its channel is ready for
 * some operation category is a hint, but not a guarantee, that an operation in
 * such a category may be performed by a thread without causing the thread to
 * block.  A ready set is most likely to be accurate immediately after the
 * completion of a selection operation.  It is likely to be made inaccurate by
 * external events and by I/O operations that are invoked upon the
 * corresponding channel.
 * 選擇鍵的準備集表明它的通道為某個操作類别準備好了,這是一種提示,但不是保證,
 * 說明此類類别中的操作可以由線程執行,而不會導緻線程阻塞。準備集最有可能在選擇
 * 操作立即完成後是準确的。外部事件和在相應通道上調用的I/O操作可能會使其不準确。
 * 
 * <p> This class defines all known operation-set bits, but precisely which
 * bits are supported by a given channel depends upon the type of the channel.
 * Each subclass of {@link SelectableChannel} defines an {@link
 * SelectableChannel#validOps() validOps()} method which returns a set
 * identifying just those operations that are supported by the channel.  An
 * attempt to set or test an operation-set bit that is not supported by a key's
 * channel will result in an appropriate run-time exception.
 * 這個類定義了所有已知的操作集位,但是給定通道所支援的位的精确程度取決于通道的類型。
 * {@link SelectableChannel}的每個子類都定義了一個
 * {@link SelectableChannel#validOps() validOps()}方法,該方法傳回一個集合,
 * 辨別通道支援的那些操作。嘗試設定或測試SelectionKey的通道不支援的操作集位将導緻适當的運作時異常。
 * 
 * <p> It is often necessary to associate some application-specific data with a
 * selection key, for example an object that represents the state of a
 * higher-level protocol and handles readiness notifications in order to
 * implement that protocol.  Selection keys therefore support the
 * <i>attachment</i> of a single arbitrary object to a key.  An object can be
 * attached via the {@link #attach attach} method and then later retrieved via
 * the {@link #attachment() attachment} method.
 * 通常需要将一些特定于應用程式的資料與選擇鍵相關聯,例如表示進階協定狀态并處理準備就緒通知以便
 * 實作該協定的對象。是以選擇鍵支援一個任意對象對一個鍵的附件。對象可以通過
 * {@link #attach attach}方法附加,然後通過{@link #attachment() attachment}方法檢索。
 * 
 * <p> Selection keys are safe for use by multiple concurrent threads.  The
 * operations of reading and writing the interest set will, in general, be
 * synchronized with certain operations of the selector.  Exactly how this
 * synchronization is performed is implementation-dependent: In a naive
 * implementation, reading or writing the interest set may block indefinitely
 * if a selection operation is already in progress; in a high-performance
 * implementation, reading or writing the interest set may block briefly, if at
 * all.  In any case, a selection operation will always use the interest-set
 * value that was current at the moment that the operation began.  </p>
 * 選擇鍵對于多個并發線程來說是安全的。對興趣集的讀寫操作通常會與選擇器的某些操作同步。
 * 确切地說,同步是如何執行的取決于實作:在一個幼稚的實作中,如果選擇操作已經在進行,
 * 那麼對興趣集的讀寫操作可能會無限期地阻塞;在高性能實作中,對興趣集的讀寫操作可能會短暫阻塞。
 * 在任何情況下,選擇操作都将始終使用操作開始時的目前興趣集值。
 *
 * @author Mark Reinhold
 * @author JSR-51 Expert Group
 * @since 1.4
 *
 * @see SelectableChannel
 * @see Selector
 */

public abstract class SelectionKey {
	...
}
           

類描述中提到一個SelectionKey 包含兩個用整數值表示的操作集,一個是興趣集(表示需要關注的事件),一個是準備集(表示已經準備就緒的事件),我們看一下興趣集中都有哪些類型的操作,SelectionKey中定義了四種類别:

Netty(六)源碼解析 之簡述NIO、用NIO實作Socket通信 和 群聊1. NIO 簡介2. NIO 通信 14-niosocket3. NIO 群聊 15-niochat

1 << 0 :1 讀操作

1<< 2 :4 寫操作

1<< 3 :8 連接配接操作(指用戶端主動連接配接)

1<< 4 :16 接受操作(指用戶端被動接受用戶端的連接配接)>

相加可以組合多個操作

1+4 = 5,即5代表讀和寫操作

1+4+8

channel注冊到Selector的時候需要指定該通道感興趣的事件,通常調用的是這個API:

java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int)

在簡單看一下SelectionKey的API:

Netty(六)源碼解析 之簡述NIO、用NIO實作Socket通信 和 群聊1. NIO 簡介2. NIO 通信 14-niosocket3. NIO 群聊 15-niochat

2. NIO 通信 14-niosocket

這裡使用 NIO 實作一個簡單的 C/S 通信:Client 向 Server 發送一個資料,顯示在 Server端控制台。

(1) 建立工程

建立一個普通的 Maven 的 java 工程即可:14-niosocket

(2) 修改 pom

無需添加任何依賴。但需要指定編譯器版本。

<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
</properties>
           

(3) 定義用戶端

public class NioClient {
    public static void main(String[] args) throws Exception {
        // 建立用戶端channel
        SocketChannel clientChannel = SocketChannel.open();
        // 指定channel使用非阻塞模式
        clientChannel.configureBlocking(false);
        // 指定要連接配接的Server位址
        InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888);
        // 連接配接Server
        if (!clientChannel.connect(serverAddr)) {   // 首次連接配接
            while (!clientChannel.finishConnect()) {   // 完成重連
                System.out.println("連接配接不上server,正在嘗試連接配接中。。。");
                continue;
            }
        }
        // 将消息寫入到channel
        clientChannel.write(ByteBuffer.wrap("hello".getBytes()));
        System.out.println("Client消息已發送");

        System.in.read();
    }
}
           

注意:

SocketChannel的connect()和finishConnect()方法差別

  • connect()
    /**
         * Connects this channel's socket.
         * 連接配接此通道的套接字。
         * 
         * <p> If this channel is in non-blocking mode then an invocation of this
         * method initiates a non-blocking connection operation.  If the connection
         * is established immediately, as can happen with a local connection, then
         * this method returns <tt>true</tt>.  Otherwise this method returns
         * <tt>false</tt> and the connection operation must later be completed by
         * invoking the {@link #finishConnect finishConnect} method.
         * 如果此通道處于非阻塞模式,則調用此方法将啟動一個非阻塞連接配接操作。
         * 如果立即建立了連接配接,就像本地連接配接一樣,那麼該方法傳回true。
         * 否則,此方法将傳回false,随後必須通過調用
         * {@link #finishConnect finishConnect}方法來完成連接配接操作。
         * 
         * <p> If this channel is in blocking mode then an invocation of this
         * method will block until the connection is established or an I/O error
         * occurs.
         * 如果此通道處于阻塞模式,則此方法的調用将阻塞,直到連接配接建立或發生I/O錯誤為止。
         *
         * <p> This method performs exactly the same security checks as the {@link
         * java.net.Socket} class.  That is, if a security manager has been
         * installed then this method verifies that its {@link
         * java.lang.SecurityManager#checkConnect checkConnect} method permits
         * connecting to the address and port number of the given remote endpoint.
         * 這個方法執行和{@link java.net.Socket}類完全相同的安全檢查。也就是說,如果安裝了
         * 安全管理器,則此方法将驗證其
         * {@link java.lang.SecurityManager#checkConnect checkConnect}方法是
         * 否允許連接配接到給定遠端端點的位址和端口号。
         * 
         * <p> This method may be invoked at any time.  If a read or write
         * operation upon this channel is invoked while an invocation of this
         * method is in progress then that operation will first block until this
         * invocation is complete.  If a connection attempt is initiated but fails,
         * that is, if an invocation of this method throws a checked exception,
         * then the channel will be closed.  </p>
         * 此方法可以在任何時候調用。如果在調用此方法時調用了該通道上的讀或寫操作,那麼該操作将首先
         * 阻塞,直到此方法調用完成。如果一個連接配接請求發起但是失敗了,也就是說,如果調用此方法抛出
         * 一個檢查異常,則通道将被關閉。
         * ...
         */
        public abstract boolean connect(SocketAddress remote) throws IOException;
               
  • finishConnect()
    /**
     * Finishes the process of connecting a socket channel.
     * 完成連接配接套接字通道的過程。
     * 
     * <p> A non-blocking connection operation is initiated by placing a socket
     * channel in non-blocking mode and then invoking its {@link #connect
     * connect} method.  Once the connection is established, or the attempt has
     * failed, the socket channel will become connectable and this method may
     * be invoked to complete the connection sequence.  If the connection
     * operation failed then invoking this method will cause an appropriate
     * {@link java.io.IOException} to be thrown.
     * 非阻塞連接配接操作是通過将套接字通道置于非阻塞模式,然後調用其{@link #connect connect}方
     * 法來啟動的。一旦建立了連接配接,或者嘗試失敗,都将會讓套接字通道将變得可連接配接,并且可以
     * 調用此方法繼續完成連接配接。如果連接配接操作失敗,那麼調用此方法将導緻一個合适的
     * {@link java.io.IOException}抛出。
     * 
     * <p> If this channel is already connected then this method will not block
     * and will immediately return <tt>true</tt>.  If this channel is in
     * non-blocking mode then this method will return <tt>false</tt> if the
     * connection process is not yet complete.  If this channel is in blocking
     * mode then this method will block until the connection either completes
     * or fails, and will always either return <tt>true</tt> or throw a checked
     * exception describing the failure.
     * 如果這個通道已經連接配接成功,那麼這個方法就不會阻塞,并且會立即傳回true。如果此通道處
     * 于非阻塞模式,則如果連接配接過程尚未完成,此方法将傳回false。如果此通道處于阻塞模式,
     * 則此方法将阻塞,直到連接配接完成或失敗為止,并且總是傳回true或抛出一個檢查異常描述失敗。
     * 
     * <p> This method may be invoked at any time.  If a read or write
     * operation upon this channel is invoked while an invocation of this
     * method is in progress then that operation will first block until this
     * invocation is complete.  If a connection attempt fails, that is, if an
     * invocation of this method throws a checked exception, then the channel
     * will be closed.  </p>
     * 此方法可以在任何時候調用。如果在調用此方法的過程中調用了該通道上的讀或寫操作,那麼讀和寫
     * 這些該操作将首先阻塞,直到本方法調用完成。如果連接配接嘗試失敗,也就是說,如果調用此方法
     * 引發檢查異常,則通道将被關閉。
     * ...
     */
    public abstract boolean finishConnect() throws IOException;
               

(4) 定義服務端

public class NioServer {
    public static void main(String[] args) throws Exception {
        // 建立一個服務端Channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 指定channel采用的為非阻塞模式
        serverChannel.configureBlocking(false);
        // 指定要監聽的端口
        serverChannel.bind(new InetSocketAddress(8888));
        // 建立一個多路複用器selector
        Selector selector = Selector.open();
        // 将channel注冊到selector,并告訴selector讓其監聽“接收Client連接配接事件”
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // select()是一個阻塞方法,若阻塞1秒的時間到了,或在阻塞期間有channel就緒,都會打破阻塞
            if (selector.select(1000) == 0) {
                System.out.println("目前沒有找到就緒的channel");
                continue;
            }

            // 代碼能走到這裡,說明已經有channel就緒
            // 擷取所有就緒的channel的SelectionKey
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 周遊所有就緒的key
            for (SelectionKey key : selectionKeys) {
                // 若目前key為OP_ACCEPT,則說明目前channel是可以接收用戶端連接配接的。
                // 那麼,這裡的代碼就是用于接收用戶端連接配接的
                if (key.isAcceptable()) {
                    System.out.println("接收到Client的連接配接");
                    // 擷取連接配接到Server的用戶端channel,其是用戶端channel在server端的代表(駐京辦)
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false);
                    // 将用戶端channel注冊到selector,并告訴selector讓其監聽這個channel中是否發生了讀事件
                    clientChannel.register(selector, SelectionKey.OP_READ);
                }
                // 若目前key為OP_READ,則說明目前channel中有用戶端發送來的資料。
                // 那麼,這裡的代碼就是用于讀取channel中的資料的
                if (key.isReadable()) {
                    try {
                        // 建立buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        // 根據key擷取其對應的channel
                        SocketChannel clientChannel = (SocketChannel) key.channel();
                        // 把channel中的資料讀取到buffer
                        clientChannel.read(buffer);
                    } catch (Exception e) {
                        // 若在讀取過程中發生異常,則直接取消該key,即放棄該channel
                        key.cancel();
                    }
                }

                // 删除目前處理過的key,以免重複處理
                selectionKeys.remove(key);
            } // end-for
        }

    }
}
           

注意這段實際上執行的時候會有問題,selectionKeys在有多個的情況下,使用for循環,直接用集合的remove(key)方法會導緻并發修改異常:

Netty(六)源碼解析 之簡述NIO、用NIO實作Socket通信 和 群聊1. NIO 簡介2. NIO 通信 14-niosocket3. NIO 群聊 15-niochat

改成疊代器的周遊方式即可:

public class NioServer2 {
    public static void main(String[] args) throws Exception {
        // 建立一個服務端Channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 指定channel采用的為非阻塞模式
        serverChannel.configureBlocking(false);
        // 指定要監聽的端口
        serverChannel.bind(new InetSocketAddress(8888));
        // 建立一個多路複用器selector
        Selector selector = Selector.open();
        // 将channel注冊到selector,并告訴selector讓其監聽“接收Client連接配接事件”
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // select()是一個阻塞方法,若阻塞1秒的時間到了,或在阻塞期間有channel就緒,都會打破阻塞
            if (selector.select(1000) == 0) {
                System.out.println("目前沒有找到就緒的channel");
                continue;
            }

            // 代碼能走到這裡,說明已經有channel就緒
            // 擷取所有就緒的channel的key
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            // 周遊所有就緒的key
            while (it.hasNext()) {
                SelectionKey key = it.next();
                // 若目前key為OP_ACCEPT,則說明目前channel是可以接收用戶端連接配接的。
                // 那麼,這裡的代碼就是用于接收用戶端連接配接的
                if (key.isAcceptable()) {
                    System.out.println("接收到Client的連接配接");
                    // 擷取連接配接到Server的用戶端channel,其是用戶端channel在server端的代表(駐京辦)
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false);
                    // 将用戶端channel注冊到selector,并告訴selector讓其監聽這個channel中是否發生了讀事件
                    clientChannel.register(selector, SelectionKey.OP_READ);
                }
                // 若目前key為OP_READ,則說明目前channel中有用戶端發送來的資料。
                // 那麼,這裡的代碼就是用于讀取channel中的資料的
                if (key.isReadable()) {
                    try {
                        // 建立buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        // 根據key擷取其對應的channel
                        SocketChannel clientChannel = (SocketChannel) key.channel();
                        // 把channel中的資料讀取到buffer
                        clientChannel.read(buffer);
                    } catch (Exception e) {
                        // 若在讀取過程中發生異常,則直接取消該key,即放棄該channel
                        key.cancel();
                    }
                }

                // 删除目前正在疊代的的key,以免重複處理
                it.remove();
            }
        }

    }
}
           

如果隻開啟用戶端的效果:

Netty(六)源碼解析 之簡述NIO、用NIO實作Socket通信 和 群聊1. NIO 簡介2. NIO 通信 14-niosocket3. NIO 群聊 15-niochat

3. NIO 群聊 15-niochat

該工程實作的功能是:隻要有 Client 啟動、發送消息,及下線,都會廣播給所有其它Client 通知。

(1) 建立工程

建立一個普通的 Maven 的 java 工程即可:15-niochat

(2) 修改 pom

無需添加任何依賴。但需要指定編譯器版本。

<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
</properties>
           

(3) 定義 NioChatServerStarter

public class NioChatServerStarter {
    public static void main(String[] args) throws Exception {
        // 建立一個服務端Channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 指定channel采用的為非阻塞模式
        serverChannel.configureBlocking(false);
        // 指定要監聽的端口
        serverChannel.bind(new InetSocketAddress(8888));
        // 建立一個多路複用器selector
        Selector selector = Selector.open();
        // 将channel注冊到selector
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 建立支援群聊的nio server
        NioChatServer chatServer = new NioChatServer();
        chatServer.enableChat(serverChannel, selector);
    }
}
           

(4) 定義 NioChatServer

public class NioChatServer {
    // 開啟Server的群聊支援功能
    public void enableChat(ServerSocketChannel serverChannel, Selector selector) throws Exception {
        System.out.println("chatServer啟動。。。");
        while (true) {
            if (selector.select(1000) == 0) {
                continue;
            }

            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            for (SelectionKey key : selectionKeys) {
                // 處理用戶端上線
                // 處理連接配接情況,隻要有Client連接配接到伺服器,就廣播給所有Client通知:
                if (key.isAcceptable()) {
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false);
                    clientChannel.register(selector, SelectionKey.OP_READ);
                    // 擷取到client位址
                    String msg = clientChannel.getRemoteAddress() + "-上線了";
                    // 将上線通知廣播給所有線上的其它client
                    sendMSGToOtherClientOnline(selector, clientChannel, msg);
                }

                // 處理用戶端發送消息情況:
                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel)key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    clientChannel.read(buffer);
                    // 擷取來自于client的消息,trim()将buffer中沒有資料的内容轉為的空格去掉
                    String msgFromClient = new String(buffer.array()).trim();
                    if (msgFromClient.length() > 0) {
                        // 擷取到client位址
                        SocketAddress clientAddr = clientChannel.getRemoteAddress();
                        // 構成要發送給其它client的消息
                        String msgToSend = clientAddr + " say:" + msgFromClient;
                        // 處理用戶端下線的情況:
                        // 下線,如果用戶端直接關閉的話對于服務端來說,對應的通道
                        // 會變成可讀的,但是嘗試讀取這個消息内容是會報錯的
                        // 是以我們主動讓用戶端發送一個”88”代表要下線了
                        // 若client發送的是字元串"88",則表示其要下線
                        if ("88".equals(msgFromClient)) {
                            msgToSend = clientAddr + "下線";
                            // 取消目前key,即放棄其所對應的channel,
                            // 将其對應的channel從selector中去掉
                            key.cancel();
                        }
                        // 将client消息廣播給所有線上的其它client
                        sendMSGToOtherClientOnline(selector, clientChannel, msgToSend);
                    }
                }  // end-if
                selectionKeys.remove(key);
            } // end-for
        }
    }
	//處理發送消息邏輯:
	//将client消息廣播給所有線上的其它client
    private void sendMSGToOtherClientOnline(Selector selector, SocketChannel self, String msg) throws IOException {
        // 周遊所有注冊到selector的channel,即所有線上的client
        for (SelectionKey key : selector.keys()) {
            SelectableChannel channel = key.channel();
            // 将消息發送給所有其它client
            // 需要判斷channel類型,因為我們服務端的channel和用戶端的channel注冊
            // 的都是一個selector
            if (channel instanceof SocketChannel && channel != self) {
                ((SocketChannel) channel).write(ByteBuffer.wrap(msg.trim().getBytes()));
            }
        }
    }
}
           

Selector注冊需要關注的事件後,Selector就會監控這些事件是否發生,如果事件來了表示已經就緒了,就可以選擇發生該事件的通道讓線程去處理,而服務端在處理SelectionKey的時候通常不需要判斷key.isWritable()和key.isConnectable()這兩種就緒事件:

  • 連接配接:如果有用戶端連接配接我,我這就接受該連接配接,一接受連接配接就發送了接受事件,通道就會變為就緒,Selector就能選上
  • 可讀:隻要有用戶端給我寫資料,我就開始讀資料,隻要我一讀,就發送了讀事件,通道就會變為就緒,Selector就能選上

上面兩個情況都是被動的,是以需要Selector幫我們監聽,而write和connect是自己幹的事,主動去做的,不需要監聽,自己想寫就寫,想連就連

上面代碼在處理關閉邏輯的時候,我是判斷client發送的是字元串"88",則表示其要下線,實際上直接關閉用戶端也是可以的,但是要注意:

Netty(六)源碼解析 之簡述NIO、用NIO實作Socket通信 和 群聊1. NIO 簡介2. NIO 通信 14-niosocket3. NIO 群聊 15-niochat

(5) 定義 NioChatClientStarter

public class NioChatClientStarter {
    public static void main(String[] args) throws Exception {
        // 建立用戶端channel
        SocketChannel clientChannel = SocketChannel.open();
        // 指定channel使用非阻塞模式
        clientChannel.configureBlocking(false);
        // 指定要連接配接的Server位址
        InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888);
        // 連接配接Server
        if (!clientChannel.connect(serverAddr)) {   // 首次連接配接
            while (!clientChannel.finishConnect()) {   // 完成重連
                System.out.println("連接配接不上server,正在嘗試連接配接中。。。");
                continue;
            }
        }
        // 建立群聊用戶端,啟動聊天功能
        NioChatClient chatClient = new NioChatClient();
        chatClient.enableChat(clientChannel);
    }
}
           

(6) 定義 NioChatClient

啟動聊天功能,需要接受來自Server的消息,并可以像Server發送消息

我們需要啟動一個線程監聽來自Server的消息

public class NioChatClient {
    // 啟動聊天功能
    public void enableChat(SocketChannel clientChannel) throws Exception {
        // 擷取client自己的位址
        SocketAddress selfAddr = clientChannel.getLocalAddress();
        System.out.println(selfAddr + ",你已經成功上線了");

        // 建立一個線程用于不間斷地接收來自于Server的消息
        new Thread() {
            @Override
            public void run() {
                // 實作不間斷
                while (true) {
                    // 接收來自于server的消息
                    try {
                        // 若目前client已經關閉,則結束循環,
                        // 否則正常接收來自Server的消息
                        if (!clientChannel.isConnected()) {
                            return;
                        }
                        receiveMsgFromServer(clientChannel);
                        TimeUnit.SECONDS.sleep(1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        // 注意,該方法不能寫到前面的建立線程之前,這樣會導緻無法接收到來自于Server的消息,
        // 因為該方法中的Scanner是阻塞的
        // 向server發送消息
        sendMsgToServer(clientChannel);
    }

    // 向server發送消息
    private void sendMsgToServer(SocketChannel clientChannel) throws Exception {
        // 接收來自于鍵盤的輸入
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String msg = scanner.nextLine();
            // 将消息寫入到channel,其中有可能是下線請求消息88
            clientChannel.write(ByteBuffer.wrap(msg.trim().getBytes()));
            // 若消息為88,則表示目前client要下線,則将該channel關閉
            if ("88".equals(msg.trim())) {
                // 關閉用戶端
                clientChannel.close();
                return;
            }
        }
    }
	//從服務端接受消息:
    private void receiveMsgFromServer(SocketChannel clientChannel) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        clientChannel.read(buffer);
        String msg = new String(buffer.array()).trim();
        if (msg.length() > 0) {
            System.out.println(msg);
        }
    }
}
           

示範

Netty(六)源碼解析 之簡述NIO、用NIO實作Socket通信 和 群聊1. NIO 簡介2. NIO 通信 14-niosocket3. NIO 群聊 15-niochat

繼續閱讀