一、先來官方入門頁面的翻譯(翻譯不好請多包涵)
入門
本章以簡單的例子來介紹Netty的核心概念,以便讓您快速入門。當您閱讀完本章之後,您就能立即在Netty的基礎上寫一個用戶端和一個伺服器。
如果您喜歡自上而下的方法來學習某些東西,那麼就可能需要從第二章“架構概述”開始,然後回到這一章。
入門之前
運作本章的例子,最低要求有兩個:最新版的Netty和JDK1.6+。其餘的balabala...
寫一個Discard Server(丢棄伺服器)
世界上最簡單的協定不是“Hello World!”,而是DISCARD(丢棄)。它是一種 丢棄任何收到的資料并且沒有任何響應的協定。(說白了就是用戶端來消息,伺服器不鳥你)
要實作這種協定,你需要做的隻有 忽略掉所有收到的資料 。讓我們直接從Handler的實作開始,它處理由Netty生成的I/O事件。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
1. DiscardServerHandler 繼承了 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter實作了ChannelInboundHandler接口。
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
ChannelInboundHandler提供了很多你可以覆寫的事件處理方法。現在,你隻需要繼承ChannelInboundHandlerAdapter而不是實作ChannelInboundHandler接口。
2. 我們在這裡覆寫了channelRead方法,每當從用戶端傳來新消息時,都會觸發這個方法。在這裡,負責接收消息的資料類型是ByteBuf。
3. 為了實作DISCARD協定,我們需要忽略收到的消息。ByteBuf是一個引用計數對象,需要明确的通過調用release()方法去釋放。請記住,使用者有責任釋放傳遞給處理程式的引用計數對象。通常,
channelRead()
處理方法的實作方式如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
4. 當Netty發生IO異常時,會調用exceptionCaught()。大多數情況下,發生的異常應該被日志記錄,盡管這裡的處理方式可能跟您的實際情況有所不同,但是相應的通道應該在這裡關閉。比如:在關閉連接配接之前,您想響應一個錯誤代碼
到目前為止我們完成了伺服器的一半,接下來利用DiscardServerHandler來編寫啟動伺服器的main方法。
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
1. NioEventLoopGroup 是一個處理IO操作的多線程時間循環。Netty為了不同的傳輸協定提供了很多EventLoopGroup接口的實作。我們在這個例子裡實作了一個服務端程式,是以我們用了兩個NioEventLoopGroup。第一個,經常被稱作“boss”,用來接收将要到來的連接配接。第二個,經常被稱作“worker”,一旦Boss接收到連接配接并把連接配接注冊到Worker,就開始處理接收到的連接配接的通信。使用多少線程以及如何将它們映射到建立的通道中取決于EventLoopGroup的具體實作,可以通過構造函數來配置。
2. ServerBootstrap作為一個輔助類去設定一個伺服器。你可以直接用一個Channel來設定伺服器。然而,請注意那是一個繁瑣的過程,大多數情況下不需要你那樣做。
3. 在這裡,我們特别使用NioServerSocketChannel來執行個體化一個新通道去接受将要到來的連接配接。
4. 這裡的處理程式始終被新接收的Channel評估。(這句話不知到咋解釋了)。ChannelInitializer是一個特殊的處理程式,目的是幫助使用者配置一個新Channel。您可能想要通過添加一個處理程式比如DiscardServerHandler來配置一個新管道的ChannelPipeline來實作你的網絡程式。随着應用程式的複雜化,您可能會在管道中添加更多的處理程式,并将這個匿名類最終提取到頂級類中。
5. 您還可以在Channel的具體實作設定一些特殊的參數。我們現在在寫一個TCP/IP的伺服器,是以允許我們設定一些套接字的選項,例如:tcpNoDelay,keepAlive。請參考API文檔中ChannelOption,特别是ChannelConfig的具體實作類,以便去獲得關于ChannelOptions相關支援的概述。
6. 你注意到option()和childOption()了嗎?option()是為了NioServerSocketChannel接受新的連接配接,childOption()是為了那些被父ServerChannel接受的Channels,在這裡ServerChannel指的是NioServerSocketChannel。
7. 我們現在已經準備好了。最後剩下綁定端口和啟動伺服器。在這裡我們綁定8080端口。你可以根據需要調用bind()方法多次。
現在你已經完成了第一台Netty伺服器。
檢視接收到的資料
現在我們已經寫完了我們第一台伺服器,現在需要驗證它是否正常工作。這裡有一個簡單的方法,通過使用telnet指令。例如,你可以在指令行輸入
telnet localhost 8080
但是,我們能看到伺服器是否正常工作嗎?我們不能知道,因為它是一台丢棄伺服器。我們不會得到任何回應。為了證明他已經工作了,我們需要修改伺服器,讓伺服器列印接收到的消息。
我們已經知道當有資料的時候就調用channelRead()方法,是以我們寫一些代碼在DiscardServerHandler中的channelRead方法裡。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
1. 實際上這個低效循環可以簡化為:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
2. 或者,您可以在in.release()這裡做。
如果你再次執行telnet 指令,你會看見伺服器輸出了接受到的資料。
寫一個響應伺服器
目前為止,我們隻接受但是沒有任何響應。一台伺服器,通常應該響應該請求。讓我們學習如何通過實作ECHO協定向用戶端寫入響應消息,其中任何接收到的資料都被發送回來。
與前面部分實作的丢棄伺服器的唯一差別在于它将接收到的資料發回,而不是将接收的資料輸出到控制台。是以,再次修改
channelRead()
方法是足夠的:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
1. ChannelHandlerConetxt提供了很多方法讓你去觸發IO時間或操作。這裡我們調用write(object)來逐字的寫入接受到的消息。注意,我們不像DISCARD例子裡的那樣,我們沒有釋放我們收到的消息。這是因為當它被寫回到wire時,Netty替我們釋放它。
2. ctx.write(Object)不會讓消息發送,它存在于内部緩沖區,通過調用ctx.flush()來把消息發送出去,或者,您可以簡潔的調用ctx.writeAndFlush(msg)。
如果您再次使用telnet指令,您就會發現你發送什麼伺服器傳回什麼。
寫一個時間伺服器
本節要實作的協定是Time協定。與前面的例子不同的是,它發送一個包含32位整數的消息,而不接收任何請求,并在發送消息後關閉連接配接。在此示例中,您将學習如何建構和發送消息,并在完成時關閉連接配接。
因為我們要忽略任何接受到的資料并且當建立好連接配接就發送一條消息,這次就不能再使用channelRead()了,相反,我們使用channelActive()方法。下面是實作:
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1. 正如解釋的那樣,當建立好連接配接并且準備好産生流量的時候會調用
channelActive()
方法,讓我們在這個方法中寫入一個32位整數來代表目前時間
2. 為了發送一個新消息,我們需要開辟出一塊緩沖區來存放我們的消息。我們将要寫一個32位的整數,是以我們需要一個容量至少為4位元組的ByteBuf。通過ChannelHandlerContext.alloc()獲得目前的
ByteBufAllocator,并配置設定出一塊緩沖區。
3. 像往常一樣,我們寫入包裝好的消息。
等等,flip在哪?在NIO裡發消息之前我們不調用一下java.nio.ByteBuffer.flip()嗎?ByteBuf沒有那個方法因為他有兩個指針,一個為了讀操作,一個為了寫操作。當你寫點什麼的時候,寫指針會向前移動而讀指針不會移動。讀指針和寫指針分别代表消息的開始和結束位置。
相比之下,NIO緩沖區沒有提供一種清晰地方式指出消息的開始和結束,除非調用flip方法。當你忘記flip的時候你會陷入麻煩,因為将要發送的内容為空或者是錯誤的資料。這樣的錯誤不會發生在Netty,因為不同的操作類型我們有不同的指針。當你使用它的時候你會發現他讓你的生活更容易,因為不用再考慮flip了。
另外一點需要注意的是:ChannelHandlerContext.write()(還有writeAndFlush())方法傳回一個ChannelFuture對象,一個ChannelFuture對象代表還沒有發生的IO操作,這意味着任何請求的操作可能尚未執行,因為所有操作在Netty中都是異步的。比如,在消息發送之前,下面的代碼也會關閉連接配接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
是以,當
ChannelFuture執行完成後你需要調用
close()方法,
ChannelFuture是由
write()方法傳回的,并且他會告訴所有的監聽者當write操作完成時。請注意,
close()
也可能不會立即關閉連接配接,并且他傳回一個ChannelFuture。
4. 當請求完成的時候我們怎樣得到通知呢?就像給傳回的ChannelFuture添加ChannelFutureListener一樣,在這裡我們建立一個匿名内部類ChannelFutureListener,當操作完成時,我們關閉Channel
或者您可以的使用系統預定義的監聽器來簡化代碼
f.addListener(ChannelFutureListener.CLOSE);
為了驗證Time伺服器是否成功,可以通過Unix指令rdate來檢查:
$ rdate -o <port> -p <host>
port是main方法裡面明确的,host一般是localhost。
寫一個Time用戶端
不像DISCARD和ECHO伺服器那樣,為了Time協定我們需要一個用戶端,因為人類不能講一個32位整數翻譯成月曆上表示的日期。在本節中,我們将讨論如何确定伺服器正常運作以及如何寫一個基于Netty的用戶端。
基于Netty的伺服器和用戶端之間最大且唯一的不同就是:使用的Bootstrap和Channel得實作。請看:
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
1. Bootstrap和ServerBootstrap非常相似,除了它是一個非伺服器通道,比如:用戶端或者無連接配接通道。
2. 如果你隻指定一個EventLoopGroup,他将同時用于boss組和worker組。盡管用戶端不使用boos 的 worker
3.不同于NioServerSocketChannel,NioSocketChannel被用來建立用戶端Channel。
4.請注意在這裡我們沒有用childOption(),不像我們在ServerBootstrap那樣,因為在用戶端的SocketChannel沒有Parent。
5. 我們需要調用
connect()方法,而不是bind()方法。
正如你看見的,和伺服器的代碼沒什麼差別。怎樣實作一個ChannelHandler?他應該接受一個來自伺服器的32位的整數,并将它解釋為人類可以看得懂的形式,列印出翻譯過的時間,并關閉連接配接。
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1.在TCP / IP中,Netty将從對等體發送的資料讀入
ByteBuf
。
它看起來很簡單,并且和伺服器代碼沒啥兩樣。然而,這個處理程式有時會拒絕服務并抛出異常IndexOutOfBoundsException。下一節我們将讨論這個問題。
處理基于流的運輸
套接字緩沖區的一個小小注意事項
在諸如TCP / IP的基于流的傳輸中,接收的資料被存儲到套接字接收緩沖器中。不幸的是,基于流的傳輸的緩沖區不是資料包的隊列,而是位元組隊列。這意味着,即使您發送兩個消息作為兩個獨立資料包,作業系統也不會将它們視為兩個消息,而隻是一堆位元組。是以,您無法保證您所讀取的内容正是您遠端對等人寫的内容。例如,假設作業系統的TCP / IP堆棧已經收到三個資料包:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5SOyQDMwETOxcTMtUDMwYTM2ETOxAzM3AzNxAjMtgzN0MTM48CX3AzNxAjMvwFO3QzMxgzLcd2bsJ2Lc12bj5ycn9Gbi52YucTMwIzcldWYtl2Lc9CX6MHc0RHaiojIsJye.png)
由于基于流的協定的這種一般屬性,在應用程式中以下列分片形式讀取它們的可能性很大:
是以,無論伺服器端或用戶端如何,接收部分都應将接收的資料進行碎片整理,整理成一個一個有意義的結構以便于很容易的被程式邏輯所了解。在上述示例的情況下,接收到的資料應該如下所示:
第一個解決方案
現在讓我們回到之前的Time用戶端例子。
我們在這裡也有同樣的問題。一個32位整數是非常少量的資料,它不太可能經常碎片化。然而,問題是可以分散,碎片化的可能性會随着流量的增加而增加。
簡單的解決方案是建立内部累積緩沖區,并等待所有4個位元組都被接收到内部緩沖區。以下是
TimeClientHandler
修複的問題修複實作:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1. 一個
ChannelHandler
有兩個生命周期偵聽器方法:
handlerAdded()
和
handlerRemoved()
。隻要不長時間阻塞,你就可以任意執行初始化任務。
2. 首先,所有接受到的資料都會累積到緩沖區buf。
3. 然後,處理程式會檢查buf是否含有足夠的資料,在這個例子裡是4個位元組,然後繼續執行現有的業務邏輯。否則,當更多的資料到來時Netty将再次調用channelRead()方法,最終将有4個位元組的資料将被積累。
第二個解決方案
盡管第一個解決方案解決了問題,但是修改後的處理程式看起來并不是很幹淨。想象一下有一個由多個字段組成的更複雜的協定,例如可變長度字段,你的
ChannelInboundHandler 實作馬上變的力不從心。
你可能已經注意到,您可以添加多個ChannelHandler到
ChannelPipeline上。是以,您可以将一個龐大的ChannelHandler分割成許多小子產品,以減少應用程式的複雜性。例如,您可以分割 TimeClientHandler 成兩個處理程式:
TimeDecoder 解決碎片問題,和
TimeClientHandler的簡單初始程式
幸運的是,Netty提供了一個可擴充的類,可幫助您編寫開箱即用的第一個:
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
1.
ByteToMessageDecoder
是
ChannelInboundHandler 接口的實作,讓處理碎片問題更容易。
2. 無論什麼時候有新資料到來時,
ByteToMessageDecoder
利用内部維護的一個緩沖區調用decode()方法。
3. 當累積的緩沖區沒有足夠的資料時,decode()能夠決定不向out添加資料。當有更多的資料到來時,
ByteToMessageDecoder
能夠再次調用decode()方法。
4. 如果decode()添加了一個對象到out,那就意味着解碼器成功的解碼了一條消息。
ByteToMessageDecoder
将丢棄累積緩沖區的讀部分,請記住你不需要解碼更多的消息。
ByteToMessageDecoder
會保持調用decode()直到不再向out添加資料。
現在我們有另外一個處理程式要添加到
ChannelPipeline,我們需要修改TimeClient中
ChannelInitializer 的實作部分:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果你是一個大膽的人,你可能想要嘗試更簡化的解碼器
ReplayingDecoder
。你需要查閱API來擷取更多的資訊。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty提供了開箱即用的解碼器,使您能夠輕松實作大多數協定,并幫助您避免實作一個龐大的不可維護的處理程式。有關更多詳細示例,請參閱以下軟體包:
io.netty.example.factorial 用于二進制協定
io.netty.example.telnet 用于文本行協定。
使用POJO而不是ByteBuf
到目前為止我們見過的所有例子都是使用ByteBuf作為協定消息的資料結構。在本節中,我們将改進
TIME
協定用戶端和伺服器示例,使用POJO而不是
ByteBuf
在你的ChannelHandlers中使用POJO的優勢是明顯的,你的處理程式變得可維護,通過分離出提取ByteBuf中的資訊的代碼使得處理程式變得可重用。在
TIME
用戶端和伺服器示例中,我們隻讀取一個32位整數,它并不是我們直接使用ByteBuf面臨的主要問題。然而,你會發現在真實世界協定中把它分離是很有必要的。
首先我們定義一個新類:
UnixTime。
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
現在我們修改TimeDecoder,生成一個 UnixTime而不是ByteBuf
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
根據更新的解碼器,
TimeClientHandler
不再需要一個ByteBuf
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
更簡單優雅,對吧?同樣的技術可以在伺服器端應用。這次讓我們先更新
TimeServerHandler
。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
現在,唯一缺少的部分就是編碼器。它是
ChannelOutboundHandler接口的實作,他将一個UnitTime轉換成ByteBuf,它比編寫解碼器簡單得多,因為在編碼消息的時候不需要考慮碎片問題。
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
1. 在這一行有很多重要的問題
第一,我們傳輸原始
ChannelPromise
的時候按照原樣傳輸,這樣當編碼的資料被實際寫入電線時,Netty将其标記為成功或失敗。
第二,我們沒有調用ctx.flush(),這裡有一個單獨的處理程式方法
void flush(ChannelHandlerContext ctx)
,目的是覆寫
flush()方法
為了使程式更簡化,你可以使用
MessageToByteEncoder
:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
剩下最後一件事情,就是在伺服器端将寫好的編碼器 插入到
ChannelPipeline 中的 TimeServerHandler 之前。把它留作一個簡單的練習。
關閉您的應用程式
關閉Netty應用程式就像通過調用shutdownGracefully()關閉你建立的所有
EventLoopGroup
s一樣簡單。他傳回一個
Future當
EventLoopGroup 完全關閉的時候,并且所有屬于這個組的管道都将關閉。
終于翻譯完啦
二、自己寫的一個小例子
伺服器端:
package learn.netty.hello;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to
// gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new DiscardServer(port).run();
}
}
處理程式:
package learn.netty.hello;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
System.out.print("Client Said:");
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
String back = "I received!";
ByteBuf out = ctx.alloc().buffer(back.length());
out.writeBytes(back.getBytes());
ctx.writeAndFlush(out);
ctx.close();
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
用戶端:
package learn.netty.hello;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class DiscardClient {
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
package learn.netty.hello;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
public class DiscardClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
System.out.print("Server Said:");
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
ctx.close();
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String msg = "Are you received?";
ByteBuf encoded = ctx.alloc().buffer(msg.length());
encoded.writeBytes(msg.getBytes());
ctx.write(encoded);
ctx.flush();
}
}
運作即可(詳細注釋在本文最後給出)
另外一個傳遞對象并解決随碎片問題例子
伺服器:
package learn.netty.pojo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class Server {
private int port;
public Server(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())),
new ServerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to
// gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8888;
new Server(port).run();
}
}
package learn.netty.pojo;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import learn.netty.entity.User;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
User user = (User) msg;
System.out.println(user.toString());
ChannelFuture f = ctx.writeAndFlush(new User("Server", "Thank you for your coming!"));
f.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
package learn.netty.pojo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class Client {
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8888;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//用戶端Bootstrap讓Channel使用起來更容易
Bootstrap b = new Bootstrap();
//設定事件組
b.group(workerGroup);
//設定Channel
b.channel(NioSocketChannel.class);
//設定選項
b.option(ChannelOption.SO_KEEPALIVE, true);
//設定Channel初始化的處理程式
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())),
new ClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
package learn.netty.pojo;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import learn.netty.entity.User;
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
User user = (User) msg;
System.out.println(user.toString());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture f = ctx.writeAndFlush(new User("Client", "I'm coming!"));
//f.addListener(ChannelFutureListener.CLOSE); //調用這個會關閉與伺服器之間的通道,導緻伺服器傳回的消息接收不到
// ChannelFutureListener CLOSE = new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture future) {
// future.channel().close();
// }
// };
//Channel channel()
//Returns a channel where the I/O operation associated with this future takes place.
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
運作即可。
下面是詳細解釋每一句話:
Client:
EventLoopGroup workerGroup = new NioEventLoopGroup();
=====================================================
EventLoopGroup是一個接口,NioEventLoopGroup繼承了實作EventLoopGroup接口的抽象類MultithreadEventLoopGroup。
實際上NioEventLoopGroup就是一個線程池,他的角色類似于一個大boss,每當有Channel的任務時,就會派人去處理。
Bootstrap b = new Bootstrap();
=====================================================
Bootstrap 繼承 AbstractBootstrap。
它幫助用戶端去更友善的使用Channel,針對伺服器有ServerBootstrap。在AbstractBootstrap裡面我們可以發現它需要使用者去管理幾個部分:
public B group(EventLoopGroup group)
public B channel(Class<? extends C> channelClass)
public B channelFactory(ChannelFactory<? extends C> channelFactory)
public B localAddress(SocketAddress localAddress)
public <T> B option(ChannelOption<T> option, T value)
public <T> B attr(AttributeKey<T> key, T value)
public B handler(ChannelHandler handler)
一般需要我們來設定的是,group,channel,options和handler。
是以仔細想想,Bootstrap類似于建造者模式,給它需要的東西,不需要知道内部實作細節,他就能自動處理,但是他沒有傳回一個對象,而是調用connect之後才傳回一個對象。
b.group(workerGroup);
=====================================================
這裡設定我們定義的事件循環組
b.channel(NioSocketChannel.class);
=====================================================
設定Channel類型。
說到這裡就不得不說Channel接口了。它是Netty最核心的接口。NioSocketChannel不過是Channel接口的其中一個子接口的一個實作罷了。類結構:
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel
public interface SocketChannel extends DuplexChannel
public interface DuplexChannel extends Channel
具體詳細資訊您就百度吧,我功力太淺,不能誤導大家。
b.option(ChannelOption.SO_KEEPALIVE, true);
=====================================================
設定一些套接字的選項。io.netty.channel.ChannelOption<T>類裡面有好多定義,太多了你就檢視文檔吧。
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())),
new ClientHandler());
}
});
=====================================================
實際上我們通過ChannelHandler來間接操縱Channel。 handler 方法裡的參數 ChannelHandler 是一個接口,在這裡的類結構是這樣的
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
public interface ChannelInboundHandler extends ChannelHandler
可以看出ChannelInitializer是用來初始化Channel的。再看ch.pipeline().addLast(...)方法,ch.pipeline()傳回一個 DefaultChannelPipeline 對象
public class DefaultChannelPipeline implements ChannelPipeline
然後DefaultChannelPipeline 調用addLast方法,把ChannelHandler的實作添加進來。然後按照順序執行,是以說addLast裡面的Handler是有順序的。
其中的ObjectEncoder和ObjectDecoder是系統為我們寫好的編碼解碼類
ChannelFuture f = b.connect(host, port).sync();
=====================================================
這個ChannelFuture又是個什麼東西呢。它是Future的子接口。連接配接後會傳回一個ChannelFuture對象
public interface ChannelFuture extends Future<Void>
Future的概念來自JUC,代表異步的意思。Netty是一個異步IO架構,命名為ChannelFuture代表他與Channel有關。
在Netty中,所有操作的都是異步的,意味着任何IO調用都會立即傳回,那麼如何擷取異步操作的結果?ChannelFuture就是為了結局這個問題而設計的,後面我們還會讨論
f.channel().closeFuture().sync();
=====================================================
一直等待,直到連結關閉
workerGroup.shutdownGracefully();
=====================================================
關閉事件線程組
Handler:
public class ClientHandler extends ChannelInboundHandlerAdapter
=====================================================
處理Channel隻需要繼承ChannelInboundHandlerAdapter這個類即可。你可以覆寫一些方法。
當有消息的時候會自動調用這個方法。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
User user = (User) msg;
System.out.println(user.toString());
}
當建立好連接配接自動調用這個。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture f = ctx.writeAndFlush(new User("Client", "I'm coming!"));
//f.addListener(ChannelFutureListener.CLOSE); //調用這個會關閉與伺服器之間的通道,導緻伺服器傳回的消息接收不到
// 下面是源碼
// ChannelFutureListener CLOSE = new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture future) {
// future.channel().close();//從這裡可以看出,當操作完成就自動關閉channel了,那伺服器傳回消息肯定收不到啊
// }
// };
//Channel channel()
//Returns a channel where the I/O operation associated with this future takes place.
}
Server:
public void run() throws Exception {
//伺服器需要兩個事件線程組,一個用來接收連接配接,一個用來處理連接配接
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // 用來建立伺服器的Bootstrap
b.group(bossGroup, workerGroup) // 注冊線程組
.channel(NioServerSocketChannel.class) // 設定Channel類型
.childHandler(new ChannelInitializer<SocketChannel>() { // 初始化
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())),
new ServerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128) // 選項
.childOption(ChannelOption.SO_KEEPALIVE, true); // 子選項
// 綁定端口,準備接受連接配接
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to
// gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();// 安全退出
bossGroup.shutdownGracefully();
}
}
handler:
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
User user = (User) msg; //因為我添加了解碼器,是以在這裡就直接強轉
System.out.println(user.toString());
ChannelFuture f = ctx.writeAndFlush(new User("Server", "Thank you for your coming!"));
f.addListener(ChannelFutureListener.CLOSE); //發送完回應消息,就關閉Channel
}
//異常處理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
如果有的地方不對,日後我會做修改
小LUA
面對敵人的嚴刑逼供,我一個字也沒說,而是一五一十寫了下來。