什麼是編解碼器?
首先,我們回顧一下netty的元件設計:Netty的主要元件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等。
ChannelHandler
ChannelHandler充當了處理入站和出站資料的應用程式邏輯的容器。例如,實作ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和資料,這些資料随後會被你的應用程式的業務邏輯處理。當你要給連接配接的用戶端發送響應時,也可以從ChannelInboundHandler沖刷資料。你的業務邏輯通常寫在一個或者多個ChannelInboundHandler中。ChannelOutboundHandler原理一樣,隻不過它是用來處理出站資料的。
ChannelPipeline
ChannelPipeline提供了ChannelHandler鍊的容器。以用戶端應用程式為例,如果事件的運動方向是從用戶端到服務端的,那麼我們稱這些事件為出站的,即用戶端發送給服務端的資料會通過pipeline中的一系列ChannelOutboundHandler,并被這些Handler處理,反之則稱為入站的。
編碼解碼器
當你通過Netty發送或者接受一個消息的時候,就将會發生一次資料轉換。入站消息會被解碼:從位元組轉換為另一種格式(比如java對象);如果是出站消息,它會被編碼成位元組。
Netty提供了一系列實用的編碼解碼器,他們都實作了ChannelInboundHadnler或者ChannelOutcoundHandler接口。在這些類中,channelRead方法已經被重寫了。以入站為例,對于每個從入站Channel讀取的消息,這個方法會被調用。随後,它将調用由已知解碼器所提供的decode()方法進行解碼,并将已經解碼的位元組轉發給ChannelPipeline中的下一個ChannelInboundHandler。
解碼器
抽象基類ByteToMessageDecoder
由于你不可能知道遠端節點是否會一次性發送一個完整的資訊,tcp有可能出現粘包拆包的問題,這個類會對入站資料進行緩沖,直到它準備好被處理。
主要api有兩個:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.isReadable()) {
// Only call decode() if there is something left in the buffer to decode.
// See https://github.com/netty/netty/issues/4386
decodeRemovalReentryProtection(ctx, in, out);
}
}
}
decode方法:
必須實作的方法,ByteBuf包含了傳入資料,List用來添加解碼後的消息。對這個方法的調用将會重複進行,直到确定沒有新的元素被添加到該List,或者該ByteBuf中沒有更多可讀取的位元組時為止。然後如果該List不會空,那麼它的内容将會被傳遞給ChannelPipeline中的下一個ChannelInboundHandler。
decodeLast方法:
當Channel的狀态變成非活動時,這個方法将會被調用一次。
最簡單的例子:
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= 4) {
out.add(in.readInt());
}
}
}
這個例子,每次入站從ByteBuf中讀取4位元組,将其解碼為一個int,然後将它添加到下一個List中。當沒有更多元素可以被添加到該List中時,它的内容将會被發送給下一個ChannelInboundHandler。int在被添加到List中時,會被自動裝箱為Integer。在調用readInt()方法前必須驗證所輸入的ByteBuf是否具有足夠的資料。
一個實用的例子:
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
//在讀取前标記readerIndex
in.markReaderIndex();
//讀取頭部
int length = in.readInt();
if (in.readableBytes() < length) {
//消息不完整,無法處理,将readerIndex複位
in.resetReaderIndex();
return;
}
out.add(in.readBytes(length).toString(CharsetUtil.UTF_8));
}
}
抽象類ReplayingDecoder
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder擴充了ByteToMessageDecoder類,使用這個類,我們不必調用readableBytes()方法。參數S指定了使用者狀态管理的類型,其中Void代表不需要狀态管理。
以上代碼可以簡化為:
public class MySimpleDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//傳入的ByteBuf是ReplayingDecoderByteBuf
//首先從入站ByteBuf中讀取頭部,得到消息體長度length,然後讀取length個位元組,
//并添加到解碼消息的List中
out.add(in.readBytes(in.readInt()).toString(CharsetUtil.UTF_8));
}
如何實作的?
ReplayingDecoder在調用decode方法時,傳入的是一個自定義的ByteBuf實作:
final class ReplayingDecoderByteBuf extends ByteBuf
ReplayingDecoderByteBuf在讀取資料前,會先檢查是否有足夠的位元組可用,以readInt()為例:
final class ReplayingDecoderByteBuf extends ByteBuf {
private static final Signal REPLAY = ReplayingDecoder.REPLAY;
......
@Override
public int readInt() {
checkReadableBytes(4);
return buffer.readInt();
}
private void checkReadableBytes(int readableBytes) {
if (buffer.readableBytes() < readableBytes) {
throw REPLAY;
}
}
......
}
如果位元組數量不夠,會抛出一個Error(實際是一個Signal public final class Signal extends Error implements Constant<Signal> ),然後會在上層被捕獲并處理,它會把ByteBuf中的ReadIndex恢複到讀之前的位置,以供下次讀取。當有更多資料可供讀取時,該decode()方法将會被再次調用。最終結果和之前一樣,從ByteBuf中提取的String将會被添加到List中。
雖然ReplayingDecoder使用友善,但它也有一些局限性:
1. 并不是所有的 ByteBuf 操作都被支援,如果調用了一個不被支援的方法,将會抛出一個 UnsupportedOperationException。
2. ReplayingDecoder 在某些情況下可能稍慢于 ByteToMessageDecoder,例如網絡緩慢并且消息格式複雜時,消息被拆成了多個碎片,于是decode()方法會被多次調用反複地解析一個消息。
3. 你需要時刻注意decode()方法在同一個消息上可能被多次調用.。
錯誤用法:
一個簡單的echo服務,用戶端在連接配接建立時,向服務端發送消息(兩個1)。服務端需要一次拿到兩個Integer,并做處理。
EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg from client: " + msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoClientHandler
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("sent to server: 11");
ctx.writeAndFlush(1);
Thread.sleep(1000);
ctx.writeAndFlush(1);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
解碼器
public class MyReplayingDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<>();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
values.add(in.readInt());
values.add(in.readInt());
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
運作程式,就會發現斷言失敗。
我們通過在decode()方法中列印日志或者打斷點的方式,可以看到,decode()方法是被調用了兩次的,分别在服務端兩次接受到消息的時候:
第一次調用時,由于緩沖區中隻有四個位元組,在第二句 values.add(in.readInt()) 中抛出了異常REPLAY,在ReplayingDecoder中被捕獲,并複位ReadIndex。此時values.size() = 1。
第二次調用時,從頭開始讀取到兩個Integer并放入values,是以values.size() = 3。
正确用法:
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//清空隊列
values.clear();
values.add(in.readInt());
values.add(in.readInt());
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
如何提高ReplayingDecoder的性能?如上所說,使用ReplayingDecoder存在對一個消息多次重複解碼的問題,我們可以通過Netty提供的狀态控制來解決這個問題。
首先我們将消息結構設計為:header(4個位元組,存放消息體長度),body(消息體)
根據消息的結構,我們定義兩個狀态:
public enum MyDecoderState {
/**
* 未讀頭部
*/
READ_LENGTH,
/**
* 未讀内容
*/
READ_CONTENT;
}
EchoClientHandler
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("sent to server: msg" + i);
ctx.writeAndFlush("msg" + i);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg from client: " +
((ByteBuf) msg).toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
解碼器
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = in.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = in.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn\'t reach here.");
}
}
}
編碼器
public class MyEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
byte[] b = msg.getBytes();
int length = b.length;
//write length of msg
out.writeInt(length);
//write msg
out.writeBytes(b);
}
}
當頭部被成功讀取到時,我們調用 checkpoint(MyDecoderState.READ_CONTENT) 設定狀态為“未讀消息”,相當于設定一個标志位,如果在後續讀取時抛出異常,那麼readIndex會被複位到上一次你調用checkpoint()方法的地方。下一次接收到消息,再次調用decode()方法時,就能夠從checkpoint處開始讀取,避免了又從頭開始讀。
更多解碼器:
LineBasedFrameDecoder
這個類在Netty内部也有使用,它使用行尾控制字元(\n或者\r\n)作為分隔符來解析資料。
DelimiterBasedFrameDecoder
使用自定義的特殊字元作為消息的分隔符。
HttpObjectDecoder
一個HTTP資料的解碼器。
這些解碼器也非常實用,下次更新關于這些解碼器的原理和詳細使用。
更多詳細内容參見《netty in action》 或者netty源碼的英文注釋。