3. 元件
3.1 EventLoop
事件循環對象
EventLoop 本質是一個單線程執行器(同時維護了一個 Selector),裡面有 run 方法處理 Channel 上源源不斷的 io 事件。
它的繼承關系比較複雜
- 一條線是繼承自 j.u.c.ScheduledExecutorService 是以包含了線程池中所有的方法
- 另一條線是繼承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判斷一個線程是否屬于此 EventLoop
- 提供了 parent 方法來看看自己屬于哪個 EventLoopGroup
事件循環組
EventLoopGroup 是一組 EventLoop,Channel 一般會調用 EventLoopGroup 的 register 方法來綁定其中一個 EventLoop,後續這個 Channel 上的 io 事件都由此 EventLoop 來處理(保證了 io 事件處理時的線程安全)
- 繼承自 netty 自己的 EventExecutorGroup
- 實作了 Iterable 接口提供周遊 EventLoop 的能力
- 另有 next 方法擷取集合中下一個 EventLoop
3.1.1 擷取 NioEventLoop
// 内部建立了兩個 EventLoop, 每個 EventLoop 維護一個線程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
輸出
[email protected]
io.netty.channel.Defau[email protected]
[email protected]
也可以使用 for 循環
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
for (EventExecutor eventLoop : group) {
System.out.println(eventLoop);
}
輸出
io.netty.channel.Defau[email protected]
[email protected]
3.1.2 NioEventLoop 處理 io 事件
伺服器端兩個 nio worker 勞工
package com.itcxc.netty.c3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
/**
* @author chenxc
* @date 2021/8/17 22:54
*/
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
// 細分1:boos 隻負責ServerSocketChannel上的accept事件 worker隻負責SocketChannel上的讀寫
.group(new NioEventLoopGroup(1),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast( "handler1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
super.channelRead(ctx, msg); //讓消息傳遞給下一個handler
}
});
}
}).bind(8080);
}
}
用戶端,啟動三次,分别修改發送字元串為 zhangsan(第一次),lisi(第二次),wangwu(第三次)
package com.itcxc.netty.c3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @author chenxc
* @date 2021/8/17 23:01
*/
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
//2.帶有Future,Promise的類型都是和異步方法配套使用
final ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
//1.連接配接到伺服器
//異步非阻塞的, main線程發起了調用,由nio線程執行連接配接操作
.connect(new InetSocketAddress("localhost", 8080));
//2.1 方法一 使用sync方法同步處理結果
/*channelFuture.sync(); //阻塞目前線程,直到nio線程建立連接配接完畢
//将channelFuture.sync();注釋後 将無阻塞的向下執行,直接擷取到還沒建立連接配接的channel,導緻資料不知道發送到那裡去
final Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
log.debug("{}",channel);*/
//2.2 方法二 addListener(回調對象) 方法異步處理結果
//在nio線程建立好連接配接之後,會調用operationComplete方法
channelFuture.addListener((ChannelFutureListener) future -> {
final Channel channel = future.channel();
channel.writeAndFlush("hello world");
log.debug("{}",channel);
});
}
}
最後輸出
19:49:25 [DEBUG] [nioEventLoopGroup-4-1] c.i.n.c.EventLoopServer - zhangsan
19:49:25 [DEBUG] [nioEventLoopGroup-4-2] c.i.n.c.EventLoopServer - lisi
19:49:25 [DEBUG] [nioEventLoopGroup-4-1] c.i.n.c.EventLoopServer - wangwu
可以看到兩個勞工輪流處理 channel,但勞工與 channel 之間進行了綁定
再增加兩個非 nio 勞工
//建立一個獨立的EventLoopGroup 不處理io事件 處理複雜的業務邏輯,防止因為處理太長,導緻處理io事件太慢
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
// 細分1:boos 隻負責ServerSocketChannel上的accept事件 worker隻負責SocketChannel上的讀寫
.group(new NioEventLoopGroup(1),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast( "handler1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
super.channelRead(ctx, msg); //讓消息傳遞給下一個handler
}
});
ch.pipeline().addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
用戶端代碼不變,啟動三次,分别修改發送字元串為 zhangsan(第一次),lisi(第二次),wangwu(第三次)
輸出
19:49:25 [DEBUG] [nioEventLoopGroup-4-1] c.i.n.c.EventLoopServer - zhangsan
19:49:25 [DEBUG] [defaultEventLoopGroup-2-1] c.i.n.c.EventLoopServer - zhangsan
19:49:25 [DEBUG] [nioEventLoopGroup-4-2] c.i.n.c.EventLoopServer - lisi
19:49:25 [DEBUG] [defaultEventLoopGroup-2-2] c.i.n.c.EventLoopServer - lisi
19:49:25 [DEBUG] [nioEventLoopGroup-4-1] c.i.n.c.EventLoopServer - wangwu
19:49:25 [DEBUG] [defaultEventLoopGroup-2-1] c.i.n.c.EventLoopServer - wangwu
可以看到,nio 勞工和 非 nio 勞工也分别綁定了 channel
3.1.3 NioEventLoop 處理普通任務
NioEventLoop 除了可以處理 io 事件,同樣可以向它送出普通任務
NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);
log.debug("server start...");
Thread.sleep(2000);
nioWorkers.execute(()->{
log.debug("normal task...");
});
輸出
22:30:36 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:30:38 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - normal task...
可以用來執行耗時較長的任務
3.1.4 NioEventLoop 處理定時任務
NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);
log.debug("server start...");
Thread.sleep(2000);
nioWorkers.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 0, 1, TimeUnit.SECONDS);
輸出
22:35:15 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:35:17 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:18 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:19 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:20 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
...
可以用來執行定時任務
3.1.5 💡 優雅關閉
優雅關閉
shutdownGracefully
方法。該方法會首先切換
EventLoopGroup
到關閉狀态進而拒絕新的任務的加入,然後在任務隊列的任務都處理完成後,停止線程的運作。進而確定整體應用是在正常有序的狀态下退出的
用戶端代碼:
package com.itcxc.netty.c3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
/**
* @author chenxc
* @date 2021/8/20 22:51
*/
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
final NioEventLoopGroup group = new NioEventLoopGroup();
final ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
final Channel channel = channelFuture.sync().channel();
log.debug("{}",channel);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true){
final String line = scanner.nextLine();
if ("q".equals(line)){
channel.close(); //close是異步的
//log.debug("處理關閉之後的操作"); //不能在這裡善後
break;
}else {
channel.writeAndFlush(line);
}
}
},"input").start();
//擷取cloneFuture對象,
final ChannelFuture closeFuture = channel.closeFuture();
log.debug("waiting clone");
//1)同步關閉處理
/*closeFuture.sync();
log.debug("處理關閉之後的操作");*/
// 2)異步關閉處理
closeFuture.addListener((ChannelFutureListener) future -> {
log.debug("處理關閉之後的操作");
//優雅的關閉group裡面的線程,不再接受新的任務,然後再将該處理的處理完
group.shutdownGracefully();
});
}
}
3.1.6 💡 netty中的handler 執行中如何換人?
關鍵代碼
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一個 handler 的事件循環是否與目前的事件循環是同一個線程
EventExecutor executor = next.executor();
// 是,直接調用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要執行的代碼作為任務送出給下一個事件循環處理(換人)
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 如果兩個 handler 綁定的是同一個線程,那麼就直接調用
- 否則,把要調用的代碼封裝為一個任務對象,由下一個 handler 的線程來調用