天天看点

《Netty 权威指南》—— NIO创建的TimeServer源码分析

我们将在TimeServer例程中给出完整的NIO创建的时间服务器源码:

<code>01</code>

<code>public</code> <code>class</code> <code>TimeServer {</code>

<code>02</code>

<code>03</code>

<code>    </code><code>/**</code>

<code>04</code>

<code>     </code><code>* @param args</code>

<code>05</code>

<code>     </code><code>* @throws IOException</code>

<code>06</code>

<code>     </code><code>*/</code>

<code>07</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args)</code><code>throws</code> <code>IOException {</code>

<code>08</code>

<code>    </code><code>int</code> <code>port =</code><code>8080</code><code>;</code>

<code>09</code>

<code>    </code><code>if</code> <code>(args !=</code><code>null</code> <code>&amp;amp;&amp;amp; args.length &amp;gt;</code><code>0</code><code>) {</code>

<code>10</code>

<code>        </code><code>try</code> <code>{</code>

<code>11</code>

<code>        </code><code>port = Integer.valueOf(args[</code><code>0</code><code>]);</code>

<code>12</code>

<code>        </code><code>}</code><code>catch</code> <code>(NumberFormatException e) {</code>

<code>13</code>

<code>        </code><code>// 采用默认值</code>

<code>14</code>

<code>        </code><code>}</code>

<code>15</code>

<code>    </code><code>}</code>

<code>16</code>

<code>    </code><code>MultiplexerTimeServer timeServer =</code><code>new</code> <code>MultiplexerTimeServer(port);</code>

<code>17</code>

<code>    </code><code>New Thread(timeServer, &amp;quot;NIO-MultiplexerTimeServer-</code><code>001</code><code>&amp;quot;).start();</code>

<code>18</code>

<code>19</code>

<code>}</code>

我们对NIO创建的TimeServer进行下简单分析,8-15行跟之前的一样,设置监听端口。16-17行创建了一个被称为MultiplexerTimeServer的多路复用类,它是个一个独立的线程,负责轮询多路复用器Selctor,可以处理多个客户端的并发接入,现在我们继续看MultiplexerTimeServer的源码:

<code>001</code>

