我们将在TimeServer例程中给出完整的NIO创建的时间服务器源码:
<code>01</code>
<code>public</code> <code>class</code> <code>TimeServer {</code>
<code>02</code>
<code>03</code>
<code> </code><code>/**</code>
<code>04</code>
<code> </code><code>* @param args</code>
<code>05</code>
<code> </code><code>* @throws IOException</code>
<code>06</code>
<code> </code><code>*/</code>
<code>07</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args)</code><code>throws</code> <code>IOException {</code>
<code>08</code>
<code> </code><code>int</code> <code>port =</code><code>8080</code><code>;</code>
<code>09</code>
<code> </code><code>if</code> <code>(args !=</code><code>null</code> <code>&amp;&amp; args.length &gt;</code><code>0</code><code>) {</code>
<code>10</code>
<code> </code><code>try</code> <code>{</code>
<code>11</code>
<code> </code><code>port = Integer.valueOf(args[</code><code>0</code><code>]);</code>
<code>12</code>
<code> </code><code>}</code><code>catch</code> <code>(NumberFormatException e) {</code>
<code>13</code>
<code> </code><code>// 采用默认值</code>
<code>14</code>
<code> </code><code>}</code>
<code>15</code>
<code> </code><code>}</code>
<code>16</code>
<code> </code><code>MultiplexerTimeServer timeServer =</code><code>new</code> <code>MultiplexerTimeServer(port);</code>
<code>17</code>
<code> </code><code>New Thread(timeServer, &quot;NIO-MultiplexerTimeServer-</code><code>001</code><code>&quot;).start();</code>
<code>18</code>
<code>19</code>
<code>}</code>
我们对NIO创建的TimeServer进行下简单分析,8-15行跟之前的一样,设置监听端口。16-17行创建了一个被称为MultiplexerTimeServer的多路复用类,它是个一个独立的线程,负责轮询多路复用器Selctor,可以处理多个客户端的并发接入,现在我们继续看MultiplexerTimeServer的源码:
<code>001</code>
<code>public</code> <code>class</code> <code>MultiplexerTimeServer</code><code>implements</code> <code>Runnable {</code>
<code>002</code>
<code>003</code>
<code> </code><code>private</code> <code>Selector selector;</code>
<code>004</code>
<code>005</code>
<code> </code><code>private</code> <code>ServerSocketChannel servChannel;</code>
<code>006</code>
<code>007</code>
<code> </code><code>private</code> <code>volatile</code> <code>boolean</code> <code>stop;</code>
<code>008</code>
<code>009</code>
<code>010</code>
<code> </code><code>* 初始化多路复用器、绑定监听端口</code>
<code>011</code>
<code> </code><code>*</code>
<code>012</code>
<code> </code><code>* @param port</code>
<code>013</code>
<code>014</code>
<code> </code><code>public</code> <code>MultiplexerTimeServer(</code><code>int</code> <code>port) {</code>
<code>015</code>
<code> </code><code>try</code> <code>{</code>
<code>016</code>
<code> </code><code>selector = Selector.open();</code>
<code>017</code>
<code> </code><code>servChannel = ServerSocketChannel.open();</code>
<code>018</code>
<code> </code><code>servChannel.configureBlocking(</code><code>false</code><code>);</code>
<code>019</code>
<code> </code><code>servChannel.socket().bind(</code><code>new</code> <code>InetSocketAddress(port),</code><code>1024</code><code>);</code>
<code>020</code>
<code> </code><code>servChannel.register(selector, SelectionKey.OP_ACCEPT);</code>
<code>021</code>
<code> </code><code>System.out.println(&quot;The time server is start in port : &quot; + port);</code>
<code>022</code>
<code> </code><code>}</code><code>catch</code> <code>(IOException e) {</code>
<code>023</code>
<code> </code><code>e.printStackTrace();</code>
<code>024</code>
<code> </code><code>System.exit(</code><code>1</code><code>);</code>
<code>025</code>
<code>026</code>
<code>027</code>
<code>028</code>
<code> </code><code>public</code> <code>void</code> <code>stop() {</code>
<code>029</code>
<code> </code><code>this</code><code>.stop =</code><code>true</code><code>;</code>
<code>030</code>
<code>031</code>
<code>032</code>
<code> </code><code>/*</code>
<code>033</code>
<code> </code><code>* (non-Javadoc)</code>
<code>034</code>
<code>035</code>
<code> </code><code>* @see java.lang.Runnable#run()</code>
<code>036</code>
<code>037</code>
<code> </code><code>@Override</code>
<code>038</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code>039</code>
<code> </code><code>while</code> <code>(!stop) {</code>
<code>040</code>
<code>041</code>
<code> </code><code>selector.select(</code><code>1000</code><code>);</code>
<code>042</code>
<code> </code><code>Set&lt;SelectionKey&gt; selectedKeys = selector.selectedKeys();</code>
<code>043</code>
<code> </code><code>Iterator&lt;SelectionKey&gt; it = selectedKeys.iterator();</code>
<code>044</code>
<code> </code><code>SelectionKey key =</code><code>null</code><code>;</code>
<code>045</code>
<code> </code><code>while</code> <code>(it.hasNext()) {</code>
<code>046</code>
<code> </code><code>key = it.next();</code>
<code>047</code>
<code> </code><code>it.remove();</code>
<code>048</code>
<code> </code><code>try</code> <code>{</code>
<code>049</code>
<code> </code><code>handleInput(key);</code>
<code>050</code>
<code> </code><code>}</code><code>catch</code> <code>(Exception e) {</code>
<code>051</code>
<code> </code><code>if</code> <code>(key !=</code><code>null</code><code>) {</code>
<code>052</code>
<code> </code><code>key.cancel();</code>
<code>053</code>
<code> </code><code>if</code> <code>(key.channel() !=</code><code>null</code><code>)</code>
<code>054</code>
<code> </code><code>key.channel().close();</code>
<code>055</code>
<code> </code><code>}</code>
<code>056</code>
<code>057</code>
<code>058</code>
<code> </code><code>}</code><code>catch</code> <code>(Throwable t) {</code>
<code>059</code>
<code> </code><code>t.printStackTrace();</code>
<code>060</code>
<code>061</code>
<code>062</code>
<code>063</code>
<code> </code><code>// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源</code>
<code>064</code>
<code> </code><code>if</code> <code>(selector !=</code><code>null</code><code>)</code>
<code>065</code>
<code>066</code>
<code> </code><code>selector.close();</code>
<code>067</code>
<code> </code><code>}</code><code>catch</code> <code>(IOException e) {</code>
<code>068</code>
<code>069</code>
<code>070</code>
<code>071</code>
<code>072</code>
<code> </code><code>private</code> <code>void</code> <code>handleInput(SelectionKey key)</code><code>throws</code> <code>IOException {</code>
<code>073</code>
<code>074</code>
<code> </code><code>if</code> <code>(key.isValid()) {</code>
<code>075</code>
<code> </code><code>// 处理新接入的请求消息</code>
<code>076</code>
<code> </code><code>if</code> <code>(key.isAcceptable()) {</code>
<code>077</code>
<code> </code><code>// Accept the new connection</code>
<code>078</code>
<code> </code><code>ServerSocketChannel ssc = (ServerSocketChannel) key.channel();</code>
<code>079</code>
<code> </code><code>SocketChannel sc = ssc.accept();</code>
<code>080</code>
<code> </code><code>sc.configureBlocking(</code><code>false</code><code>);</code>
<code>081</code>
<code> </code><code>// Add the new connection to the selector</code>
<code>082</code>
<code> </code><code>sc.register(selector, SelectionKey.OP_READ);</code>
<code>083</code>
<code>084</code>
<code> </code><code>if</code> <code>(key.isReadable()) {</code>
<code>085</code>
<code> </code><code>// Read the data</code>
<code>086</code>
<code> </code><code>SocketChannel sc = (SocketChannel) key.channel();</code>
<code>087</code>
<code> </code><code>ByteBuffer readBuffer = ByteBuffer.allocate(</code><code>1024</code><code>);</code>
<code>088</code>
<code> </code><code>int</code> <code>readBytes = sc.read(readBuffer);</code>
<code>089</code>
<code> </code><code>if</code> <code>(readBytes &gt;</code><code>0</code><code>) {</code>
<code>090</code>
<code> </code><code>readBuffer.flip();</code>
<code>091</code>
<code> </code><code>byte</code><code>[] bytes =</code><code>new</code> <code>byte</code><code>[readBuffer.remaining()];</code>
<code>092</code>
<code> </code><code>readBuffer.get(bytes);</code>
<code>093</code>
<code> </code><code>String body =</code><code>new</code> <code>String(bytes, &quot;UTF-</code><code>8</code><code>&quot;);</code>
<code>094</code>
<code> </code><code>System.out.println(&quot;The time server receive order : &quot;</code>
<code>095</code>
<code> </code><code>+ body);</code>
<code>096</code>
<code> </code><code>String currentTime = &quot;QUERY TIME ORDER&quot;</code>
<code>097</code>
<code> </code><code>.equalsIgnoreCase(body) ?</code><code>new</code> <code>java.util.Date(</code>
<code>098</code>
<code> </code><code>System.currentTimeMillis()).toString()</code>
<code>099</code>
<code> </code><code>: &quot;BAD ORDER&quot;;</code>
<code>100</code>
<code> </code><code>doWrite(sc, currentTime);</code>
<code>101</code>
<code> </code><code>}</code><code>else</code> <code>if</code> <code>(readBytes &lt;</code><code>0</code><code>) {</code>
<code>102</code>
<code> </code><code>// 对端链路关闭</code>
<code>103</code>
<code> </code><code>key.cancel();</code>
<code>104</code>
<code> </code><code>sc.close();</code>
<code>105</code>
<code> </code><code>}</code><code>else</code>
<code>106</code>
<code> </code><code>;</code><code>// 读到0字节,忽略</code>
<code>107</code>
<code>108</code>
<code>109</code>
<code>110</code>
<code>111</code>
<code> </code><code>private</code> <code>void</code> <code>doWrite(SocketChannel channel, String response)</code>
<code>112</code>
<code> </code><code>throws</code> <code>IOException {</code>
<code>113</code>
<code> </code><code>if</code> <code>(response !=</code><code>null</code> <code>&amp;&amp; response.trim().length() &gt;</code><code>0</code><code>) {</code>
<code>114</code>
<code> </code><code>byte</code><code>[] bytes = response.getBytes();</code>
<code>115</code>
<code> </code><code>ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);</code>
<code>116</code>
<code> </code><code>writeBuffer.put(bytes);</code>
<code>117</code>
<code> </code><code>writeBuffer.flip();</code>
<code>118</code>
<code> </code><code>channel.write(writeBuffer);</code>
<code>119</code>
<code>120</code>
<code>121</code>
由于这个类相比于传统的Socket编程稍微复杂一些,在此我们进行详细分析,我们从如下几个关键步骤讲解多路复用处理类:
14-26行为构造方法,在构造方法中进行资源初始化,创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置,例如将ServerSocketChannel设置为异步非阻塞模式,它的backlog设置为1024。系统资源初始化成功后将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位;如果资源初始化失败,例如端口被占用则退出
39-61行在线程的run方法的while循环体中循环遍历selector,它的休眠时间为1S,无论是否有读写等事件发生,selector每隔1S都被唤醒一次,selector也提供了一个无参的select方法。当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合,我们通过对就绪状态的Channel集合进行迭代,就可以进行网络的异步读写操作
76-83行处理新接入的客户端请求消息,根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。注意,我们需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等,作为入门的例子,例程没有进行额外的参数设置
84-109行用于读取客户端的请求消息,首先创建一个ByteBuffer,由于我们事先无法得知客户端发送的码流大小,作为例程,我们开辟一个1M的缓冲区。然后调用SocketChannel的read方法读取请求码流,注意,由于我们已经将SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的。使用返回值进行判断,看读取到的字节数,返回值有三种可能的结果:
1) 返回值大于0:读到了字节,对字节进行编解码;
2) 返回值等于0:没有读取到字节,属于正常场景,忽略;
3) 返回值为-1:链路已经关闭,需要关闭SocketChannel,释放资源。
当读取到码流以后,我们进行解码,首先对readBuffer进行flip操作,它的作用是将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。然后根据缓冲区可读的字节个数创建字节数组,调用ByteBuffer的get操作将缓冲区可读的字节数组拷贝到新创建的字节数组中,最后调用字符串的构造函数创建请求消息体并打印。如果请求指令是”QUERY TIME ORDER”则把服务器的当前时间编码后返回给客户端,下面我们看看如果异步发送应答消息给客户端。
111-119行将应答消息异步发送给客户端,我们看下关键代码,首先将字符串编码成字节数组,根据字节数组的容量创建ByteBuffer,调用ByteBuffer的put操作将字节数组拷贝到缓冲区中,然后对缓冲区进行flip操作,最后调用SocketChannel的write方法将缓冲区中的字节数组发送出去。需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现“写半包”问题,我们需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕,可以通过ByteBuffer的hasRemain()方法判断消息是否发送完成。此处仅仅是个简单的入门级例程,没有演示如何处理“写半包”场景,后续的章节会有详细说明。
使用NIO创建TimeServer服务器完成之后,我们继续学习如何创建NIO客户端。首先还是通过时序图了解关键步骤和过程,然后结合代码进行详细分析。