天天看點

selector socketChannel Selector 極簡即時通訊 結束

篇文章對NIO進行了簡介,對Channel和Buffer接口的使用進行了說明,并舉了一個簡單的例子來說明其使用方法。

本篇則重點說明selector,Selector(選擇器)是Java NIO中能夠檢測一到多個NIO通道,并能夠知曉通道是否為諸如讀寫事件做好準備的元件。這樣,一個單獨的線程可以管理多個channel,進而管理多個網絡連接配接。

與selector聯系緊密的是ServerSocketChannel和SocketChannel,他們的使用與上篇文章描述的FileChannel的使用方法類似,然後與ServerSocket和Socket也有一些聯系。

本篇首先簡單的進selector進行說明,然後一個簡單的示例程式,來示範即時通訊。

Selector

使用傳統IO進行網絡程式設計,如下圖所示:

selector socketChannel Selector 極簡即時通訊 結束

每一個到服務端的連接配接,都需要一個單獨的線程(或者線程池)來處理其對應的socket,當連接配接數多的時候,對服務端的壓力極大。并使用socket的getInputStream。Read方法來不斷的輪訓每個socket,效率可想而知。

而selector則可以在同一個線程中監聽多個channel的狀态,當某個channel有selector感興趣的事情發現,selector則被激活。即不會主動去輪詢。如下圖所示:

selector socketChannel Selector 極簡即時通訊 結束

Selector使用如下示意:

[java]  view plain  copy

  1. public static void main(String[] args) throws IOException {  
  2.       Selector selector = Selector.open();//聲明selector  
  3.       ServerSocketChannel sc = ServerSocketChannel.open();  
  4.       sc.configureBlocking(false);//必須設定為異步  
  5.       sc.socket().bind(new InetSocketAddress(8081));//綁定端口  
  6.       //把channel 注冊到 selector上  
  7.       sc.register(selector, SelectionKey.OP_ACCEPT|SelectionKey.OP_CONNECT|SelectionKey.OP_READ|SelectionKey.OP_WRITE);  
  8.       while(true){  
  9.          selector.select();//阻塞,直到注冊的channel上某個感興趣的事情發生  
  10.          Set<SelectionKey> selectedKeys = selector.selectedKeys();  
  11.          Iterator<SelectionKey> keyIterator = selectedKeys.iterator();  
  12.          while(keyIterator.hasNext()) {  
  13.              SelectionKey key = keyIterator.next();  
  14.              if(key.isAcceptable()) {  
  15.                  // a connection was accepted by a ServerSocketChannel.  
  16.              } else if (key.isConnectable()) {  
  17.                  // a connection was established with a remote server.  
  18.              } else if (key.isReadable()) {  
  19.                  // a channel is ready for reading  
  20.              } else if (key.isWritable()) {  
  21.                  // a channel is ready for writing  
  22.              }  
  23.              keyIterator.remove();  
  24.          }  
  25.       }  
  26.    }  

極簡即時通訊

本例子是是一個極為簡單的例子,很多地方都不完善,但是例子可以很好的說明selector的使用方法。

本例子包含服務端和用戶端兩個部分,其中服務端采用兩個selector,用來建立連接配接和資料的讀寫。兩個selector在兩個線程中。

服務端

