基本概念
IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取, 它就通知该进程. IO多路复用适用如下场合:
(1)当客户处理多个描述字时(一般是交互式输入和网络套接口), 必须使用I/O复用.
(2)当一个客户同时处理多个套接口时, 而这种情况是可能的, 但很少出现.
(3)如果一个TCP服务器既要处理监听套接口, 又要处理已连接套接口, 一般也要用到I/O复用.
(4)如果一个服务器即要处理TCP, 又要处理UDP, 一般要使用I/O复用.
(5)如果一个服务器要处理多个服务或多个协议, 一般要使用I/O复用.
与多进程和多线程技术相比, I/O多路复用技术的最大优势是系统开销小, 系统不必创建进程/线程, 也不必维护这些进程/线程, 从而大大减小了系统的开销.
Selector(选择器)
在 Java 中, Selector 这个类是 select/epoll/poll 的外包类, 在不同的平台上, 底层的实现可能有所不同, 但其基本原理是一样的, 其原理图如下所示:
所有的 Channel 都归 Selector 管理, 这些 channel 中只要有至少一个有IO动作, 就可以通过 Selector.select 方法检测到, 并且使用 selectedKeys 得到这些有 IO 的 channel, 然后对它们调用相应的IO操作.
我这里有一个服务端的例子:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class EpollServer {
public static void main(String[] args) {
try {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8000));
//不设置阻塞队列
ssc.configureBlocking(false);
Selector selector = Selector.open();
// 注册 channel,并且指定感兴趣的事件是 Accept
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer readBuff = ByteBuffer.allocate(1024);
ByteBuffer writeBuff = ByteBuffer.allocate(128);
writeBuff.put("received".getBytes());
writeBuff.flip();
while (true) {
int nReady = selector.select();
Set keys = selector.selectedKeys();
Iterator it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isAcceptable()) {
// 创建新的连接,并且把连接注册到selector上,而且,
// 声明这个channel只对读操作感兴趣。
SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
readBuff.clear();
socketChannel.read(readBuff);
readBuff.flip();
System.out.println("received : " + new String(readBuff.array()));
key.interestOps(SelectionKey.OP_WRITE);
}
else if (key.isWritable()) {
writeBuff.rewind();
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(writeBuff);
key.interestOps(SelectionKey.OP_READ);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
这个例子的关键点:
创建一个 ServerSocketChannel, 和一个 Selector, 并且把这个 server channel 注册到 selector 上, 注册的时间指定, 这个 channel 所感觉兴趣的事件是 SelectionKey.OP_ACCEPT, 这个事件代表的是有客户端发起TCP连接请求.
使用 select 方法阻塞住线程, 当 select 返回的时候, 线程被唤醒. 再通过 selectedKeys 方法得到所有可用 channel 的集合.
遍历这个集合, 如果其中 channel 上有连接到达, 就接受新的连接, 然后把这个新的连接也注册到 selector 中去.
如果有 channel 是读, 那就把数据读出来, 并且把它感兴趣的事件改成写. 如果是写, 就把数据写出去, 并且把感兴趣的事件改成读.
Selector.open 在不同的系统里实现方式不同
sunOS 使用 DevPollSelectorProvider, Linux就会使用 EPollSelectorProvider, 而默认则使用 PollSelectorProvider
也就是说 selector.select() 用来阻塞线程, 直到一个或多个 channle 进行 io 操作. 比如 SelectionKey.OP_ACCEPT.
然后使用 selector.selectedKeys() 方法获取出, 这些通道.
那么 selector.select() 是怎么直到已经有 io 操作了呢?
原因是因为 poll
poll
# include
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);
pollfd结构体定义如下:
struct pollfd {
int fd;
short events;
short revents;
};
每一个 pollfd 结构体指定了一个被监视的文件描述符, 可以传递多个结构体, 指示 poll() 监视多个文件描述符.
每个结构体的 events 域是监视该文件描述符的事件掩码, 由用户来设置这个域. revents 域是文件描述符的操作结果事件掩码, 内核在调用返回时设置这个域.
events 域中请求的任何事件都可能在 revents 域中返回. 事件如下:
值
描述
POLLIN
有数据可读
POLLRDNORM
有普通数据可读
POLLRDBAND
有优先数据可读
POLLPRI
有紧迫数据可读
POLLOUT
写数据不会导致阻塞
POLLWRNORM
写普通数据不会导致阻塞
POLLWRBAND
写优先数据不会导致阻塞
POLLMSGSIGPOLL
消息可用
POLLER
指定的文件描述符发生错误
POLLHUP
指定的文件描述符挂起事件
POLLNVAL
指定的文件描述符非法
说白了 poll() 可以监视多个文件描述符.
如果返回值是 3, 我们需要逐个去遍历出返回值是 3 的 socket, 然后在做对应操作.
epoll
poll 方法有一个非常大的缺陷. poll 函数的返回值是一个整数, 得到了这个返回值以后, 我们还是要逐个去检查, 比如说, 有一万个 socket 同时 poll, 返回值是3, 我们还是只能去遍历这一万个 socket, 看看它们是否有IO动作.
这就很低效了, 于是, 就有了 epoll 的改进, epoll可以直接通过“输出参数”(可以理解为C语言中的指针类型的参数), 一个 epoll_event 数组, 直接获得这三个 socket, 这就比较快了.