天天看点

Java NIO深入理解ServerSocketChannelJava NIO 简介Java NIO组件

Java NIO 简介

JAVA NIO有两种解释:一种叫非阻塞IO(Non-blocking I/O),另一种也叫新的IO(New I/O),其实是同一个概念。它是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器,成为解决高并发与大量连接、I/O处理问题的有效方式。

NIO是一种基于通道和缓冲区的I/O方式,它可以使用Native函数库直接分配堆外内存(区别于JVM的运行时数据区),然后通过一个存储在java堆里面的DirectByteBuffer对象作为这块内存的直接引用进行操作。这样能在一些场景显著提高性能,因为避免了在Java堆和Native堆中来回复制数据。

Java NIO组件

NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)。传统IO是基于字节流和字符流进行操作(基于流),而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。

Buffer

Buffer(缓冲区)是一个用于存储特定基本类型数据的容器。除了boolean外,其余每种基本类型都有一个对应的buffer类。Buffer类的子类有ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer 。

Channel

Channel(通道)表示到实体,如硬件设备、文件、网络套接字或可以执行一个或多个不同 I/O 操作(如读取或写入)的程序组件的开放的连接。Channel接口的常用实现类有FileChannel(对应文件IO)、DatagramChannel(对应UDP)、SocketChannel和ServerSocketChannel(对应TCP的客户端和服务器端)。Channel和IO中的Stream(流)是差不多一个等级的。只不过Stream是单向的,譬如:InputStream, OutputStream.而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。

Selector

Selector(选择器)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。即用选择器,借助单一线程,就可对数量庞大的活动I/O通道实施监控和维护。

Java NIO的简单实现

public class Demo1 {

    private static Integer port = 8080;

    // 通道管理器(Selector)
    private static Selector selector;

    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10, 1000, TimeUnit.MILLISECONDS, new LinkedTransferQueue<>(), new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        try {
            // 创建通道ServerSocketChannel
            ServerSocketChannel open = ServerSocketChannel.open();

            // 将通道设置为非阻塞
            open.configureBlocking(false);

            // 绑定到指定的端口上
            open.bind(new InetSocketAddress(port));

            // 通道管理器(Selector)
            selector = Selector.open();

            /**
             * 将通道(Channel)注册到通道管理器(Selector),并为该通道注册selectionKey.OP_ACCEPT事件
             * 注册该事件后,当事件到达的时候,selector.select()会返回,
             * 如果事件没有到达selector.select()会一直阻塞。
             */
            open.register(selector, SelectionKey.OP_ACCEPT);

            // 循环处理
            while (true) {
                /**
                 * 当注册事件到达时,方法返回,否则该方法会一直阻塞
                 * 该Selector的select()方法将会返回大于0的整数,该整数值就表示该Selector上有多少个Channel具有可用的IO操作
                 */
                int select = selector.select();
                System.out.println("当前有 " + select + " 个channel可以操作");

                // 一个SelectionKey对应一个就绪的通道
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    // 获取事件
                    SelectionKey key = iterator.next();

                    // 移除事件,避免重复处理
                    iterator.remove();

                    // 客户端请求连接事件,接受客户端连接就绪
                    if (key.isAcceptable()) {
                        accept(key);
                    } else if (key.isReadable()) {
                        // 监听到读事件,对读事件进行处理
                        threadPoolExecutor.submit(new NioServerHandler(key));
                    }
                }

            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 处理客户端连接成功事件
     *
     * @param key
     */
    public static void accept(SelectionKey key) {
        try {
            // 获取客户端连接通道
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);

            // 给通道设置读事件,客户端监听到读事件后,进行读取操作
            sc.register(selector, SelectionKey.OP_READ);
            System.out.println("accept a client : " + sc.socket().getInetAddress().getHostName());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 监听到读事件,读取客户端发送过来的消息
     */
    public static class NioServerHandler implements Runnable {

        private SelectionKey selectionKey;

        public NioServerHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
        }

        @Override
        public void run() {
            try {
                if (selectionKey.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

                    // 从通道读取数据到缓冲区
                    ByteBuffer buffer = ByteBuffer.allocate(1024);

                    // 输出客户端发送过来的消息
                    socketChannel.read(buffer);
                    buffer.flip();
                    System.out.println("收到客户端" + socketChannel.socket().getInetAddress().getHostName() + "的数据:" + new String(buffer.array()));

                    //将数据添加到key中
                    ByteBuffer outBuffer = ByteBuffer.wrap(buffer.array());

                    // 将消息回送给客户端
                    socketChannel.write(outBuffer);
                    selectionKey.cancel();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
           

原文地址

https://www.51csdn.cn/article/404.html