天天看點

這樣講 Netty 中的心跳機制,還有誰不會?

基礎

何為心跳

顧名思義, 所謂 心跳, 即在 TCP 長連接配接中, 用戶端和伺服器之間定期發送的一種特殊的資料包, 通知對方自己還線上, 以確定 TCP 連接配接的有效性.

為什麼需要心跳

因為網絡的不可靠性, 有可能在 TCP 保持長連接配接的過程中, 由于某些突發情況, 例如網線被拔出, 突然掉電等, 會造成伺服器和用戶端的連接配接中斷. 在這些突發情況下, 如果恰好伺服器和用戶端之間沒有互動的話, 那麼它們是不能在短時間内發現對方已經掉線的. 為了解決這個問題, 我們就需要引入 心跳 機制.

心跳機制的工作原理是: 在伺服器和用戶端之間一定時間内沒有資料互動時, 即處于 idle 狀态時, 用戶端或伺服器會發送一個特殊的資料包給對方, 當接收方收到這個資料封包後, 也立即發送一個特殊的資料封包, 回應發送方, 此即一個 PING-PONG 互動. 自然地, 當某一端收到心跳消息後, 就知道了對方仍然線上, 這就確定 TCP 連接配接的有效性.

如何實作心跳

我們可以通過兩種方式實作心跳機制:

  • 使用 TCP 協定層面的 keepalive 機制.
  • 在應用層上實作自定義的心跳機制.

雖然在 TCP 協定層面上, 提供了 keepalive 保活機制, 但是使用它有幾個缺點:

  1. 它不是 TCP 的标準協定, 并且是預設關閉的.
  2. TCP keepalive 機制依賴于作業系統的實作, 預設的 keepalive 心跳時間是 兩個小時, 并且對 keepalive 的修改需要系統調用(或者修改系統配置), 靈活性不夠.
  3. TCP keepalive 與 TCP 協定綁定, 是以如果需要更換為 UDP 協定時, keepalive 機制就失效了.

雖然使用 TCP 層面的 keepalive 機制比自定義的應用層心跳機制節省流量, 但是基于上面的幾點缺點, 一般的實踐中, 人們大多數都是選擇在應用層上實作自定義的心跳.

既然如此, 那麼我們就來大緻看看在在 Netty 中是怎麼實作心跳的吧. 在 Netty 中, 實作心跳機制的關鍵是 IdleStateHandler, 它可以對一個 Channel 的 讀/寫設定定時器, 當 Channel 在一定事件間隔内沒有資料互動時(即處于 idle 狀态), 就會觸發指定的事件.

使用 Netty 實作心跳

上面我們提到了, 在 Netty 中, 實作心跳機制的關鍵是 IdleStateHandler, 那麼這個 Handler 如何使用呢? 我們來看看它的構造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
           

複制

執行個體化一個 IdleStateHandler 需要提供三個參數:

  • readerIdleTimeSeconds, 讀逾時. 即當在指定的時間間隔内沒有從 Channel 讀取到資料時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTimeSeconds, 寫逾時. 即當在指定的時間間隔内沒有資料寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTimeSeconds, 讀/寫逾時. 即當在指定的時間間隔内沒有讀或寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.

為了展示具體的 IdleStateHandler 實作的心跳機制, 下面我們來構造一個具體的EchoServer 的例子, 這個例子的行為如下:

  1. 在這個例子中, 用戶端和伺服器通過 TCP 長連接配接進行通信.
  2. TCP 通信的封包格式是:
+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+
           

複制

  1. 用戶端每隔一個随機的時間後, 向伺服器發送消息, 伺服器收到消息後, 立即将收到的消息原封不動地回複給用戶端.
  2. 若用戶端在指定的時間間隔内沒有讀/寫操作, 則用戶端會自動向伺服器發送一個 PING 心跳, 伺服器收到 PING 心跳消息時, 需要回複一個 PONG 消息.

通用部分

根據上面定義的行為, 我們接下來實作心跳的通用部分 CustomHeartbeatHandler:

/**
 * @author xiongyongshun
 * @version 1.0
 */
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
    public static final byte PING_MSG = 1;
    public static final byte PONG_MSG = 2;
    public static final byte CUSTOM_MSG = 3;
    protected String name;
    private int heartbeatCount = 0;

    public CustomHeartbeatHandler(String name) {
        this.name = name;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
        if (byteBuf.getByte(4) == PING_MSG) {
            sendPongMsg(context);
        } else if (byteBuf.getByte(4) == PONG_MSG){
            System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
        } else {
            handleData(context, byteBuf);
        }
    }

    protected void sendPingMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PING_MSG);
        context.writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    private void sendPongMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PONG_MSG);
        context.channel().writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // IdleStateHandler 所産生的 IdleStateEvent 的處理邏輯.
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
    }

    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        System.err.println("---READER_IDLE---");
    }

    protected void handleWriterIdle(ChannelHandlerContext ctx) {
        System.err.println("---WRITER_IDLE---");
    }

    protected void handleAllIdle(ChannelHandlerContext ctx) {
        System.err.println("---ALL_IDLE---");
    }
}
           