[java]  view plain  copy

  1. public class ServerSocketChannelTest {  
  2.    private static final int SERVER_PORT = 8081;  
  3.    private ServerSocketChannel server;  
  4.    private volatile Boolean isStop = false;  
  5.    //負責建立連接配接的selector  
  6.    private Selector conn_Sel;  
  7.    //負責資料讀寫的selector  
  8.    private Selector read_Sel;  
  9. // private ExecutorService sendService = Executors.newFixedThreadPool(3);  
  10.    //鎖,用來在建立連接配接後,喚醒read_Sel時使用的同步  
  11.    private Object lock = new Object();  
  12.    //注冊的使用者  
  13.    private Map<String, ClientInfo> clents = new HashMap<String, ClientInfo>();  
  14.    public void init() throws IOException {  
  15.       //建立ServerSocketChannel  
  16.       server = ServerSocketChannel.open();  
  17.       //綁定端口  
  18.       server.socket().bind(new InetSocketAddress(SERVER_PORT));  
  19.       server.configureBlocking(false);  
  20.       //定義兩個selector  
  21.       conn_Sel = Selector.open();  
  22.       read_Sel = Selector.open();  
  23.       //把channel注冊到selector上,第二個參數為興趣的事件  
  24.       server.register(conn_Sel, SelectionKey.OP_ACCEPT);  
  25.    }  
  26.    // 負責建立連接配接。  
  27.    private void beginListen() {  
  28.       System.out.println("--------開始監聽----------");  
  29.       while (!isStop) {  
  30.          try {  
  31.             conn_Sel.select();  
  32.          } catch (IOException e) {  
  33.             e.printStackTrace();  
  34.             continue;  
  35.          }  
  36.          Iterator<SelectionKey> it = conn_Sel.selectedKeys().iterator();  
  37.          while (it.hasNext()) {  
  38.             SelectionKey con = it.next();  
  39.             it.remove();  
  40.             if (con.isAcceptable()) {  
  41.                 try {  
  42.                    SocketChannel newConn = ((ServerSocketChannel) con  
  43.                          .channel()).accept();  
  44.                    handdleNewInConn(newConn);  
  45.                 } catch (IOException e) {  
  46.                    e.printStackTrace();  
  47.                    continue;  
  48.                 }  
  49.             } else if (con.isReadable()) {//廢代碼,執行不到。  
  50.                 try {  
  51.                    handleData((SocketChannel) con.channel());  
  52.                 } catch (IOException e) {  
  53.                    e.printStackTrace();  
  54.                 }  
  55.             }  
  56.          }  
  57.       }  
  58.    }  
  59.    private void beginReceive(){  
  60.       System.out.println("---------begin receiver data-------");  
  61.       while (true) {  
  62.          synchronized (lock) {  
  63.          }  
  64.          try {  
  65.             read_Sel.select();  
  66.          } catch (IOException e) {  
  67.             e.printStackTrace();  
  68.             continue;  
  69.          }  
  70.          Iterator<SelectionKey> it = read_Sel.selectedKeys().iterator();  
  71.          while (it.hasNext()) {  
  72.             SelectionKey con = it.next();  
  73.             it.remove();  
  74.             if (con.isReadable()) {  
  75.                 try {  
  76.                    handleData((SocketChannel) con.channel());  
  77.                 } catch (IOException e) {  
  78.                    e.printStackTrace();  
  79.                 }  
  80.             }  
  81.          }  
  82.       }  
  83.    }  
  84.    private void handdleNewInConn(SocketChannel newConn) throws IOException {  
  85.       newConn.configureBlocking(false);  
  86.       //這裡必須先喚醒read_Sel,然後加鎖,防止讀寫線程的中select方法再次鎖定。  
  87.       synchronized (lock) {  
  88.          read_Sel.wakeup();  
  89.          newConn.register(read_Sel, SelectionKey.OP_READ);  
  90.       }  
  91.       //newConn.register(conn_Sel, SelectionKey.OP_READ);  
  92.    }  
  93.    private void handleData(final SocketChannel data) throws IOException {  
  94.       ByteBuffer buffer = ByteBuffer.allocate(512);  
  95.       try {  
  96.          int size= data.read(buffer);  
  97.          if (size==-1) {  
  98.             System.out.println("-------連接配接斷開-----");  
  99.             //這裡暫時不處理,這裡可以移除已經注冊的用戶端  
  100.          }  
  101.       } catch (IOException e) {  
  102.          e.printStackTrace();  
  103.          return;  
  104.       }  
  105.       buffer.flip();  
  106.       byte[] msgByte = new byte[buffer.limit()];  
  107.       buffer.get(msgByte);  
  108.       Message msg = Message.getMsg(new String(msgByte));  
  109.       //這裡讀完資料其實已經可以另開線程了下一步的處理,理想情況下,根據不同的消息類型,建立不同的隊列,把待發送的消息放進隊列  
  110.       //當然也可以持久化。如果在資料沒有讀取前,另開線程的話,讀寫線程中 read_Sel.select(),會立刻傳回。可以把  
  111.       if (msg.getType().equals("0")) {// 注冊  
  112.          ClientInfo info = new ClientInfo(msg.getFrom(), data);  
  113.          clents.put(info.getClentID(), info);  
  114.          System.out.println(msg.getFrom() + "注冊成功");  
  115.       } else {// 轉發  
  116.          System.out.println("收到"+msg.getFrom()+"發給"+msg.getTo()+"的消息");  
  117.          ClientInfo to = clents.get(msg.getTo());  
  118.          buffer.rewind();  
  119.          if (to != null) {  
  120.             SocketChannel sendChannel = to.getChannel();  
  121.             try {  
  122.                 while (buffer.hasRemaining()) {  
  123.                    sendChannel.write(buffer);  
  124.                 }  
  125.             } catch (Exception e) {  
  126.             }  
  127.             finally {  
  128.                 buffer.clear();  
  129.             }  
  130.          }  
  131.       }  
  132.    }  
  133.    public static void main(String[] args) throws IOException {  
  134.       final ServerSocketChannelTest a = new ServerSocketChannelTest();  
  135.       a.init();  
  136.       new Thread("receive..."){  
  137.          public void run() {  
  138.             a.beginReceive();  
  139.          };  
  140.       }.start();  
  141.       a.beginListen();  
  142.    }  
  143. }  

