自己对着源码敲一遍练习,写上注释。发现NIO编程难度好高啊。。虽然很复杂,但是NIO编程的有点还是很多:
1、客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECTION等待后续结果,不需要像BIO的客户端一样被同步阻塞。
2、SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信模型就可以处理其他的链路,不需要同步等待这个链路可用。
3、线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,没有连接句柄的限制,那么Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降。所以它非常适合做高性能、高负载的网络服务器。
TimeClient:1 package nio;
2
3 public class TimeClient {
4 public static void main(String args[]){
5 int port = 8080;
6 if(args != null && args.length > 0){
7 try{
8 port = Integer.valueOf(args[0]);
9 }catch(NumberFormatException e){
10 //采用默认值
11 }
12 }
13 new Thread(new TimeClientHandle("120.0.0.1",port),"TimeClient-001").start();
14 }
15 }
TimeClientHandler:
1 package nio;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.ByteBuffer;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.Selector;
8 import java.nio.channels.SocketChannel;
9 import java.util.Iterator;
10 import java.util.Set;
11
12 public class TimeClientHandle implements Runnable{
13 private String host;
14 private int port;
15 private Selector selector;
16 private SocketChannel socketChannel;
17 private volatile boolean stop;
18
19 public TimeClientHandle(String host,int port){
20 this.host = host == null ? "127.0.0.1" : host;
21 this.port = port;
22 try{
23 selector = Selector.open();
24 socketChannel = SocketChannel.open();
25 socketChannel.configureBlocking(false);
26 }catch(IOException e){
27 e.printStackTrace();
28 System.exit(1);
29 }
30 }
31
32
33 public void run() {
34 //发送请求连接
35 try{
36 doConnect();
37 }catch(IOException e){
38 e.printStackTrace();
39 System.exit(1);
40 }
41 while(!stop){
42 try{
43 selector.select(1000);
44 Set<SelectionKey> selectedKeys = selector.selectedKeys();
45 Iterator<SelectionKey> it = selectedKeys.iterator();
46 SelectionKey key = null;
47 //当有就绪的Channel时,执行handleInput(key)方法
48 while(it.hasNext()){
49 key = it.next();
50 it.remove();
51 try{
52 handleInput(key);
53 }catch(Exception e){
54 if(key != null){
55 key.cancel();
56 if(key.channel() != null){
57 key.channel().close();
58 }
59 }
60 }
61 }
62 }catch(Exception e){
63 e.printStackTrace();
64 System.exit(1);
65 }
66 }
67
68 //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
69 if(selector != null){
70 try{
71 selector.close();
72 }catch(IOException e){
73 e.printStackTrace();
74 }
75 }
76
77 }
78
79
80 private void handleInput(SelectionKey key) throws IOException{
81 if(key.isValid()){
82 SocketChannel sc = (SocketChannel)key.channel();
83 //判断是否连接成功
84 if(key.isConnectable()){
85 if(sc.finishConnect()){
86 sc.register(selector, SelectionKey.OP_READ);
87 }else{
88 System.exit(1);
89 }
90 }
91
92 if(key.isReadable()){
93 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
94 int readBytes = sc.read(readBuffer);
95 if(readBytes > 0){
96 readBuffer.flip();
97 byte[] bytes = new byte[readBuffer.remaining()];
98 readBuffer.get(bytes);
99 String body = new String(bytes,"UTF-8");
100 System.out.println("Now is :" + body);
101 this.stop = true;
102 }else if(readBytes < 0){
103 //对端链路关闭
104 key.cancel();
105 sc.close();
106 }else{
107 ; //读到0字节,忽略
108 }
109 }
110 }
111 }
112
113 private void doConnect() throws IOException{
114 //如果直接连接成功,则注册到多路复用器上,发送请求信息,读应答
115 if(socketChannel.connect(new InetSocketAddress(host,port))){
116 socketChannel.register(selector, SelectionKey.OP_READ);
117 doWrite(socketChannel);
118 }else{
119 //说明服务器没有返回TCP祸首应答消息,但这并不代表连接失败,当服务器返回TCP syn-ack消息后,Selector就能够轮训这个SocketChannel处于连接就绪状态
120 socketChannel.register(selector, SelectionKey.OP_CONNECT);
121 }
122 }
123
124 private void doWrite(SocketChannel sc) throws IOException{
125 byte[] req = "QUERY TIME ORDER".getBytes();
126 ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
127 writeBuffer.put(req);
128 writeBuffer.flip();
129 sc.write(writeBuffer);
130 if(!writeBuffer.hasRemaining()){
131 System.out.println("Send order 2 server succeed.");
132 }
133 }
134
135 }
TimeServer:
1 package nio;
2
3 import java.io.IOException;
4
5 public class TimeServer {
6
7 public static void main(String[] args) throws IOException{
8 int port = 8080;
9 if(args != null && args.length >0){
10 try{
11 port = Integer.valueOf(args[0]);
12 }catch(NumberFormatException e){
13 //采用默认值
14 }
15 }
16 //多路复用类,是一个独立的线程,负责轮训多路复用器Selctor,处理多个客户端的并发接入。
17 MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
18 new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
19 }
20 }
MultiplexerTimeServer:
1 package nio;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.nio.ByteBuffer;
6 import java.nio.channels.SelectionKey;
7 import java.nio.channels.Selector;
8 import java.nio.channels.ServerSocketChannel;
9 import java.nio.channels.SocketChannel;
10 import java.util.Iterator;
11 import java.util.Set;
12
13 public class MultiplexerTimeServer implements Runnable {
14
15 private Selector selector;
16
17 private ServerSocketChannel servChannel;
18
19 private volatile boolean stop;
20
21 public MultiplexerTimeServer(int port){
22 try{
23
24 selector = Selector.open();
25 servChannel.configureBlocking(false);
26 //将ServerSocketChannel 设置为异步非阻塞,backlog设置为1024
27 servChannel.socket().bind(new InetSocketAddress(port),1024);
28 //将ServerSocket Channel注册到Selector,监听SelectionKey.OP_ACCEPT操作位,如果初始化失败,则退出
29 servChannel.register(selector,SelectionKey.OP_ACCEPT);
30 System.out.println("The time server is start in port:" + port);
31 }catch(IOException e){
32 e.printStackTrace();
33 System.exit(1);
34 }
35 }
36
37 public void stop(){
38 this.stop = true;
39 }
40
41 public void run() {
42 while(!stop){
43 try{
44 //遍历时间设置1秒,每隔一秒唤醒一次,当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合
45 selector.select(1000);
46 Set<SelectionKey> selectedKeys = selector.selectedKeys();
47 Iterator<SelectionKey> it = selectedKeys.iterator();
48 SelectionKey key = null;
49 //通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作
50 while(it.hasNext()){
51 key = it.next();
52 it.remove();
53 try{
54 handleInput(key);
55 }catch(Exception e){
56 if(key != null){
57 key.cancel();
58 if(key.channel() != null){
59 key.channel().close();
60 }
61 }
62 }
63 }
64 }catch(Throwable t){
65 t.printStackTrace();
66 }
67 }
68
69 //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
70 if(selector != null){
71 try{
72 selector.close();
73 }catch(IOException e){
74 e.printStackTrace();
75 }
76 }
77 }
78
79 //处理新接入的请求消息
80 private void handleInput(SelectionKey key) throws IOException{
81 if(key.isValid()){
82
83 //根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作相当于
84 //完成了TCP的三次握手,TCP物理链路正式建立
85 if(key.isAcceptable()){
86 ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
87 SocketChannel sc = ssc.accept();
88 sc.configureBlocking(false);
89 //Add the new connection tothe selector
90 sc.register(selector, SelectionKey.OP_READ);
91 }
92
93 if(key.isReadable()){
94 //Read the data
95
96 SocketChannel sc = (SocketChannel)key.channel();
97 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
98 int readBytes = sc.read(readBuffer);
99 if(readBytes > 0){
100 //将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作
101 readBuffer.flip();
102 byte[] bytes = new byte[readBuffer.remaining()];
103 readBuffer.get(bytes);
104 String body = new String(bytes,"UTF-8");
105 System.out.println("The time server receive order: + body");
106 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
107 doWrite(sc,currentTime);
108 }else if(readBytes < 0){
109 //对端链路关闭
110 key.cancel();
111 sc.close();
112 }else{
113 ; //读到0字节,忽略
114 }
115 }
116 }
117 }
118
119 private void doWrite(SocketChannel channel,String response) throws IOException{
120 if(response != null && response.trim().length() >0){
121 byte[] bytes = response.getBytes();
122 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
123 writeBuffer.put(bytes);
124 writeBuffer.flip();
125 channel.write(writeBuffer);
126 }
127 }
128 }