天天看点

Java NIO —— TCP套接字(ServerSocketChannel & SocketChannel)

package com.demo.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Minimal demonstration of TCP sockets with Java NIO channels.
 *
 * <p>A {@link ServerSocketChannel} accepts a single connection and echoes back
 * everything it receives; a {@link SocketChannel} client sends a handful of
 * timestamped messages and prints the echoes. {@link #main(String[])} runs
 * both ends inside the same JVM.
 *
 * <p>NOTE(review): the numeric literals of this listing were missing. The
 * values {@code -1}, {@code 0}, the message count and the pool size follow
 * from the code and its sample output; the port, buffer size and the sleep
 * durations are chosen values and should be confirmed.
 */
public class SocketChannelDemo {

    /** TCP port the server binds to and the client connects to (chosen value). */
    private static final int PORT = 8888;
    /** Milliseconds between accept() polls when the server channel is non-blocking (chosen value). */
    private static final int ACCEPT_INTERVAL = 200;
    /** Capacity, in bytes, of every buffer used by the demo (chosen value). */
    private static final int BUFF_SIZE = 1024;
    /** Milliseconds between finishConnect() polls on the client (chosen value). */
    private static final int WAIT_CONNECT_INTERVAL = 100;
    /** Number of messages the client sends; the sample output shows indices 0 to 4. */
    private static final int MESSAGE_COUNT = 5;
    /** Milliseconds the server keeps its listening channel open after accepting (chosen value). */
    private static final int SERVER_LINGER_INTERVAL = 5000;
    /** Milliseconds the client waits for echoes before closing its channel (chosen value). */
    private static final int CLIENT_LINGER_INTERVAL = 5000;

    /**
     * Binds a server channel to {@link #PORT}, accepts one connection, hands it
     * to an echo thread, lingers for a while and then closes the listening
     * channel. I/O failures are printed, not propagated.
     *
     * @param isBlocking blocking mode applied to the listening channel; when
     *                   {@code false}, accept() is polled every
     *                   {@link #ACCEPT_INTERVAL} milliseconds
     */
    private static void openServerSocketChannel(boolean isBlocking) {
        try {
            // A freshly opened channel's socket is unbound; it must be bound
            // before connections can be accepted.
            final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            try {
                serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
                serverSocketChannel.configureBlocking(isBlocking);
                while (true) {
                    final SocketChannel socketChannel = serverSocketChannel.accept();
                    // In non-blocking mode accept() returns immediately, with
                    // null when no connection is pending.
                    if (socketChannel == null) {
                        System.out.println("serverSocketChannel.accept() = null");
                        if (!pause(ACCEPT_INTERVAL)) {
                            break; // interrupted: stop polling
                        }
                        continue;
                    }
                    System.out.println("serverSocketChannel.accept() != null");
                    new Thread(new Runnable() {

                        @Override
                        public void run() {
                            echoUntilEndOfStream(socketChannel);
                        }
                    }).start();
                    // The demo serves exactly one connection.
                    break;
                }
                pause(SERVER_LINGER_INTERVAL);
            } finally {
                // Always release the listening socket, even if bind/accept failed.
                closeChannel(serverSocketChannel);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads from the accepted connection until end-of-stream, writing every
     * chunk straight back to the peer and printing it. The channel is closed
     * when the loop ends, whether by end-of-stream or by a read error.
     *
     * @param socketChannel accepted connection; per the accept() contract it is
     *                      in blocking mode regardless of the server channel's mode
     */
    private static void echoUntilEndOfStream(SocketChannel socketChannel) {
        ByteBuffer readBuffer = ByteBuffer.allocate(BUFF_SIZE);
        byte[] buffer = new byte[BUFF_SIZE];
        try {
            int len;
            while ((len = socketChannel.read(readBuffer)) != -1) {
                // read() filled the buffer; flip() switches it to draining mode.
                readBuffer.flip();
                // get() advances position while copying the bytes out for printing...
                readBuffer.get(buffer, 0, len);
                // ...so rewind() resets position to 0 to send the same bytes back.
                readBuffer.rewind();
                try {
                    while (readBuffer.hasRemaining()) {
                        socketChannel.write(readBuffer);
                    }
                } catch (IOException e) {
                    // A failed echo is reported but does not stop the receive loop.
                    e.printStackTrace();
                }
                // Reset position/limit so the next read() starts from an empty buffer.
                readBuffer.clear();
                System.out.println("service recv : len=" + len + ", data=" + new String(buffer, 0, len));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeChannel(socketChannel);
        }
    }

    /**
     * Connects to the local server, starts a thread that prints echoed data,
     * sends {@link #MESSAGE_COUNT} timestamped messages, waits for the echoes
     * and closes the channel. I/O failures are printed, not propagated.
     *
     * <p>NOTE(review): the channel is never switched with
     * {@code configureBlocking(isBlocking)}, so it stays in the default blocking
     * mode; connect() therefore either succeeds or throws, and the
     * finishConnect() loop below is not reached. Left unchanged because the
     * read and write loops rely on blocking I/O.
     *
     * @param isBlocking intended blocking mode; currently only guards the
     *                   finishConnect() polling loop
     */
    private static void openSocketChannel(boolean isBlocking) {
        try {
            final SocketChannel socketChannel = SocketChannel.open();
            try {
                // true if the connection was established; false if the channel is
                // non-blocking and the connection is still in progress.
                boolean isEstablished = socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT));
                System.out.println("socketChannel.connect() = " + isEstablished);
                if (!isBlocking && !isEstablished) {
                    while (!socketChannel.finishConnect()) {
                        // Wait, or do something else, until the handshake completes.
                        System.out.println("socketChannel.finishConnect() = false");
                        if (!pause(WAIT_CONNECT_INTERVAL)) {
                            return; // interrupted: give up; finally closes the channel
                        }
                    }
                    System.out.println("socketChannel.finishConnect() = true");
                }
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        printEchoes(socketChannel);
                    }
                }).start();
                ByteBuffer writeBuffer = ByteBuffer.allocate(BUFF_SIZE);
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String str = "(" + i + ")" + new Date().toString();
                    byte[] buffer = str.getBytes();
                    // Copy the message into the buffer (filling mode).
                    writeBuffer.put(buffer);
                    // Switch to draining mode before handing the buffer to the channel.
                    writeBuffer.flip();
                    while (writeBuffer.hasRemaining()) {
                        socketChannel.write(writeBuffer);
                    }
                    // Back to filling mode; without resetting the limit the next
                    // put() would throw java.nio.BufferOverflowException.
                    writeBuffer.clear();
                    System.out.println("client send : len=" + buffer.length + ", data=" + str);
                }
                // Give the echoes time to arrive before closing.
                pause(CLIENT_LINGER_INTERVAL);
            } finally {
                closeChannel(socketChannel);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints everything received on the client channel until end-of-stream or
     * until a read fails.
     *
     * @param socketChannel connected client channel shared with the sending thread
     */
    private static void printEchoes(SocketChannel socketChannel) {
        // Allocated once and reused for every read.
        ByteBuffer readBuffer = ByteBuffer.allocate(BUFF_SIZE);
        byte[] buffer = new byte[BUFF_SIZE];
        try {
            int len;
            // read() returns the number of bytes read, possibly zero, or -1 at end-of-stream.
            while ((len = socketChannel.read(readBuffer)) != -1) {
                readBuffer.flip();
                readBuffer.get(buffer, 0, len);
                readBuffer.clear();
                System.out.println("client recv : len=" + len + ", data=" + new String(buffer, 0, len));
            }
        } catch (IOException e) {
            // The sending thread closes the channel while this thread is blocked
            // in read(); that surfaces here as AsynchronousCloseException and
            // ends the loop.
            e.printStackTrace();
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis how long to sleep, in milliseconds
     * @return {@code true} if the full sleep elapsed, {@code false} if the
     *         thread was interrupted (the interrupt flag is restored)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Closes a channel, printing rather than propagating any failure.
     *
     * @param channel the channel to close
     */
    private static void closeChannel(java.io.Closeable channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the server and the client concurrently on a two-thread pool.
     *
     * <p>NOTE(review): nothing orders the server's bind before the client's
     * connect; the demo relies on the server task being started first.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // One pool thread for the server task, one for the client task.
        int nThreads = 2;
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        final boolean isBlocking = false;
        executorService.execute(new Runnable() {

            @Override
            public void run() {
                openServerSocketChannel(isBlocking);
            }
        });
        executorService.execute(new Runnable() {

            @Override
            public void run() {
                openSocketChannel(isBlocking);
            }
        });
        // Let both submitted tasks finish, then release the pool threads so the
        // JVM can exit.
        executorService.shutdown();
    }
}
           

“isBlocking = true”的情况

serverSocketChannel.accept() != null
socketChannel.connect() = true
client send : len=31, data=(0)Mon Dec  :: CST 
client send : len=31, data=(1)Mon Dec  :: CST 
client send : len=31, data=(2)Mon Dec  :: CST 
client send : len=31, data=(3)Mon Dec  :: CST 
client send : len=31, data=(4)Mon Dec  :: CST 
service recv : len=31, data=(0)Mon Dec  :: CST 
service recv : len=124, data=(1)Mon Dec  :: CST (2)Mon Dec  :: CST (3)Mon Dec  :: CST (4)Mon Dec  :: CST 
client recv : len=31, data=(0)Mon Dec  :: CST 
client recv : len=124, data=(1)Mon Dec  :: CST (2)Mon Dec  :: CST (3)Mon Dec  :: CST (4)Mon Dec  :: CST 
java.nio.channels.AsynchronousCloseException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:)
    at com.demo.test.SocketChannelDemo$2.run(SocketChannelDemo.java:)
    at java.lang.Thread.run(Thread.java:)
           

“isBlocking = false”的情况

socketChannel.connect() = true
serverSocketChannel.accept() != null
service recv : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016
service recv : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016
service recv : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016
service recv : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016
service recv : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016
java.nio.channels.AsynchronousCloseException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:)
    at com.demo.test.SocketChannelDemo$2.run(SocketChannelDemo.java:)
    at java.lang.Thread.run(Thread.java:)
           

注意:并没有要求客户端和服务端必须同时阻塞或同时非阻塞,只是还有2种情况的打印这里省略了而已。