天天看點

從零開始實作簡單 RPC 架構 9:網絡通信之心跳與重連機制

一、心跳

什麼是心跳

在 TPC 中,用戶端和服務端建立連接配接之後,需要定期發送資料包,來通知對方自己還線上,以確定 TPC 連接配接的有效性。如果一個連接配接長時間沒有心跳,需要及時斷開,否則服務端會維護很多無用連接配接,浪費服務端的資源。

IdleStateHandler

Netty 已經為我們提供了心跳的 Handler:

IdleStateHandler

。當連接配接的空閑時間(讀或者寫)太長時,

IdleStateHandler

将會觸發一個

IdleStateEvent

事件,傳遞的下一個 Handler。我們可以通過在 Pipeline Handler 中重寫

userEventTrigged

方法來處理該事件,注意我們自己的 Handler 需要在

IdleStateHandler

後面。

下面我們來看看 IdleStateHandler 的源碼。

1. 構造函數

最完整的構造函數如下:

public IdleStateHandler(boolean observeOutput,
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
}
           

參數解析:

  • observeOutput

    :是否考慮出站時較慢的情況。如果 true:當出站時間太長,超過空閑時間,那麼将不觸發此次事件。如果 false,超過空閑時間就會觸發事件。預設 false。
  • readerIdleTime

    :讀空閑的時間,0 表示禁用讀空閑事件。
  • writerIdleTime

    :寫空閑的時間,0 表示禁用寫空閑事件。
  • allIdleTime

    :讀或寫空閑的時間,0 表示禁用事件。
  • unit

    :前面三個時間的機關。

2. 事件處理

IdleStateHandler

繼承

ChannelDuplexHandler

,重寫了出站和入站的事件,我們來看看代碼。

當 channel 添加、注冊、活躍的時候,會初始化

initialize(ctx)

,删除、不活躍的時候銷毀

destroy()

,讀寫的時候設定

lastReadTime

lastWriteTime

字段。

public class IdleStateHandler extends ChannelDuplexHandler {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
            initialize(ctx);
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        destroy();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isActive()) {
            initialize(ctx);
        }
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        initialize(ctx);
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        destroy();
        super.channelInactive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 判斷是否開啟 讀空閑 或者 讀寫空閑 監控
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            // 設定 reading 标志位
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    // 讀完成之後
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 判斷是否開啟 讀空閑 或者 讀寫空閑 監控,檢查 reading 标志位
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            // 設定 lastReadTime,後面判斷讀逾時有用
            lastReadTime = ticksInNanos();
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 判斷是否開啟 寫空閑 或者 讀寫空閑 監控
        if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            // writeListener 的方法在下面,主要是設定 lastWriteTime
            ctx.write(msg, promise.unvoid()).addListener(writeListener);
        } else {
            ctx.write(msg, promise);
        }
    }
    
    private final ChannelFutureListener writeListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            lastWriteTime = ticksInNanos();
            firstWriterIdleEvent = firstAllIdleEvent = true;
        }
    };
}
           

3. 初始化

當 channel 添加、注冊、活躍的時候,會初始化

initialize(ctx)

,下面我們就來看看初始化的代碼:

private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
    case 1:
    case 2:
        return;
    }
    state = 1;
    initOutputChanged(ctx);
    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}
           

其實初始化很簡單,就是根據構造函數給的 讀寫空閑時間 去決定初始化哪些定時任務,分别是:

ReaderIdleTimeoutTask

(讀空閑逾時任務)、

WriterIdleTimeoutTask

(寫空閑逾時任務)、

AllIdleTimeoutTask

(讀寫空閑逾時任務)。

4. 定時任務

我們來看看

ReaderIdleTimeoutTask

,剩下兩個的原理跟

ReaderIdleTimeoutTask

差不多,感興趣的同學自行閱讀源碼吧。

private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
    ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }
    @Override
    protected void run(ChannelHandlerContext ctx) {
        // 檢視是否逾時
        long nextDelay = readerIdleTimeNanos;
        if (!reading) {
            nextDelay -= ticksInNanos() - lastReadTime;
        }
        if (nextDelay <= 0) {
            // 逾時了,重新啟動一個新的定時器,然後觸發事件
            // Reader is idle - set a new timeout and notify the callback.
            readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            boolean first = firstReaderIdleEvent;
            firstReaderIdleEvent = false;
            try {
                // 構造事件
                IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                // 觸發事件
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // 沒有逾時,設定新的定時器,不過這次的時間是更短的時間
            // Read occurred before the timeout - set a new timeout with shorter delay.
            readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
}
           

從上面的代碼可以看出:

① 如果讀空閑逾時了,則重新起一個定時器,然後觸發事件

② 如果讀空閑未逾時,則新起一個時間更短(

readerIdleTimeNanos - ticksInNanos() - lastReadTime

)的定時器

5. 觸發事件

上面的觸發事件方法是:

channelIdle

,經過重重代碼撥開,其實最終就是調用到了下面的代碼:

private void invokeUserEventTriggered(Object event) {
    if (invokeHandler()) {
        try {
            // 觸發事件,說白了,就是直接調用 userEventTriggered 方法而已
            ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireUserEventTriggered(event);
    }
}
           

其實觸發事件,就是把事件傳給下一個 Handler (

next

),就是調用

userEventTriggered

方法而已。是以我們處理心跳的 Handler 一定要寫到

IdleStateHandler

ccx-rpc 心跳實作

1. 用戶端

IdleStateHandler

放到啟動類的

PipleLine

注冊上,業務處理器

NettyClientHandler

在其後面。

public class NettyClient {
    // ... 忽略其他代碼
    private NettyClient() {
        bootstrap = new Bootstrap()
                // ... 省略其他代碼
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline p = ch.pipeline();
                        // 設定 IdleStateHandler 心跳檢測每 5 秒進行一次寫檢測
                        // write()方法超過 5 秒沒調用,就調用 userEventTrigger
                        p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
                        // 編碼器
                        p.addLast(new RpcMessageEncoder());
                        // 解碼器
                        p.addLast(new RpcMessageDecoder());
                        // 業務處理器
                        p.addLast(new NettyClientHandler());
                    }
                });
    }
}
           

