天天看点

《保姆式的详解》Reactor 模式和 Java NIO

作者:笨鸟先飞ftyjh
《保姆式的详解》Reactor 模式和 Java NIO

概述

本文先从基本的 Socket 编程模式说起,介绍了 Java 传统的同步阻塞 IO 网络编程的基本实现,以及存在的性能问题,从而引出 Reactor 设计模式,最后通过 Java NIO 给出单 Reactor 单线程的实现方案。

Socket 编程模式

Unix 有几个统一性的理念或象征,并塑造了它的 API 及由此形成的开发风格。其中最重要的一点应当是“一切皆文件”模型及在此基础上建立的管道概念。

在 Unix/Linux 环境下,网络中的进程通过 Socket 进行通信,Socket 本质上也是一种特殊的文件,可以按照“打开,读写,关闭”的模式来操作。Socket 编程的基本模式如图 1 所示:

《保姆式的详解》Reactor 模式和 Java NIO

图 1 Socket 编程模式

  1. 创建 socket:本质上就是创建一个文件,每个文件都有一个整型的文件描述符(fd)来指代这个文件;
  2. 绑定端口:一台服务器可以同时运行多个不同的应用,在 TCP/IP 协议下通过端口进行区分,因此接下来需要绑定端口,所有连接到该端口的请求都会被我们的服务处理;
  3. 监听端口:执行创建 socket 和bind之后,socket 还处于closed状态,不对外监听,需要调用listen方法,让 socket 进入被动监听状态;其 API 定义如下:
int listen(int sockfd, int backlog);
// 在TCP协议下,建立连接需要完成三次握手,当连接建立完成后会先放到一个连接队列,backlog就是指定这个队列的大小。           

4.接收请求:通过调用accept()从已完成连接的队列中拿到连接进行处理,如果没有连接则调用会被阻塞。

基于同步阻塞 IO 的 Java Socket 编程

下面介绍在 Java 语言下如何完成 Socket 通信。传统的 Java Socket 编程模式使用同步阻塞 IO,如图 2 所示。

《保姆式的详解》Reactor 模式和 Java NIO

图 2 经典的 Java Socket 编程模式

