基礎
何為心跳
顧名思義, 所謂 心跳, 即在 TCP 長連接配接中, 用戶端和伺服器之間定期發送的一種特殊的資料包, 通知對方自己還線上, 以確定 TCP 連接配接的有效性.
為什麼需要心跳
因為網絡的不可靠性, 有可能在 TCP 保持長連接配接的過程中, 由于某些突發情況, 例如網線被拔出, 突然掉電等, 會造成伺服器和用戶端的連接配接中斷. 在這些突發情況下, 如果恰好伺服器和用戶端之間沒有互動的話, 那麼它們是不能在短時間内發現對方已經掉線的. 為了解決這個問題, 我們就需要引入 心跳 機制.
心跳機制的工作原理是: 在伺服器和用戶端之間一定時間内沒有資料互動時, 即處于 idle 狀态時, 用戶端或伺服器會發送一個特殊的資料包給對方, 當接收方收到這個資料封包後, 也立即發送一個特殊的資料封包, 回應發送方, 此即一個 PING-PONG 互動. 自然地, 當某一端收到心跳消息後, 就知道了對方仍然線上, 這就確定 TCP 連接配接的有效性.
如何實作心跳
我們可以通過兩種方式實作心跳機制:
- 使用 TCP 協定層面的 keepalive 機制.
- 在應用層上實作自定義的心跳機制.
雖然在 TCP 協定層面上, 提供了 keepalive 保活機制, 但是使用它有幾個缺點:
- 它不是 TCP 的标準協定, 并且是預設關閉的.
- TCP keepalive 機制依賴于作業系統的實作, 預設的 keepalive 心跳時間是 兩個小時, 并且對 keepalive 的修改需要系統調用(或者修改系統配置), 靈活性不夠.
- TCP keepalive 與 TCP 協定綁定, 是以如果需要更換為 UDP 協定時, keepalive 機制就失效了.
雖然使用 TCP 層面的 keepalive 機制比自定義的應用層心跳機制節省流量, 但是基于上面的幾點缺點, 一般的實踐中, 人們大多數都是選擇在應用層上實作自定義的心跳.
既然如此, 那麼我們就來大緻看看在在 Netty 中是怎麼實作心跳的吧. 在 Netty 中, 實作心跳機制的關鍵是 IdleStateHandler, 它可以對一個 Channel 的 讀/寫設定定時器, 當 Channel 在一定事件間隔内沒有資料互動時(即處于 idle 狀态), 就會觸發指定的事件.
使用 Netty 實作心跳
上面我們提到了, 在 Netty 中, 實作心跳機制的關鍵是 IdleStateHandler, 那麼這個 Handler 如何使用呢? 我們來看看它的構造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
複制
執行個體化一個 IdleStateHandler 需要提供三個參數:
- readerIdleTimeSeconds, 讀逾時. 即當在指定的時間間隔内沒有從 Channel 讀取到資料時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.
- writerIdleTimeSeconds, 寫逾時. 即當在指定的時間間隔内沒有資料寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.
- allIdleTimeSeconds, 讀/寫逾時. 即當在指定的時間間隔内沒有讀或寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.
為了展示具體的 IdleStateHandler 實作的心跳機制, 下面我們來構造一個具體的EchoServer 的例子, 這個例子的行為如下:
- 在這個例子中, 用戶端和伺服器通過 TCP 長連接配接進行通信.
- TCP 通信的封包格式是:
+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"HELLO, WORLD" |
+--------+-----+---------------+
複制
- 用戶端每隔一個随機的時間後, 向伺服器發送消息, 伺服器收到消息後, 立即将收到的消息原封不動地回複給用戶端.
- 若用戶端在指定的時間間隔内沒有讀/寫操作, 則用戶端會自動向伺服器發送一個 PING 心跳, 伺服器收到 PING 心跳消息時, 需要回複一個 PONG 消息.
通用部分
根據上面定義的行為, 我們接下來實作心跳的通用部分 CustomHeartbeatHandler:
/**
* @author xiongyongshun
* @version 1.0
*/
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static final byte PING_MSG = 1;
public static final byte PONG_MSG = 2;
public static final byte CUSTOM_MSG = 3;
protected String name;
private int heartbeatCount = 0;
public CustomHeartbeatHandler(String name) {
this.name = name;
}
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
if (byteBuf.getByte(4) == PING_MSG) {
sendPongMsg(context);
} else if (byteBuf.getByte(4) == PONG_MSG){
System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
} else {
handleData(context, byteBuf);
}
}
protected void sendPingMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(5);
buf.writeInt(5);
buf.writeByte(PING_MSG);
context.writeAndFlush(buf);
heartbeatCount++;
System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
}
private void sendPongMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(5);
buf.writeInt(5);
buf.writeByte(PONG_MSG);
context.channel().writeAndFlush(buf);
heartbeatCount++;
System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
}
protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// IdleStateHandler 所産生的 IdleStateEvent 的處理邏輯.
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
}
protected void handleReaderIdle(ChannelHandlerContext ctx) {
System.err.println("---READER_IDLE---");
}
protected void handleWriterIdle(ChannelHandlerContext ctx) {
System.err.println("---WRITER_IDLE---");
}
protected void handleAllIdle(ChannelHandlerContext ctx) {
System.err.println("---ALL_IDLE---");
}
}
複制
類 CustomHeartbeatHandler 負責心跳的發送和接收, 我們接下來詳細地分析一下它的作用. 我們在前面提到, IdleStateHandler 是實作心跳的關鍵, 它會根據不同的 IO idle 類型來産生不同的 IdleStateEvent 事件, 而這個事件的捕獲, 其實就是在 userEventTriggered 方法中實作的.
我們來看看 CustomHeartbeatHandler.userEventTriggered 的具體實作:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
複制
在 userEventTriggered 中, 根據 IdleStateEvent 的 state() 的不同, 而進行不同的處理. 例如如果是讀取資料 idle, 則
e.state() == READER_IDLE
, 是以就調用 handleReaderIdle 來處理它.
CustomHeartbeatHandler 提供了三個 idle 處理方法: handleReaderIdle, handleWriterIdle, handleAllIdle, 這三個方法目前隻有預設的實作, 它需要在子類中進行重寫, 現在我們暫時略過它們, 在具體的用戶端和伺服器的實作部分時再來看它們.
知道了這一點後, 我們接下來看看資料處理部分:
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
if (byteBuf.getByte(4) == PING_MSG) {
sendPongMsg(context);
} else if (byteBuf.getByte(4) == PONG_MSG){
System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
} else {
handleData(context, byteBuf);
}
}
複制
在
CustomHeartbeatHandler.channelRead0
中, 我們首先根據封包協定:
+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"HELLO, WORLD" |
+--------+-----+---------------+
複制
來判斷目前的封包類型, 如果是 PING_MSG 則表示是伺服器收到用戶端的 PING 消息, 此時伺服器需要回複一個 PONG 消息, 其消息類型是 PONG_MSG.
扔封包類型是 PONG_MSG, 則表示是用戶端收到伺服器發送的 PONG 消息, 此時列印一個 log 即可.
用戶端部分
用戶端初始化
public class Client {
public static void main(String[] args) {
NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
Random random = new Random(System.currentTimeMillis());
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 0, 5));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ClientHandler());
}
});
Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
for (int i = 0; i < 10; i++) {
String content = "client msg " + i;
ByteBuf buf = ch.alloc().buffer();
buf.writeInt(5 + content.getBytes().length);
buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
buf.writeBytes(content.getBytes());
ch.writeAndFlush(buf);
Thread.sleep(random.nextInt(20000));
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
workGroup.shutdownGracefully();
}
}
}
複制
上面的代碼是 Netty 的用戶端端的初始化代碼, 使用過 Netty 的朋友對這個代碼應該不會陌生. 别的部分我們就不再贅述, 我們來看看
ChannelInitializer.initChannel
部分即可:
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 0, 5));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ClientHandler());
}
});
複制
我們給 pipeline 添加了三個 Handler, IdleStateHandler 這個 handler 是心跳機制的核心, 我們為用戶端端設定了讀寫 idle 逾時, 時間間隔是5s, 即如果用戶端在間隔 5s 後都沒有收到伺服器的消息或向伺服器發送消息, 則産生 ALL_IDLE 事件.
接下來我們添加了 LengthFieldBasedFrameDecoder, 它是負責解析我們的 TCP 封包, 因為和本文的目的無關, 是以這裡不詳細展開.
最後一個 Handler 是 ClientHandler, 它繼承于 CustomHeartbeatHandler, 是我們處理業務邏輯部分.
用戶端 Handler
public class ClientHandler extends CustomHeartbeatHandler {
public ClientHandler() {
super("client");
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
byte[] data = new byte[byteBuf.readableBytes() - 5];
byteBuf.skipBytes(5);
byteBuf.readBytes(data);
String content = new String(data);
System.out.println(name + " get content: " + content);
}
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
super.handleAllIdle(ctx);
sendPingMsg(ctx);
}
}
複制
ClientHandler 繼承于 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這裡面實作 僅僅列印收到的消息.
第二個重寫的方法是 handleAllIdle. 我們在前面提到, 用戶端負責發送心跳的 PING 消息, 當用戶端産生一個 ALL_IDLE 事件後, 會導緻父類的 CustomHeartbeatHandler.userEventTriggered 調用, 而 userEventTriggered 中會根據 e.state() 來調用不同的方法, 是以最後調用的是 ClientHandler.handleAllIdle, 在這個方法中, 用戶端調用 sendPingMsg 向伺服器發送一個 PING 消息.
伺服器部分
伺服器初始化
public class Server {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(10, 0, 0));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ServerHandler());
}
});
Channel ch = bootstrap.bind(12345).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
複制
伺服器的初始化部分也沒有什麼好說的, 它也和用戶端的初始化一樣, 為 pipeline 添加了三個 Handler.
伺服器 Handler
public class ServerHandler extends CustomHeartbeatHandler {
public ServerHandler() {
super("server");
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
byte[] data = new byte[buf.readableBytes() - 5];
ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
buf.skipBytes(5);
buf.readBytes(data);
String content = new String(data);
System.out.println(name + " get content: " + content);
channelHandlerContext.write(responseBuf);
}
@Override
protected void handleReaderIdle(ChannelHandlerContext ctx) {
super.handleReaderIdle(ctx);
System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
ctx.close();
}
}
複制
ServerHandler 繼承于 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這裡面實作 EchoServer 的功能: 即收到用戶端的消息後, 立即原封不動地将消息回複給用戶端.
第二個重寫的方法是 handleReaderIdle, 因為伺服器僅僅對用戶端的讀 idle 感興趣, 是以隻重新了這個方法. 若伺服器在指定時間後沒有收到用戶端的消息, 則會觸發 READER_IDLE 消息, 進而會調用 handleReaderIdle 這個方法.
我們在前面提到, 用戶端負責發送心跳的 PING 消息, 并且伺服器的 READER_IDLE 的逾時時間是用戶端發送 PING 消息的間隔的兩倍, 是以當伺服器 READER_IDLE 觸發時, 就可以确定是用戶端已經掉線了, 是以伺服器直接關閉用戶端連接配接即可.
總結
- 使用 Netty 實作心跳機制的關鍵就是利用 IdleStateHandler 來産生對應的 idle 事件.
- 一般是用戶端負責發送心跳的 PING 消息, 是以用戶端注意關注 ALL_IDLE 事件, 在這個事件觸發後, 用戶端需要向伺服器發送 PING 消息, 告訴伺服器"我還存活着".
- 伺服器是接收用戶端的 PING 消息的, 是以伺服器關注的是 READER_IDLE 事件, 并且伺服器的 READER_IDLE 間隔需要比用戶端的 ALL_IDLE 事件間隔大(例如用戶端ALL_IDLE 是5s 沒有讀寫時觸發, 是以伺服器的 READER_IDLE 可以設定為10s)
- 當伺服器收到用戶端的 PING 消息時, 會發送一個 PONG 消息作為回複. 一個 PING-PONG 消息對就是一個心跳互動.
實作用戶端的斷線重連
public class Client {
private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
private Channel channel;
private Bootstrap bootstrap;
public static void main(String[] args) throws Exception {
Client client = new Client();
client.start();
client.sendData();
}
public void sendData() throws Exception {
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < 10000; i++) {
if (channel != null && channel.isActive()) {
String content = "client msg " + i;
ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length);
buf.writeInt(5 + content.getBytes().length);
buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
buf.writeBytes(content.getBytes());
channel.writeAndFlush(buf);
}
Thread.sleep(random.nextInt(20000));
}
}
public void start() {
try {
bootstrap = new Bootstrap();
bootstrap
.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 0, 5));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ClientHandler(Client.this));
}
});
doConnect();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected void doConnect() {
if (channel != null && channel.isActive()) {
return;
}
ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
channel = futureListener.channel();
System.out.println("Connect to server successfully!");
} else {
System.out.println("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
doConnect();
}
}, 10, TimeUnit.SECONDS);
}
}
});
}
}
複制
上面的代碼中, 我們抽象出 doConnect 方法, 它負責用戶端和伺服器的 TCP 連接配接的建立, 并且當 TCP 連接配接失敗時, doConnect 會 通過
channel().eventLoop().schedule
來延時10s 後嘗試重新連接配接.
用戶端 Handler
public class ClientHandler extends CustomHeartbeatHandler {
private Client client;
public ClientHandler(Client client) {
super("client");
this.client = client;
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
byte[] data = new byte[byteBuf.readableBytes() - 5];
byteBuf.skipBytes(5);
byteBuf.readBytes(data);
String content = new String(data);
System.out.println(name + " get content: " + content);
}
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
super.handleAllIdle(ctx);
sendPingMsg(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
client.doConnect();
}
}
複制
斷線重連的關鍵一點是檢測連接配接是否已經斷開. 是以我們改寫了 ClientHandler, 重寫了 channelInactive 方法. 當 TCP 連接配接斷開時, 會回調 channelInactive 方法, 是以我們在這個方法中調用 client.doConnect() 來進行重連。
作者:永順
https://urlify.cn/FjyQZf