複制

類 CustomHeartbeatHandler 負責心跳的發送和接收, 我們接下來詳細地分析一下它的作用. 我們在前面提到, IdleStateHandler 是實作心跳的關鍵, 它會根據不同的 IO idle 類型來産生不同的 IdleStateEvent 事件, 而這個事件的捕獲, 其實就是在 userEventTriggered 方法中實作的.

我們來看看 CustomHeartbeatHandler.userEventTriggered 的具體實作:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        switch (e.state()) {
            case READER_IDLE:
                handleReaderIdle(ctx);
                break;
            case WRITER_IDLE:
                handleWriterIdle(ctx);
                break;
            case ALL_IDLE:
                handleAllIdle(ctx);
                break;
            default:
                break;
        }
    }
}
           

複制

在 userEventTriggered 中, 根據 IdleStateEvent 的 state() 的不同, 而進行不同的處理. 例如如果是讀取資料 idle, 則

e.state() == READER_IDLE

, 是以就調用 handleReaderIdle 來處理它.

CustomHeartbeatHandler 提供了三個 idle 處理方法: handleReaderIdle, handleWriterIdle, handleAllIdle, 這三個方法目前隻有預設的實作, 它需要在子類中進行重寫, 現在我們暫時略過它們, 在具體的用戶端和伺服器的實作部分時再來看它們.

知道了這一點後, 我們接下來看看資料處理部分:

@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
    if (byteBuf.getByte(4) == PING_MSG) {
        sendPongMsg(context);
    } else if (byteBuf.getByte(4) == PONG_MSG){
        System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
    } else {
        handleData(context, byteBuf);
    }
}
           

複制

CustomHeartbeatHandler.channelRead0

中, 我們首先根據封包協定:

+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+
           

複制

來判斷目前的封包類型, 如果是 PING_MSG 則表示是伺服器收到用戶端的 PING 消息, 此時伺服器需要回複一個 PONG 消息, 其消息類型是 PONG_MSG.

扔封包類型是 PONG_MSG, 則表示是用戶端收到伺服器發送的 PONG 消息, 此時列印一個 log 即可.

用戶端部分

用戶端初始化

public class Client {
    public static void main(String[] args) {
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        Random random = new Random(System.currentTimeMillis());
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ClientHandler());
                        }
                    });

            Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
            for (int i = 0; i < 10; i++) {
                String content = "client msg " + i;
                ByteBuf buf = ch.alloc().buffer();
                buf.writeInt(5 + content.getBytes().length);
                buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                buf.writeBytes(content.getBytes());
                ch.writeAndFlush(buf);

                Thread.sleep(random.nextInt(20000));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}
           

複制

上面的代碼是 Netty 的用戶端端的初始化代碼, 使用過 Netty 的朋友對這個代碼應該不會陌生. 别的部分我們就不再贅述, 我們來看看

ChannelInitializer.initChannel

部分即可:

.handler(new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(new IdleStateHandler(0, 0, 5));
        p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
        p.addLast(new ClientHandler());
    }
});
           

複制

我們給 pipeline 添加了三個 Handler, IdleStateHandler 這個 handler 是心跳機制的核心, 我們為用戶端端設定了讀寫 idle 逾時, 時間間隔是5s, 即如果用戶端在間隔 5s 後都沒有收到伺服器的消息或向伺服器發送消息, 則産生 ALL_IDLE 事件.

接下來我們添加了 LengthFieldBasedFrameDecoder, 它是負責解析我們的 TCP 封包, 因為和本文的目的無關, 是以這裡不詳細展開.

最後一個 Handler 是 ClientHandler, 它繼承于 CustomHeartbeatHandler, 是我們處理業務邏輯部分.

用戶端 Handler

public class ClientHandler extends CustomHeartbeatHandler {
    public ClientHandler() {
        super("client");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 5];
        byteBuf.skipBytes(5);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
    }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }
}
           

複制

ClientHandler 繼承于 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這裡面實作 僅僅列印收到的消息.