服务端通过new ServerSocket(端口号)完成了的工作,接着循环调用 accept() 方法获取客户端请求(如果没有新的请求,程序就会阻塞),并为每一个客户端请求创建一个处理线程,避免因为主线程正在处理请求而无法响应其他连接。具体实现代码如下:

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    public static void main(String[] args) {
        try {
            // 绑定并监听端口
            ServerSocket server = new ServerSocket(5566);
            Socket client;
            while (!Thread.interrupted()) {
                // 接受请求,没有请求会阻塞
                client = server.accept();
                new Thread(new Handler(client)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static class Handler implements Runnable {
        final Socket client;

        public Handler(Socket client) {
            this.client = client;
        }

        @Override
        public void run() {
            try {
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(client.getInputStream()));
                // 接收客户端发送的内容
                String line;
                PrintWriter writer = new PrintWriter(
                        new OutputStreamWriter(client.getOutputStream()));
                // 客户端连接未关闭前,readLine返回值不为null,没有数据时会阻塞
                while ((line = reader.readLine()) != null) {
                    writer.println("你输入的是:" + line);
                    writer.flush();

                    // 通过约定特定输入,结束通讯
                    if ("end".equals(line)) {
                        break;
                    }
                }
                writer.close();
                reader.close();
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}           

在本地启动服务端程序,通过 telnet 请求连接。

《保姆式的详解》Reactor 模式和 Java NIO

这个模式在并发请求量不高的情况下是完全没有问题的,也建议使用这种模式。但在在高并发环境下,由于线程本身需要耗费系统资源,其次多线程下需要频繁进行上下文切换也会消耗性能,再者连接建立后还需要等待客户端数据到来,因此并不是最有效的解决方案。问题的关键在于两点:

  1. 频繁的上下文切换;
  2. 一个连接作为一个整体进行处理,粒度太粗。连接建立后,还需要等待数据可读才能进行真正的处理。

基于事件驱动的 Reactor 模式

解决上述问题的方式就是分治法和事件驱动。即:

  1. 把一个连接的处理过程拆成多个小任务;
  2. 只有在任务等待的事件触发时才执行任务。

Reactor 模式的核心理念即是如此。Reactor 模式也叫 Dispatcher 模式,即 I/O 多路复用统一监听事件,收到事件后分配(Dispatch)给某个进程。

论文《Reactor: An Object Behavioral Pattern forDemultiplexing and Dispatching Handles for Synchronous Events》详细介绍了 Reactor 模式的设计和实现,下面先理解几个关键的组成部分的作用:

主要概念

Handles

Handle(注意和 handler 区分开),中文常见翻译为句柄。句柄的叫法比较抽象,这里引用知乎的一个回答帮助大家理解。

句柄的英文是 handle。在英文中,有操作、处理、控制之类的意义。作为一个名词时,是指某个中间媒介,通过这个中间媒介可控制、操作某样东西。

这样说有点抽象,举个例子。door handle 是指门把手,通过门把手可以去控制门,但 door handle 并非 door 本身,只是一个中间媒介。又比如 knife handle 是刀柄,通过刀柄可以使用刀。

跟 door handle 类似,我们可以用 file handle 去操作 file, 但 file handle 并非 file 本身。这个 file handle 就被翻译成文件句柄,同理还有各种资源句柄。

一个句柄代表操作系统的一个资源,例如文件。event 会在句柄上触发,所以监控 event 需要在句柄上等待。

Synchronous Event Demultiplexer

直译过来就是同步事件分用器,它的作用是阻塞等待上面提到的一系列句柄集合产生事件。为什么叫做分用器?因为它监控了多个句柄,当某一个事件发生了,它会返回对应的那个句柄。一个常见的 I/O 事件分用器就是 select 系统调用。

Initiation Dispatcher

初始分发器,提供接口用于注册事件处理器 Event Handler。当同步事件分发器检测有一个句柄上产生事件时,会通知初始分发器,初始分发器通过查询已注册的事件处理器找到对应的处理对象。

Event Handler

事件处理器抽象类,定义了事件的处理方法。

Concrete Event Handler

具体的事件处理实现。

工作流程

《保姆式的详解》Reactor 模式和 Java NIO

Java NIO 下实现 Reactor 模式

Doug Lea 在《Scalable IO in Java》中介绍 Reactor 模式时引入了三种角色:

  1. Reactor:负责响应 IO 事件,分发给对应的事件处理,相当于上文提到的 Initiation Dispatcher;
  2. Handler:事件处理器,与上文一致;
  3. Acceptor:特殊的事件处理器,专门处理connection事件。

单 Reactor 单线程的实现方案:

《保姆式的详解》Reactor 模式和 Java NIO

图片引用自李运华的《单服务器高性能模式:Reactor 与 Proactor》一文

在给出具体实现前,需要先介绍下 Java NIO 的基本用法,它支持面向缓冲的,基于通道的 I/O 操作方法,主要由以下几个核心部分组成:Channel、Buffer 和 Selector。

Channel 和 Buffer

一般翻译为通道和缓存,数据可以从 Channel 读到 Buffer 中,也可以从 Buffer 写到 Channel 中。

《保姆式的详解》Reactor 模式和 Java NIO

Selector

Selector 允许单线程处理多个 Channel。要使用 Selector,得向 Selector 注册 Channel,然后调用它的 select() 方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。

代码示例

下面使用 Java NIO 实现单 Reactor 单线程。参考《Scalable IO in Java》的示例,需要实现三个角色,首先是 Reactor,负责事件监听和分发。事件监听使用 Selector 实现 IO 多路复用。从前面我们已经了解到,需要先创建 Channel,再向 Selector 注册。

Java NIO 中定义了四种事件类型,分别是 OP_CONNECT(客户端使用)、OP_ACCEPT(服务端使用)、OP_READ、OP_WRITE。

《保姆式的详解》Reactor 模式和 Java NIO

显然,Reactor 需要负责监听 OP_ACCEPT 事件。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class Reactor {
    final ServerSocketChannel serverSocket;
    final Selector selector;

    public Reactor(int port) throws IOException {
        // 创建Channel
        serverSocket = ServerSocketChannel.open();
        serverSocket.configureBlocking(false);

        // 绑定端口
        serverSocket.bind(new InetSocketAddress(port));

        // 创建Selector
        selector = Selector.open();
      
        // 向Selector注册Channel,通过Selector监听Channel上的 Accept事件
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    }
}           

当 Selector 监听到 OP_ACCEPT 事件后,如前面所说,连接建立完成后会先放到一个连接队列,还需要调用 accept() 从已完成连接的队列中拿到连接进行处理。因此引入 Acceptor 角色来专门处理。Acceptor 和 Handler 都是事件处理器,为了减少篇幅,这里用 Runnable 作为他们共同的抽象处理类。

Acceptor 为每个客户端连接绑定一个对应的处理类实例,后续其他事件都由对应的事件处理类处理。需要说明的是,虽然这里用 Runnable 作为处理类的抽象类,但并没有当成线程来使用,而是当成普通的抽象类,直接调用run()方法执行。

import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    public Acceptor(Selector selector, ServerSocketChannel serverSocket) {
        this.selector = selector;
        this.serverSocket = serverSocket;
    }
  
    @Override
    public void run() {
        try {
          	// 调用accept()从已完成连接的队列中拿到连接进行处理
            SocketChannel clientSocket = serverSocket.accept();
          
            if (clientSocket != null) {
                // 绑定连接对应的事件处理器实例
                new Handler(selector, clientSocket);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}           

接下来需要打通 Reactor 和 Acceptor 两个角色的联系:当Selector监听到 OP_ACCEPT 事件后,把事件分发给 Acceptor 。在前面的基础上,我们在 Reactor 角色上加上事件分发逻辑:

  1. 轮询 select() 获取有事件就绪的通道;
  2. 获取所有就绪通道对应的 SelectedKey 集合;
  3. 依次分发处理 SelectedKey 实例;
  • 每个 SelectedKey 都附着了对应的处理器实例,获取实例;
  • 执行每个实例的处理方法。

ServerSocketChannel 注册到 Selector 后,生成对应的 SelectedKey 对象,在上面附着 Acceptor 实例。当 ServerSocketChannel 监听的事件 OP_ACCEPT 就绪时,Selector.select() 返回对应的 SelectedKey,从而能获取到 Acceptor 实例,进而由 Acceptor 接管连接处理事件。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;

public class Reactor {
    final ServerSocketChannel serverSocket;
    final Selector selector;

    public Reactor(int port) throws IOException {
        // 创建Channel
        serverSocket = ServerSocketChannel.open();
        serverSocket.configureBlocking(false);

        // 绑定端口
        serverSocket.bind(new InetSocketAddress(port));

        // 创建Selector
        selector = Selector.open();

        // 把Channel注册到Selector进行监听,监听连接的 Accept事件
        SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        
      // 可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道
        selectionKey.attach(new Acceptor(selector, serverSocket));
    }

    public void run() {
        while (!Thread.interrupted()) {
            try {
                // 一直阻塞到某个注册的通道有事件就绪
                selector.select();

                // 每一个注册的通道都对应一个SelectionKey
                Set<SelectionKey> selected = selector.selectedKeys();
                for (SelectionKey selectionKey : selected) {
                    // 分发处理
                    dispatch(selectionKey);
                }
              
                // 移除避免重复处理
                selected.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey selectionKey) {
        // 虽然这里用 Runnable 作为处理类的抽象类,但并没有当成线程来使用,而是当成普通的抽象类,直接调用run()方法执行
        Runnable r = (Runnable) selectionKey.attachment();
        if (r != null) {
            r.run();
        }
    }
}           

目前为止,我们通过在 Selector 上注册 OP_ACCEPT 事件打通了 Reactor 和 Acceptor,Acceptor 为每个连接绑定了对应的 Handler 实例。当读缓冲区有数据可读时,我们希望响应事件,并且由Handler 来处理。因此,仿照前面的做法:

  1. 需要在 Selector 上注册 OP_READ ;
  2. 在 Selector 返回的 SelectedKey 上附着 Handler 实例。
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Handler implements Runnable {
    final Selector selector;
    final SocketChannel socket;
    final SelectionKey sk;

    public Handler(Selector selector, SocketChannel socket) throws IOException {
        this.selector = selector;
        this.socket = socket;
        this.socket.configureBlocking(false);

        // 把Channel注册到Selector进行监听,监听读缓冲区数据就绪事件
        sk = socket.register(selector, SelectionKey.OP_READ);

        // 把事件处理类本身附着到SelectionKey,方便识别通道和后续处理
        sk.attach(this);
    }

    @Override
    public void run() {
    }
}           

接下来完善读事件和写事件的处理逻辑。

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Handler implements Runnable {
    final Selector selector;
    final SocketChannel socket;
    final SelectionKey sk;

    ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
    ByteBuffer outputBuffer = ByteBuffer.allocate(1024);

    public Handler(Selector selector, SocketChannel socket) throws IOException {
        this.selector = selector;
        this.socket = socket;
        this.socket.configureBlocking(false);

        // 把Channel注册到Selector进行监听,监听读缓冲区数据就绪事件
        sk = socket.register(selector, SelectionKey.OP_READ);

        // 把事件处理类本身附着到SelectionKey,方便识别通道和后续处理
        sk.attach(this);
    }

    @Override
    public void run() {
        if (sk.isReadable()) {
            read();
        } else if (sk.isWritable()) {
            write();
        }
    }

    public void read() {
        try {
            while (socket.read(inputBuffer) > 0) {
                inputBuffer.flip();
                while(inputBuffer.hasRemaining()){
                    System.out.print((char) inputBuffer.get());
                }
                inputBuffer.clear();

                // 注册监听写缓冲区空闲事件,与前面通过channel注册到selector的写法等价
                sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void write() {
        try {
            outputBuffer.put("YYDS-ABCD-EFG\n".getBytes());
            outputBuffer.flip();
            while (outputBuffer.hasRemaining()) {
                socket.write(outputBuffer);
            }
            outputBuffer.clear();
            // 取消监听缓冲区空闲事件
            sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}           

启动程序

import java.io.IOException;

public class Main {
    public static void main(String[] args) throws IOException {
        Reactor reactor = new Reactor(5566);
        reactor.run();
    }
}           

在本地启动服务端程序,通过 telnet 发起请求验证。

补充说明

水平触发和边缘触发

Java 的 NIO 属于水平触发,即条件触发,在使用 Java 的 NIO 编程的时候,在没有数据可以往外写的时候要取消写事件,在有数据往外写的时候再注册写事件。

水平触发(level-triggered,也被称为条件触发)LT: 只要满足条件,就触发一个事件(只要有数据没有被获取,内核就不断通知你)

边缘触发(edge-triggered)ET: 每当状态变化时,触发一个事件。

如果你觉得这篇文章对你有帮助 点赞关注,然后私信回复【888】即可获取Java进阶全套视频以及源码学习资料

《保姆式的详解》Reactor 模式和 Java NIO

继续阅读