用戶端

[java]  view plain  copy

  1. public class Client {  
  2.    private String self;  
  3.    private String to;  
  4.     //通道管理器   
  5.     private Selector selector;   
  6.     private ByteBuffer writeBuffer = ByteBuffer.allocate(512);  
  7.    private SocketChannel channel;  
  8.    private Object lock = new Object();  
  9.    private volatile boolean isInit = false;  
  10.     public Client(String self, String to)  {  
  11.       super();  
  12.       this.self = self;  
  13.       this.to = to;  
  14.    }  
  15.     public void initClient(String ip,int port) throws IOException {   
  16.         // 獲得一個Socket通道   
  17.         channel = SocketChannel.open();   
  18.         // 設定通道為非阻塞   
  19.         channel.configureBlocking(false);   
  20.         // 獲得一個通道管理器   
  21.         this.selector = Selector.open();   
  22.         // 用戶端連接配接伺服器,其實方法執行并沒有實作連接配接,需要在listen()方法中調   
  23.         //用channel.finishConnect();才能完成連接配接   
  24.         channel.connect(new InetSocketAddress(ip,port));   
  25.         //将通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_CONNECT事件。   
  26.         channel.register(selector, SelectionKey.OP_CONNECT);   
  27.     }   
  28.     @SuppressWarnings("unchecked")   
  29.     public void listen() throws IOException {  
  30.         // 輪詢通路selector   
  31.         while (true) {   
  32.           synchronized (lock) {  
  33.          }  
  34.             selector.select();   
  35.             // 獲得selector中選中的項的疊代器   
  36.             Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();   
  37.             while (ite.hasNext()) {   
  38.                 SelectionKey key =  ite.next();   
  39.                 // 删除已選的key,以防重複處理   
  40.                 ite.remove();   
  41.                 // 連接配接事件發生   
  42.                 if (key.isConnectable()) {   
  43.                     SocketChannel channel = (SocketChannel) key   
  44.                             .channel();   
  45.                     // 如果正在連接配接,則完成連接配接   
  46.                     if(channel.isConnectionPending()){   
  47.                         channel.finishConnect();   
  48.                     }   
  49.                     // 設定成非阻塞   
  50.                     channel.configureBlocking(false);   
  51.                     //在和服務端連接配接成功之後,為了可以接收到服務端的資訊,需要給通道設定讀的權限。   
  52.                     channel.register(this.selector, SelectionKey.OP_READ);   
  53.                     isInit = true;  
  54.                     // 獲得了可讀的事件   
  55.                 } else if (key.isReadable()) {   
  56.                         read(key);   
  57.                 }  
  58.             }   
  59.         }   
  60.     }   
  61.     public void read(SelectionKey key) throws IOException{   
  62.       SocketChannel data = (SocketChannel) key.channel();  
  63.       ByteBuffer buffer = ByteBuffer.allocate(512) ;  
  64.       try {  
  65.          data.read(buffer );  
  66.       } catch (IOException e) {  
  67.          e.printStackTrace();  
  68.          data.close();  
  69.          return;  
  70.       }  
  71.       buffer.flip();  
  72.       byte[] msgByte = new byte[buffer.limit()];  
  73.       buffer.get(msgByte);  
  74.       Message msg = Message.getMsg(new String(msgByte));  
  75.       System.out.println("---收到消息--"+msg+" 來自 "+msg.getFrom());  
  76.     }   
  77.     private void sendMsg(String content){  
  78.        writeBuffer.put(content.getBytes());  
  79.        writeBuffer.flip();  
  80.           try {  
  81.               while (writeBuffer.hasRemaining()) {  
  82.             channel.write(writeBuffer);  
  83.               }  
  84.          } catch (IOException e) {  
  85.             // TODO Auto-generated catch block  
  86.             e.printStackTrace();  
  87.          }         
  88.        writeBuffer.clear();  
  89.     }  
  90.     public  void start() throws IOException {   
  91.         initClient("localhost",8081);   
  92.         new Thread("reading"){  
  93.           public void run() {  
  94.                 try {  
  95.                 listen();  
  96.             } catch (IOException e) {  
  97.                 e.printStackTrace();  
  98.             }   
  99.           };  
  100.         }.start();  
  101.         int time3  = 0;  
  102.         while(!isInit&&time3<3){  
  103.           try {  
  104.              Thread.sleep(1000);  
  105.           } catch (InterruptedException e) {  
  106.              e.printStackTrace();  
  107.           }  
  108.           time3 ++;  
  109.         }  
  110.         System.out.println("--------開始注冊------");  
  111.         Message re = new Message("", self, "");  
  112.         sendMsg(re.toString());  
  113.         try {  
  114.          Thread.sleep(200);  
  115.       } catch (InterruptedException e) {  
  116.          e.printStackTrace();  
  117.       }  
  118.         System.out.println("-----注冊成功----");  
  119.         String content ="";  
  120.         System.out.println("---- 請輸入要發送的消息,按回車發送,輸入 123 退出----------");  
  121.             Scanner s = new Scanner(System.in);  
  122.             while (!content.equals("123")&&s.hasNext()) {  
  123.             content = s.next();  
  124.                Message msg = new Message(content, self, to);  
  125.                msg.setType("1");  
  126.                sendMsg(msg.toString());  
  127.                if (content.equals("123")) {  
  128.                 break;  
  129.             }  
  130.              System.out.println("---發送成功---");  
  131.          }  
  132.             channel.close();  
  133.       }  
  134. }  

用戶端測試

[java]  view plain  copy

  1. public class TestClient1 {  
  2.    public static void main(String[] args) throws IOException {  
  3.       Client c1 =new Client("1", "2");  
  4.       c1.start();  
  5.    }  
  6. }  
  7. public class TestClient2 {  
  8.    public static void main(String[] args) throws IOException {  
  9.       Client c2 =new Client("2", "1");  
  10.       c2.start();  
  11.    }  
  12. }  

結束

本文的例子極為簡單,但是都經過測試。在編碼的過程中,遇到的問題主要有兩點:

1.     channel.register()方法阻塞

2.     使用線程池遇到問題。本文最後在服務端的讀寫線程中,沒有使用線程池,原因注釋說的比較明白,也說明了使用線程池的一種設想。