深入浅出 NIO 网络编程
- JAVA NIO概述
- NIO 和 BIO的对比
- NIO的三大核心组件
-
- NIO的三大核心组件—Buffer缓冲区
-
- Buffer内存类型
- 堆外方式的使用场景和注意
- NIO的三大核心组件—Channel通道
-
- NIO Channel VS Java Stream
- Channel的实现
- SocketChannel
- ServerSocketChannel
- 模拟代码
- Selector选择器
-
- Selector监听事件
- Selector的使用
- NIO 与多线程结合改进方案
-
- reactor模式
- 总结
JAVA NIO概述
Java NIO是始于java1.4版本后提出的新的Java IO操作的
非阻塞
API,用于替换标准( 传统,或者 Blocking IO ,简称为 BIO ) Java IO API 的 IO API 。
NIO 和 BIO的对比
NIO | BIO |
---|---|
非阻塞IO | 阻塞IO |
基于缓冲区(Buffer) | 基于流(stream) |
拥有选择器Selector,是 NIO 实现非阻塞的基础 | 无选择器 |
NIO的三大核心组件
- Buffer缓冲区,本质上市一个可以写入的内存块(类似数组),可以再次读取,该内存块包含在NIO Buffer对象中,该对象提供了一系列的方法,用于操作使用内存块,后文会详细解读Buffer缓冲区的使用。
- Channel通道
- Selector选择器
NIO的三大核心组件—Buffer缓冲区
Buffer的基础属性:
capacity容量:buffer具有一定的固定的大小,也成为容量;
position位置:写入模式下代表写数据的位置,读模式下代表读取数据的位置;
limit限制:写入模式,限制大小等于buffer的容量大小,limit=capacity,读取模式下,limit等于写入的数据量的大小。
mark标记: 记录当前 position
从上述可知
position 属性代表位置,初始值为 0
。在写模式下,每 Buffer 中写入一个值,position 就加 1 ,代表下一次的写入位置。在读模式下,每从 Buffer 中读取一个值,position 就自动加 1 ,代表下一次的读取位置。
limit 属性代表上限限制
。在写模式下,代表最大能写入的数据上限位置,此时 limit 等于 capacity 。在读模式下,在 Buffer 完成所有数据写入后,通过调用 flip() 方法,切换到读模式。此时,limit 等于 Buffer 中实际的数据大小。因为 Buffer 不一定被写满,所以不能使用 capacity 作为实际的数据大小。
mark 属性为标记
,通过 mark() 方法,记录当前 position ;通过 reset() 方法,恢复 position 为标记。
写模式下,标记上一次写位置。
读模式下,标记上一次读位置。
使用Buffer进行数据读取和写入操作,步骤如下
1.将数据写入缓冲区
2.调用buffer.filp(),转换为读取模式
3.缓冲区读取数据
4.调用buffer.clear() 或buffer.compact()清除缓冲区
代码示例
package nio;
import java.nio.ByteBuffer;
/**
* @author 潇兮
* @date 2019/10/13 16:58
**/
public class BufferTest {
public static void main(String[] args){
//基于堆内( Non-Direct )内存的实现类 HeapByteBuffer 的对象,allocateDirect(int capacity)堆外内存
ByteBuffer byteBuffer=ByteBuffer.allocate(5);
//默认是写入模式
System.out.println(String.format("初始化: capacity容量:%s,position位置:%sm,limit上限:%s",byteBuffer.capacity(),byteBuffer.position(),byteBuffer.limit()));
//写入操作
System.out.println("开始读数据");
byteBuffer.put((byte) 1);
byteBuffer.put((byte) 2);
byteBuffer.put((byte) 3);
//写入数据后属性变化
System.out.println(String.format("写入数据后属性变化: capacity容量:%s,position位置:%sm,limit上限:%s",byteBuffer.capacity(),byteBuffer.position(),byteBuffer.limit()));
//转换为读模式,读取数据若是不掉用flip方法,position的位置不正确
byteBuffer.flip();
byte a=byteBuffer.get();
System.out.println(a);
byte b=byteBuffer.get();
System.out.println(b);
//读取数据后属性的变化
System.out.println(String.format("读取数据后属性的变化: capacity容量:%s,position位置:%sm,limit上限:%s",byteBuffer.capacity(),byteBuffer.position(),byteBuffer.limit()));
}
}
Buffer内存类型
ByteBuffer提供了直接内存(direct堆外内存)和直接内存(heap堆)两种实现。
堆外内存获取的方式:ByteBuffer byteBuffer=ByteBuffer.allocateDirect(int capacity);
使用堆外内存的好处:
1.进行网络 IO 比heapBuffer少一次拷贝。(file/socket——OS memory——jvm heap)因为GC会移动对象内存,所以在写file或者socket时,JVM的实现会先把数据复制到堆外 ,再进行写入。
2.GC范围之外,降低GC压力,实现了自动管理。DirectByteBuffer中有一个Cleaner对象(PhantomReference),Cleaner被GC前会执行clean方法,触发DirectByteBuffer中定义的回收函数 Deallocator
堆外方式的使用场景和注意
1.在性能确实可观的情况下才去使用堆外内存的方式;例如分配给大型、长寿命的对象或应用(网络传输、文件读写场景),减少内存拷贝的次数;
2.通过虚拟机参数MaxDirectMemorySize限制大小,防止耗尽整个机器的内存
NIO的三大核心组件—Channel通道
Channel的API涵盖了UDP/TCP网络和文件 IO,例如FileChannel、DatagramChannel、SocketChannel
ServerSocketChannel。
NIO Channel VS Java Stream
1.对于同一个channel,我们可以在同一通道内读取和写入操作。而对于同一个stream中,要么只读,要么只写,二选一,即是单向操作的;
2.Channel可以非阻塞的读写 IO 操作,而stream只能阻塞的读写IO操作;
3.Channel的使用必须搭配Buffer,即是总是先读取到一个buffer中或向一个buffer中写入,再写入Channel。
Channel的实现
Channel 在 Java 中,作为一个接口(java.nio.channels.Channel ),定义了 IO 操作的连接与关闭。主要有代码如下
Channel 最为重要的四个 Channel 实现类如下:
:客户端用于发起 TCP 的 Channel 。
SocketChannel
:服务端用于监听新进来的连接的 TCP 的 Channel 。对于新进来的连接,都会创建一个对应的 SocketChannel 。
ServerSocketChannel
:通过 UDP 读写数据。
DatagramChannel
:从文件中,读写数据。
FileChannel
SocketChannel
SocketChannel用于建立TCP网络连接,类似java.net.Socket。有两种创建方式。参考《Java NIO 系列教程(八) SocketChannel》
- 客户端主动发起和服务器的连接
- 服务端获取新连接
//客户端主动发起连接
SocketChannel socketChannel=SocketChannel.open();
socketChannel.configureBlocking(false);//设置为非阻塞模式,默认是阻塞模式
socketChannel.connect(new InetSocketAddress("http://xxxx.com",80));
channel.write(byteBuffer);//发送请求数据——向通道内写入数据(循环中调用)
int bytesRead=socketChannel.read(byteBuffer);//读取服务端返回的数据——读取缓冲区的数据
socketChannel.close();//关闭连接
注意:write写:write()在还没写入任何内容的时候就有可能返回(非阻塞),所以需要循环中调用write().
read读:read()方法可能直接返回而读不到任何数据,根据返回的int值判断读取了多少字节。
ServerSocketChannel
ServerSocketChannel可以监听新建的TCP连接通道,类似ServerSocket。详细介绍可以参考《Java NIO系列教程(九) ServerSocketChannel》
//创建网络服务端
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//设置非阻塞
//绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while(true){
//获取新的TCP链接端口
SocketChannel socketChannel=serverSocketChannel.accept();
if(socketChannel !=null){
//tcp请求,读取响应
}
}
serverSocketChannel.accept():如果该通道处于非阻塞模式,那么如果没有挂起连接,该方法立即返回null,所以SocketChannel 要进行非null校验。
模拟代码
客户端代码:
package nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
/**
* @Author 作者 :@潇兮
* @Date 创建时间:2019/10/14 12:09
* 类说明:
*/
public class NIOClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
while (!socketChannel.finishConnect()) {
// 判断是否连接完成,若没连接上,则一直等待
Thread.yield();
}
Scanner scanner = new Scanner(System.in);
System.out.println("请输入:");
// 发送内容
String msg = scanner.nextLine();
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// 读取响应
System.out.println("收到服务端响应:");
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
if (requestBuffer.position() > 0) break;
}
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
scanner.close();
socketChannel.close();
}
}
服务端代码(一):
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* @Author 作者 :@潇兮
* @Date 创建时间:2019/10/14 11:06
* 类说明:用于初步理解,直接基于非阻塞的写法,后文继续待改进
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
//创建网络服务端
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//设置阻塞
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("端口启动");
while (true){
SocketChannel socketChannel=serverSocketChannel.accept();//获取新连接
//判断socketChannel
if (socketChannel!=null){
System.out.println("获取的新连接:"+socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);//设置为非阻塞
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
try {
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) continue;//若无数据,不继续处理后续
requestBuffer.flip();//切换为读模式
byte[] content = new byte[requestBuffer.limit()];//初始化数组大小
requestBuffer.get(content);//获取内容
System.out.println("收到消息:"+new String(content));
System.out.println("收到数据,来自:"+ socketChannel.getRemoteAddress());
// 响应结果 200,模拟请求响应
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Yes,He is";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());//数据存放在byte数组
while (buffer.hasRemaining()) {
// hasRemaining() 返回是否有剩余的可用长度
socketChannel.write(buffer);// 非阻塞
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
从服务端代码(一)中,我们运行NIOServer,同时打开多个client,发现只有当一个client发送读取结束后第二个client才与Server建立连接。即是当58729端口完成发送信息后,58737端口的客户端才会与server建立连接。和 BIO 的代码实现区别不大,此处只是将方式改为非阻塞的形式,改进代码,见
服务端代码(改进一)
,服务端代码(一)运行结果如下图:
服务端代码(改进一)
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
/**
* @Author 作者 :@潇兮
* @Date 创建时间:2019/10/14 11:54
* 类说明:直接基于非阻塞的写法,一个线程处理轮询所有请求, 问题: 轮询通道的方式,低效,浪费CPU。后文继续改进
*/
public class NIOServer1 {
//已经建立连接的集合
private static ArrayList<SocketChannel> channels = new ArrayList<>();
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("端口启动");
while (true){
SocketChannel socketChannel=serverSocketChannel.accept();
if (socketChannel!=null){
System.out.println("收到新连接 : " + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false); // 默认是阻塞的,一定要设置为非阻塞
channels.add(socketChannel);
}else {
//若无新连接,处理现有连接后删除
Iterator<SocketChannel> iterator = channels.iterator();
while (iterator.hasNext()){
SocketChannel ch=iterator.next();
try {
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
if (ch.read(byteBuffer)==0){
//若通道内无数据处理,退出当前循环
continue;
}
while (ch.isOpen() && ch.read(byteBuffer)!=-1){
// 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
if (byteBuffer.position() > 0) break;
}
if (byteBuffer.position()==0) continue; // 如果没数据了, 则不继续后面的处理
//切换为读模式
byteBuffer.flip();
byte[] content = new byte[byteBuffer.limit()];
byteBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到数据,来自:" + ch.getRemoteAddress());
// 响应结果 200,模拟响应
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
// hasRemaining() 返回是否有剩余的可用长度
ch.write(buffer);
}
//处理后删除
iterator.remove();
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
}
运行server开启服务端后运行多个客户端client,发现通过
轮询通道
的方式可以一次建立多个连接,如同代码所示,当一个程序与服务端建立连接的时候,fu服务端收到响—accept(),收到响应后将它加入channels中,保留多个连接,当无新连接建立时,线程处理已经建立好的连接。但是存在不足,上述代码的实现方式是为低效的循环检查的方式,NIO提供了Selector的方式解决此类问题,避免循环检查,具体代码见
服务端代码(改进二)
,服务端代码(改进一)运行结果如下图:
Selector选择器
Selector是一个Java NIO 组件,可以检查一个或者多个NIO通道,确定一个或多个 NIO Channel 的状态是否处于可读、可写。实现了单个线程可以管理多个通道,从而管理多个网络连接。因此,Selector 也被称为多路复用器。
Selector监听事件
一个线程使用Selector监听多个channel的不同事件:四个事件分别对应SelectionKey四个常量
1.Connet连接:连接完成事件( TCP 连接 ),仅适用于客户端, SelectionKey.OP_CONNECT
2.Accept:接受新连接事件,仅适用于服务端,SelectionKey.OP_ACCEPT
3.Read读取:读事件,适用于两端,表示Buffer可读,SelectionKey.OP_READ
4.Write写入:读事件,适用于两端,表示Buffer可写,SelectionKey.OP_WRITE
Selector的使用
创建Selector可以通过
Selector selector = Selector.open();
来创建一个Selector对象。为了Selector能够管理Channel,将Channel注册到Selector中(一个 Channel 要注册到 Selector 中,那么该 Channel 必须是非阻塞,FileChannel是阻塞的,所以不能够注册)
channel.configureBlocking(false); //
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
若Selector 可以对 Channel 的多个事件感兴趣,要注册 Channel 的多个事件到 Selector 中时,可以使用或运算
|
来组合多个事件。示例代码如下:
channel.configureBlocking(false); //
SelectionKey key = channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
服务端代码(改进二)
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @Author 作者 :@潇兮
* @Date 创建时间:2019/10/14 15:12
* 类说明: 此处一个selector监听所有事件,一个线程处理所有请求事件. 会成为瓶颈! 要有多线程的运用,后文继续改进
*/
public class NIOServer2 {
public static void main(String[] args) throws IOException {
// 1. 创建网络服务端ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式
// 2. 构建一个Selector选择器,将channel注册上去
Selector selector = Selector.open();
// 将serverSocketChannel注册到selector
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
// 对serverSocketChannel上面的accept事件监听(serverSocketChannel只能支持accept操作)
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
// 3. 绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("端口启动");
while (true){
//不轮询通道,改用轮询事件的方式.select方法有阻塞效果,直到有事件通知才会有返回
selector.select();
Set<SelectionKey> selectionKeys=selector.selectedKeys();
//遍历查询结果
Iterator<SelectionKey> iterator=selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key=iterator.next();
iterator.remove();
//监听读和写事件
if (key.isAcceptable()){
ServerSocketChannel server= (ServerSocketChannel) key.attachment();
//将通道注册到selector上
SocketChannel clientSocketChannel=server.accept();//mainReactor 轮询accept
clientSocketChannel.configureBlocking(false);
clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
System.out.println("收到新连接 : " + clientSocketChannel.getRemoteAddress());
}
if (key.isReadable()){
SocketChannel socketChannel = (SocketChannel) key.attachment();
try {
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
// 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
if (requestBuffer.position() > 0) break;
}
if(requestBuffer.position() == 0) continue; // 如果没数据了, 则不继续后面的处理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到数据,来自:" + socketChannel.getRemoteAddress());
// TODO 业务操作 数据库 接口调用等等
// 响应结果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
} catch (IOException e) {
// e.printStackTrace();
key.cancel(); // 取消事件订阅
}
}
}
selector.selectNow();
}
}
}
NIO 与多线程结合改进方案
reactor模式
reactor模式称之为响应器模式,常用于nio的网络通信框架。下图来源:《Scalable IO in Java》
单Reactor模式
概括上图:Reactor线程接收请求->分发给线程池处理请求
单Reactor模式,定义了两种线程,一种线程是Reator线程。Reator线程主要负责网络的数据接收(accept())以及网络连接的处理(比如TCP连接中接收的数据),接收的数据处理操作(如读数据、解析协议等)由单独的线程池执行。 实际上就是将底层的基础网络处理和应用层的逻辑处理做了分离,提高效率。
多Reactor模式
概括上图:mainReactor->分发给subReactor读写->具体业务逻辑分发给单独的线程池处理
多Reactor模式,是将Reactor分为了多种,将处理网络连接的交由mainReactor去做,数据的读取处理交由另外一个Reactor去做,其他的和单Reactor模式无较大区别。本质上就是在网络底层多了一次分发,将数据处理交由另外一个线程去做。
根据Reactor模型改进服务端代码如下:、
服务端代码(三)
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 潇兮
* @date 2019/10/15 21:26
**/
public class NIOServer3 {
/** 处理业务操作的线程 */
private static ExecutorService workPool = Executors.newCachedThreadPool();
/**
* 封装了selector.select()等事件轮询的代码
*/
abstract class ReactorThread extends Thread {
Selector selector;
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
/**
* Selector监听到有事件后,调用这个方法
*/
public abstract void handler(SelectableChannel channel) throws Exception;
private ReactorThread() throws IOException {
selector = Selector.open();
}
volatile boolean running = false;
@Override
public void run() {
// 轮询Selector事件
while (running) {
try {
// 执行队列中的任务
Runnable task;
while ((task = taskQueue.poll()) != null) {
task.run();
}
selector.select(1000);
// 获取查询结果
Set<SelectionKey> selected = selector.selectedKeys();
// 遍历查询结果
Iterator<SelectionKey> iter = selected.iterator();
while (iter.hasNext()) {
// 被封装的查询结果
SelectionKey key = iter.next();
iter.remove();
int readyOps = key.readyOps();
// 关注 Read 和 Accept两个事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
try {
SelectableChannel channel = (SelectableChannel) key.attachment();
channel.configureBlocking(false);
handler(channel);
if (!channel.isOpen()) {
key.cancel(); // 如果关闭了,就取消这个KEY的订阅
}
} catch (Exception ex) {
key.cancel(); // 如果有异常,就取消这个KEY的订阅
}
}
}
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private SelectionKey register(SelectableChannel channel) throws Exception {
// 为什么register要以任务提交的形式,让reactor线程去处理?
// 因为线程在执行channel注册到selector的过程中,会和调用selector.select()方法的线程争用同一把锁
// 而select()方法实在eventLoop中通过while循环调用的,争抢的可能性很高,为了让register能更快的执行,就放到同一个线程来处理
FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
taskQueue.add(futureTask);
return futureTask.get();
}
private void doStart() {
if (!running) {
running = true;
start();
}
}
}
private ServerSocketChannel serverSocketChannel;
// 1、创建多个线程 - accept处理reactor线程 (accept线程)
private ReactorThread[] mainReactorThreads = new ReactorThread[1];
// 2、创建多个线程 - io处理reactor线程 (I/O线程)
private ReactorThread[] subReactorThreads = new ReactorThread[8];
/**
* 初始化线程组
*/
private void newGroup() throws IOException {
// 创建IO线程,负责处理客户端连接以后socketChannel的IO读写
for (int i = 0; i < subReactorThreads.length; i++) {
subReactorThreads[i] = new ReactorThread() {
@Override
public void handler(SelectableChannel channel) throws IOException {
// work线程只负责处理IO处理,不处理accept事件
SocketChannel ch = (SocketChannel) channel;
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (ch.isOpen() && ch.read(requestBuffer) != -1) {
// 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
if (requestBuffer.position() > 0) break;
}
if (requestBuffer.position() == 0) return; // 如果没数据了, 则不继续后面的处理
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println(Thread.currentThread().getName() + "收到数据,来自:" + ch.getRemoteAddress());
// TODO 业务操作 数据库、接口...
workPool.submit(() -> {
});
// 响应结果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
}
};
}
// 创建mainReactor线程, 只负责处理serverSocketChannel
for (int i = 0; i < mainReactorThreads.length; i++) {
mainReactorThreads[i] = new ReactorThread() {
AtomicInteger incr = new AtomicInteger(0);
@Override
public void handler(SelectableChannel channel) throws Exception {
// 只做请求分发,不做具体的数据读取
ServerSocketChannel ch = (ServerSocketChannel) channel;
SocketChannel socketChannel = ch.accept();
socketChannel.configureBlocking(false);
// 收到连接建立的通知之后,分发给I/O线程继续去读取数据
int index = incr.getAndIncrement() % subReactorThreads.length;
ReactorThread workEventLoop = subReactorThreads[index];
workEventLoop.doStart();
SelectionKey selectionKey = workEventLoop.register(socketChannel);
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "收到新连接 : " + socketChannel.getRemoteAddress());
}
};
}
}
/**
* 初始化channel,并且绑定一个eventLoop线程
*
* @throws IOException IO异常
*/
private void initAndRegister() throws Exception {
// 1、 创建ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2、 将serverSocketChannel注册到selector
int index = new Random().nextInt(mainReactorThreads.length);
mainReactorThreads[index].doStart();
SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}
/**
* 绑定端口
*
* @throws IOException IO异常
*/
private void bind() throws IOException {
// 1、 正式绑定端口,对外服务
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("启动完成,端口8080");
}
public static void main(String[] args) throws Exception {
NIOServer3 nioServerV3 = new NIOServer3();
nioServerV3.newGroup(); // 1、 创建main和sub两组线程
nioServerV3.initAndRegister(); // 2、 创建serverSocketChannel,注册到mainReactor线程上的selector上
nioServerV3.bind(); // 3、 为serverSocketChannel绑定端口
}
}
附上参考链接Reactor模型理解【NIO系列】——Reactor模式
总结
Java NIO是一种新的Java IO操作的
非阻塞
API。要想提升性能,需要与多线程技术结合使用。由于网络编程的复杂性,在开源社区中涌现了多款对JDK NIO封装和增强的框架如netty、Mina等框架。