第二個重寫的方法是 handleAllIdle. 我們在前面提到, 用戶端負責發送心跳的 PING 消息, 當用戶端産生一個 ALL_IDLE 事件後, 會導緻父類的 CustomHeartbeatHandler.userEventTriggered 調用, 而 userEventTriggered 中會根據 e.state() 來調用不同的方法, 是以最後調用的是 ClientHandler.handleAllIdle, 在這個方法中, 用戶端調用 sendPingMsg 向伺服器發送一個 PING 消息.

伺服器部分

伺服器初始化

public class Server {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(10, 0, 0));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ServerHandler());
                        }
                    });

            Channel ch = bootstrap.bind(12345).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
           

複制

伺服器的初始化部分也沒有什麼好說的, 它也和用戶端的初始化一樣, 為 pipeline 添加了三個 Handler.

伺服器 Handler

public class ServerHandler extends CustomHeartbeatHandler {
    public ServerHandler() {
        super("server");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
        byte[] data = new byte[buf.readableBytes() - 5];
        ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
        buf.skipBytes(5);
        buf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
        channelHandlerContext.write(responseBuf);
    }

    @Override
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        super.handleReaderIdle(ctx);
        System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
        ctx.close();
    }
}
           

複制

ServerHandler 繼承于 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這裡面實作 EchoServer 的功能: 即收到用戶端的消息後, 立即原封不動地将消息回複給用戶端.

第二個重寫的方法是 handleReaderIdle, 因為伺服器僅僅對用戶端的讀 idle 感興趣, 是以隻重新了這個方法. 若伺服器在指定時間後沒有收到用戶端的消息, 則會觸發 READER_IDLE 消息, 進而會調用 handleReaderIdle 這個方法.

我們在前面提到, 用戶端負責發送心跳的 PING 消息, 并且伺服器的 READER_IDLE 的逾時時間是用戶端發送 PING 消息的間隔的兩倍, 是以當伺服器 READER_IDLE 觸發時, 就可以确定是用戶端已經掉線了, 是以伺服器直接關閉用戶端連接配接即可.

總結

  1. 使用 Netty 實作心跳機制的關鍵就是利用 IdleStateHandler 來産生對應的 idle 事件.
  2. 一般是用戶端負責發送心跳的 PING 消息, 是以用戶端注意關注 ALL_IDLE 事件, 在這個事件觸發後, 用戶端需要向伺服器發送 PING 消息, 告訴伺服器"我還存活着".
  3. 伺服器是接收用戶端的 PING 消息的, 是以伺服器關注的是 READER_IDLE 事件, 并且伺服器的 READER_IDLE 間隔需要比用戶端的 ALL_IDLE 事件間隔大(例如用戶端ALL_IDLE 是5s 沒有讀寫時觸發, 是以伺服器的 READER_IDLE 可以設定為10s)
  4. 當伺服器收到用戶端的 PING 消息時, 會發送一個 PONG 消息作為回複. 一個 PING-PONG 消息對就是一個心跳互動.

實作用戶端的斷線重連

public class Client {
    private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
    private Channel channel;
    private Bootstrap bootstrap;

    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.start();
        client.sendData();
    }

    public void sendData() throws Exception {
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < 10000; i++) {
            if (channel != null && channel.isActive()) {
                String content = "client msg " + i;
                ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length);
                buf.writeInt(5 + content.getBytes().length);
                buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                buf.writeBytes(content.getBytes());
                channel.writeAndFlush(buf);
            }

            Thread.sleep(random.nextInt(20000));
        }
    }

    public void start() {
        try {
            bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ClientHandler(Client.this));
                        }
                    });
            doConnect();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void doConnect() {
        if (channel != null && channel.isActive()) {
            return;
        }

        ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);

        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (futureListener.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("Connect to server successfully!");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");

                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConnect();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });
    }

}
           

複制

上面的代碼中, 我們抽象出 doConnect 方法, 它負責用戶端和伺服器的 TCP 連接配接的建立, 并且當 TCP 連接配接失敗時, doConnect 會 通過

channel().eventLoop().schedule

來延時10s 後嘗試重新連接配接.

用戶端 Handler

public class ClientHandler extends CustomHeartbeatHandler {
    private Client client;
    public ClientHandler(Client client) {
        super("client");
        this.client = client;
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 5];
        byteBuf.skipBytes(5);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
    }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        client.doConnect();
    }
}
           

複制

斷線重連的關鍵一點是檢測連接配接是否已經斷開. 是以我們改寫了 ClientHandler, 重寫了 channelInactive 方法. 當 TCP 連接配接斷開時, 會回調 channelInactive 方法, 是以我們在這個方法中調用 client.doConnect() 來進行重連。

作者:永順

https://urlify.cn/FjyQZf