接下來我們來看看

NettyClientHandler

是如何處理心跳事件的:

public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> {
    // ... 忽略其他代碼
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // 根據上面的配置,超過 5 秒沒有寫請求,會觸發 WRITER_IDLE 事件
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                log.info("write idle happen [{}]", ctx.channel().remoteAddress());
                Channel channel = ctx.channel();
                // 觸發寫空閑事件後,就應該發心跳了。
                // 組裝消息
                RpcMessage rpcMessage = new RpcMessage();
                rpcMessage.setSerializeType(SerializeType.PROTOSTUFF.getValue());
                rpcMessage.setCompressTye(CompressType.DUMMY.getValue());
                rpcMessage.setMessageType(MessageType.HEARTBEAT.getValue());
                // 發心跳消息
                channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
           

2. 服務端

同樣,服務端的

IdleStateHandler

放到啟動類的

PipleLine

注冊上,業務處理器

NettyServerHandler

在其後面。

public class NettyServerBootstrap {
    public void start() {
        ServerBootstrap bootstrap = new ServerBootstrap()
                // ... 忽略其他代碼
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline p = ch.pipeline();
                        // 30 秒之内沒有收到用戶端請求的話就關閉連接配接
                        p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                        // 編解碼器
                        p.addLast(new RpcMessageEncoder());
                        p.addLast(new RpcMessageDecoder());
                        // RPC 消息處理器
                        p.addLast(serviceHandlerGroup, new NettyServerHandler());
                    }
                });
        // ... 忽略其他代碼
    }
}
           

服務端收到超過 30 秒沒有讀請求的事件後,調用

ctx.close

将連接配接關閉。

同時,如果收到了用戶端發來的心跳消息,直接忽略即可。如果每個心跳都要去響應,會加重伺服器的負擔的。

NettyServerHandler

的代碼如下

public class NettyServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcMessage requestMsg) {
        // 不處理心跳消息
        if (requestMsg.getMessageType() != MessageType.REQUEST.getValue()) {
            return;
        }
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // 處理空閑狀态的
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                log.info("idle check happen, so close the connection");
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
           

二、重連機制

很多時候服務端和用戶端連接配接斷開,僅僅是因為網絡問題或者處理程式慢,并不是程式挂了。那麼用戶端想再發起請求,就發不出去了。此時需要一個功能:當發現連接配接斷了之後,如果想往連接配接寫資料,就自動重新連接配接上,這個就是重連機制。

用戶端想請求服務端的接口,先從注冊中心中,獲得服務端的位址,然後跟服務端連接配接,然後寫資料。

簡單代碼如下:

protected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException {
    // ... 忽略其他代碼
    // 服務端位址
    InetSocketAddress socketAddress = new InetSocketAddress(selected.getHost(), selected.getPort());
    // 擷取連接配接(Channel)
    Channel channel = nettyClient.getChannel(socketAddress);
    // 建構消息
    RpcMessage rpcMessage = buildRpcMessage(request);
    // 寫消息(發請求)
    channel.writeAndFlush(rpcMessage);
}
           

這個

nettyClient.getChannel(socketAddress)

是重連機制的秘密:

/**
 * 擷取和指定位址連接配接的 channel,如果擷取不到,則連接配接
 *
 * @param address 指定要連接配接的位址
 * @return channel
 */
public Channel getChannel(SocketAddress address) {
    // 根據位址從緩存中擷取 Channel
    Channel channel = CHANNEL_MAP.get(address);
    // 如果擷取不到,或者 channel 已經斷開,則重連,然後放到 CHANNEL_MAP 緩存起來
    if (channel == null || !channel.isActive()) {
        // 連接配接
        channel = connect(address);
        CHANNEL_MAP.put(address, channel);
    }
    return channel;
}
           

代碼一目了然,就是使用了

CHANNEL_MAP

作為緩存,發現找不到或者已斷開,就重新連接配接,然後放到

CHANNEL_MAP

中,以便下次擷取。

總結

心跳是用于服務端和用戶端保持有效連接配接的一種手段,用戶端每隔一小段時間發一個心跳包,服務端收到之後不用響應,但是會記下用戶端最後一次讀的時間。伺服器起定時器,定時檢測用戶端上次讀請求的時間超過配置的值,超過就會觸發事件,斷開連接配接。

重連機制是連接配接斷開之後,要使用的時候自動重連的機制。

心跳和重連機制,結合起來讓服務端和用戶端的連接配接使用更加合理,該斷開的斷開節省服務端資源,該重連的重連提高可用性。

ccx-rpc 代碼已經開源

Github:https://github.com/chenchuxin/ccx-rpc

Gitee:https://gitee.com/imccx/ccx-rpc