<code>public</code> <code>class</code> <code>MultiplexerTimeServer</code><code>implements</code> <code>Runnable {</code>

<code>002</code>

<code>003</code>

<code>    </code><code>private</code> <code>Selector selector;</code>

<code>004</code>

<code>005</code>

<code>    </code><code>private</code> <code>ServerSocketChannel servChannel;</code>

<code>006</code>

<code>007</code>

<code>    </code><code>private</code> <code>volatile</code> <code>boolean</code> <code>stop;</code>

<code>008</code>

<code>009</code>

<code>010</code>

<code>     </code><code>* 初始化多路复用器、绑定监听端口</code>

<code>011</code>

<code>     </code><code>*</code>

<code>012</code>

<code>     </code><code>* @param port</code>

<code>013</code>

<code>014</code>

<code>    </code><code>public</code> <code>MultiplexerTimeServer(</code><code>int</code> <code>port) {</code>

<code>015</code>

<code>    </code><code>try</code> <code>{</code>

<code>016</code>

<code>        </code><code>selector = Selector.open();</code>

<code>017</code>

<code>        </code><code>servChannel = ServerSocketChannel.open();</code>

<code>018</code>

<code>        </code><code>servChannel.configureBlocking(</code><code>false</code><code>);</code>

<code>019</code>

<code>        </code><code>servChannel.socket().bind(</code><code>new</code> <code>InetSocketAddress(port),</code><code>1024</code><code>);</code>

<code>020</code>

<code>        </code><code>servChannel.register(selector, SelectionKey.OP_ACCEPT);</code>

<code>021</code>

<code>        </code><code>System.out.println(&amp;quot;The time server is start in port : &amp;quot; + port);</code>

<code>022</code>

<code>    </code><code>}</code><code>catch</code> <code>(IOException e) {</code>

<code>023</code>

<code>        </code><code>e.printStackTrace();</code>

<code>024</code>

<code>        </code><code>System.exit(</code><code>1</code><code>);</code>

<code>025</code>

<code>026</code>

<code>027</code>

<code>028</code>

<code>    </code><code>public</code> <code>void</code> <code>stop() {</code>

<code>029</code>

<code>    </code><code>this</code><code>.stop =</code><code>true</code><code>;</code>

<code>030</code>

<code>031</code>

<code>032</code>

<code>    </code><code>/*</code>

<code>033</code>

<code>     </code><code>* (non-Javadoc)</code>

<code>034</code>

<code>035</code>

<code>     </code><code>* @see java.lang.Runnable#run()</code>

<code>036</code>

<code>037</code>

<code>    </code><code>@Override</code>

<code>038</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>039</code>

<code>    </code><code>while</code> <code>(!stop) {</code>

<code>040</code>

<code>041</code>

<code>        </code><code>selector.select(</code><code>1000</code><code>);</code>

<code>042</code>

<code>        </code><code>Set&amp;lt;SelectionKey&amp;gt; selectedKeys = selector.selectedKeys();</code>

<code>043</code>

<code>        </code><code>Iterator&amp;lt;SelectionKey&amp;gt; it = selectedKeys.iterator();</code>

<code>044</code>

<code>        </code><code>SelectionKey key =</code><code>null</code><code>;</code>

<code>045</code>

<code>        </code><code>while</code> <code>(it.hasNext()) {</code>

<code>046</code>

<code>            </code><code>key = it.next();</code>

<code>047</code>

<code>            </code><code>it.remove();</code>

<code>048</code>

<code>            </code><code>try</code> <code>{</code>

<code>049</code>

<code>            </code><code>handleInput(key);</code>

<code>050</code>

<code>            </code><code>}</code><code>catch</code> <code>(Exception e) {</code>

<code>051</code>

<code>            </code><code>if</code> <code>(key !=</code><code>null</code><code>) {</code>

<code>052</code>

<code>                </code><code>key.cancel();</code>

<code>053</code>

<code>                </code><code>if</code> <code>(key.channel() !=</code><code>null</code><code>)</code>

<code>054</code>

<code>                </code><code>key.channel().close();</code>

<code>055</code>

<code>            </code><code>}</code>

<code>056</code>

<code>057</code>

<code>058</code>

<code>        </code><code>}</code><code>catch</code> <code>(Throwable t) {</code>

<code>059</code>

<code>        </code><code>t.printStackTrace();</code>

<code>060</code>

<code>061</code>

<code>062</code>

<code>063</code>

<code>    </code><code>// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源</code>

<code>064</code>

<code>    </code><code>if</code> <code>(selector !=</code><code>null</code><code>)</code>

<code>065</code>

<code>066</code>

<code>        </code><code>selector.close();</code>

<code>067</code>

<code>        </code><code>}</code><code>catch</code> <code>(IOException e) {</code>

<code>068</code>

<code>069</code>

<code>070</code>

<code>071</code>

<code>072</code>

<code>    </code><code>private</code> <code>void</code> <code>handleInput(SelectionKey key)</code><code>throws</code> <code>IOException {</code>

<code>073</code>

<code>074</code>

<code>    </code><code>if</code> <code>(key.isValid()) {</code>

<code>075</code>

<code>        </code><code>// 处理新接入的请求消息</code>

<code>076</code>

<code>        </code><code>if</code> <code>(key.isAcceptable()) {</code>

<code>077</code>

<code>        </code><code>// Accept the new connection</code>

<code>078</code>

<code>        </code><code>ServerSocketChannel ssc = (ServerSocketChannel) key.channel();</code>

<code>079</code>

<code>        </code><code>SocketChannel sc = ssc.accept();</code>

<code>080</code>

<code>        </code><code>sc.configureBlocking(</code><code>false</code><code>);</code>

<code>081</code>

<code>        </code><code>// Add the new connection to the selector</code>

<code>082</code>

<code>        </code><code>sc.register(selector, SelectionKey.OP_READ);</code>

<code>083</code>

<code>084</code>

<code>        </code><code>if</code> <code>(key.isReadable()) {</code>

<code>085</code>

<code>        </code><code>// Read the data</code>

<code>086</code>

<code>        </code><code>SocketChannel sc = (SocketChannel) key.channel();</code>

<code>087</code>

<code>        </code><code>ByteBuffer readBuffer = ByteBuffer.allocate(</code><code>1024</code><code>);</code>

<code>088</code>

<code>        </code><code>int</code> <code>readBytes = sc.read(readBuffer);</code>

<code>089</code>

<code>        </code><code>if</code> <code>(readBytes &amp;gt;</code><code>0</code><code>) {</code>

<code>090</code>

<code>            </code><code>readBuffer.flip();</code>

<code>091</code>

<code>            </code><code>byte</code><code>[] bytes =</code><code>new</code> <code>byte</code><code>[readBuffer.remaining()];</code>

<code>092</code>

<code>            </code><code>readBuffer.get(bytes);</code>

<code>093</code>

<code>            </code><code>String body =</code><code>new</code> <code>String(bytes, &amp;quot;UTF-</code><code>8</code><code>&amp;quot;);</code>

<code>094</code>

<code>            </code><code>System.out.println(&amp;quot;The time server receive order : &amp;quot;</code>

<code>095</code>

<code>                </code><code>+ body);</code>

<code>096</code>

<code>            </code><code>String currentTime = &amp;quot;QUERY TIME ORDER&amp;quot;</code>

<code>097</code>

<code>                </code><code>.equalsIgnoreCase(body) ?</code><code>new</code> <code>java.util.Date(</code>

<code>098</code>

<code>                </code><code>System.currentTimeMillis()).toString()</code>

<code>099</code>

<code>                </code><code>: &amp;quot;BAD ORDER&amp;quot;;</code>

<code>100</code>

<code>            </code><code>doWrite(sc, currentTime);</code>

<code>101</code>

<code>        </code><code>}</code><code>else</code> <code>if</code> <code>(readBytes &amp;lt;</code><code>0</code><code>) {</code>

<code>102</code>

<code>            </code><code>// 对端链路关闭</code>

<code>103</code>

<code>            </code><code>key.cancel();</code>

<code>104</code>

<code>            </code><code>sc.close();</code>

<code>105</code>

<code>        </code><code>}</code><code>else</code>

<code>106</code>

<code>            </code><code>;</code><code>// 读到0字节,忽略</code>

<code>107</code>

<code>108</code>

<code>109</code>

<code>110</code>

<code>111</code>

<code>    </code><code>private</code> <code>void</code> <code>doWrite(SocketChannel channel, String response)</code>

<code>112</code>

<code>        </code><code>throws</code> <code>IOException {</code>

<code>113</code>

<code>    </code><code>if</code> <code>(response !=</code><code>null</code> <code>&amp;amp;&amp;amp; response.trim().length() &amp;gt;</code><code>0</code><code>) {</code>

<code>114</code>

<code>        </code><code>byte</code><code>[] bytes = response.getBytes();</code>

<code>115</code>

<code>        </code><code>ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);</code>

<code>116</code>

<code>        </code><code>writeBuffer.put(bytes);</code>

<code>117</code>

<code>        </code><code>writeBuffer.flip();</code>

<code>118</code>

<code>        </code><code>channel.write(writeBuffer);</code>

<code>119</code>

<code>120</code>

<code>121</code>

由于这个类相比于传统的Socket编程稍微复杂一些,在此我们进行详细分析,我们从如下几个关键步骤讲解多路复用处理类:

14-26行为构造方法,在构造方法中进行资源初始化,创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置,例如将ServerSocketChannel设置为异步非阻塞模式,它的backlog设置为1024。系统资源初始化成功后将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位;如果资源初始化失败,例如端口被占用则退出

39-61行在线程的run方法的while循环体中循环遍历selector,它的休眠时间为1S,无论是否有读写等事件发生,selector每隔1S都被唤醒一次,selector也提供了一个无参的select方法。当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合,我们通过对就绪状态的Channel集合进行迭代,就可以进行网络的异步读写操作

76-83行处理新接入的客户端请求消息,根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。注意,我们需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等,作为入门的例子,例程没有进行额外的参数设置

84-109行用于读取客户端的请求消息,首先创建一个ByteBuffer,由于我们事先无法得知客户端发送的码流大小,作为例程,我们开辟一个1M的缓冲区。然后调用SocketChannel的read方法读取请求码流,注意,由于我们已经将SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的。使用返回值进行判断,看读取到的字节数,返回值有三种可能的结果:

1)      返回值大于0:读到了字节,对字节进行编解码;

2)      返回值等于0:没有读取到字节,属于正常场景,忽略;

3)      返回值为-1:链路已经关闭,需要关闭SocketChannel,释放资源。

当读取到码流以后,我们进行解码,首先对readBuffer进行flip操作,它的作用是将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。然后根据缓冲区可读的字节个数创建字节数组,调用ByteBuffer的get操作将缓冲区可读的字节数组拷贝到新创建的字节数组中,最后调用字符串的构造函数创建请求消息体并打印。如果请求指令是”QUERY TIME ORDER”则把服务器的当前时间编码后返回给客户端,下面我们看看如果异步发送应答消息给客户端。

111-119行将应答消息异步发送给客户端,我们看下关键代码,首先将字符串编码成字节数组,根据字节数组的容量创建ByteBuffer,调用ByteBuffer的put操作将字节数组拷贝到缓冲区中,然后对缓冲区进行flip操作,最后调用SocketChannel的write方法将缓冲区中的字节数组发送出去。需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现“写半包”问题,我们需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕,可以通过ByteBuffer的hasRemain()方法判断消息是否发送完成。此处仅仅是个简单的入门级例程,没有演示如何处理“写半包”场景,后续的章节会有详细说明。

使用NIO创建TimeServer服务器完成之后,我们继续学习如何创建NIO客户端。首先还是通过时序图了解关键步骤和过程,然后结合代码进行详细分析。 

继续阅读