天天看點

IO, WebSocket ,netty實作用戶端,服務端案例

Netty是一個高性能、異步事件驅動的NIO架構,提供了對TCP、UDP和檔案傳輸的支援,作為一個異步NIO架構,Netty的所有IO操作都是異步非阻塞的,通過Future-Listener機制,使用者可以友善的主動擷取或者通過通知機制獲得IO操作結果。

在IO通信中,BIO一般是由一個獨立的accepter負責監聽用戶端的連結,它接受到用戶端的連結之後,對每一個用戶端都建立一個新的線程進行鍊路處理,處理完成之後通過輸出流傳回給應答給用戶端。此時線程銷毀,這就是典型的一請求一應答。但是缺點是當用戶端并發通路量增加以後,服務端的線程個數和用戶端的并發通路量成1:1正比,線程是java虛拟機很寶貴的資源,當線程膨脹之後,伺服器性能急劇下降,随着并發通路量的繼續增加,系統将會發生線程堆棧溢出,建立新線程溢出的情況,最後會導緻機器當機,不能對外提供服務。

僞異步IO通信,當有新的用戶端通信是将用戶端的socket封裝投遞到後端的線程池中進行處理,jdk它的線程池維護消息隊列和N個活躍的線程,對消息隊列中的任務進行相關的處理,當有M個用戶端請求的時候,服務端就會建立具有N個線程的線程池進行對用戶端的回應,由于線程池可以設定消息隊列的大小和最大線程數,是以它的資源占用是可控的,無論多少個用戶端通路都不會導緻伺服器的當機和資源耗盡。但是缺點是當有大量的用戶端接入的時候,随着并發通路量不斷的增加,僞異步IO通信會導緻線程的阻塞。

NIO核心組成:

1 、Buffer

2、Channel

3、Selector

緩沖區buffer它是一個對象,它包含一些要寫入或者讀出的資料,在NIO類庫中加入buffer對象展現了新庫和原IO的一個差別,在面向流的IO中可以将資料直接寫入或者直接讀出,在NIO中所有的資料都是通過NIO處理的,任何時候通路NIO都是通過緩沖區操作。

在NIO把它支援的I/O對象抽象為Channel,Channel又稱“通道”,類似于原I/O中的流(Stream),但有所差別:

1、流是單向的,通道是雙向的,可讀可寫。

2、流讀寫是阻塞的,通道可以異步讀寫。

3、流中的資料可以選擇性的先讀到緩存中,通道的資料總是要先讀到一個緩存中,或從緩存中寫入。

在NIO中多路複用器Selector,Selector會不斷的輪詢Channel,如果某個Channel上發生讀或者寫的操作,這個Channel就會處于就緒狀态,會被Selector輪詢出來,然後通過SelectorsKey可以獲得就緒Channel的集合進行後續的IO操作,而jdk使用了其他方法代替了Selector,是以并沒有最大連結數的限制,可以接入更多用戶端。

AIO通信,它是連結注冊讀寫事件和回調函數,讀寫方法異步,同時它是主動通知程式的,AIO通過二種方式擷取結果,第一種就是通過filter來擷取結果,第二種就是通過header的實作類來完成操作的回調,AIO是真正的非阻塞異步IO,它不需要多路複用器Selector對注冊的通道進行輪詢操作即可實作異步讀寫,進而簡化了NIO。

什麼是webSocket?

是H5提出的一個協定規範,webSocket通過一個握手機制用戶端和伺服器就能建立一個tcp,進而友善用戶端和伺服器端的通信, 在webSocket出現之前,一般都是使用HTTP長連結或者短連接配接。

webSocket是為了解決用戶端和伺服器端實時通信二産生的技術,webSocket本質上是一個基于tcp協定,先通過一個HTTP或HTTPS發起一條特殊的請求,進行握手後建立一個交換資料的tcp連結,此後呢,用戶端和服務端通過這個tcp協定進行實時的通信。

注意,當webSocket的用戶端和伺服器端進行通信以後,此時就不需要之前進行握手請求的HTTP協定參與。

webSocker的優點:

1、節省通信開銷

以前的webServer實作推送和及時通信技術都是使用輪尋,在特定的時間間隔,比如一秒鐘,由流量器主動發起請求,将伺服器的消息主動啦回來,在這種情況下,我們需要不斷的向伺服器發起請求,而HTTP request header一般都是很長的,裡面包含的資料可能隻是一個很小的值,這樣會占用很多的伺服器資源。

2、伺服器主動傳送資料給用戶端,用戶端也可以随時發送資料給伺服器,用戶端交換的header資訊也是很小的。

3、實時通信,webSocker不隻隻僅限于ajax請求,而webSocker可以彼此發送資訊,進而實時通信。

netty實作WebSocket通信的案例:

以下是server和client的示例代碼,其中使用的是 Netty 4.x,先看看如何實作,後續會針對各個子產品進行深入分析。

server 代碼實作

public class EchoServer {
    private final int port;
    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup();  // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(port).sync(); // (5)

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new EchoServer(port).run();
    }
}           

EchoServerHandler 實作

public class EchoServerHandler extends ChannelInboundHandlerAdapter {  
  
    private static final Logger logger = Logger.getLogger(  
            EchoServerHandler.class.getName());  
  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        ctx.write(msg);  
    }  
  
    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
        ctx.flush();  
    }  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        // Close the connection when an exception is raised.  
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);  
        ctx.close();  
    }  
}           

1、NioEventLoopGroup 是用來處理I/O操作的線程池,Netty對 EventLoopGroup 接口針對不同的傳輸協定提供了不同的實作。在本例子中,需要執行個體化兩個NioEventLoopGroup,通常第一個稱為“boss”,用來accept用戶端連接配接,另一個稱為“worker”,處理用戶端資料的讀寫操作。

2、ServerBootstrap 是啟動服務的輔助類,有關socket的參數可以通過ServerBootstrap進行設定。

3、這裡指定NioServerSocketChannel類初始化channel用來接受用戶端請求。

4、通常會為新SocketChannel通過添加一些handler,來設定ChannelPipeline。ChannelInitializer 是一個特殊的handler,其中initChannel方法可以為SocketChannel 的pipeline添加指定handler。

5、通過綁定端口(自定義),就可以對外提供服務了。

client 代碼實作

public class EchoClient {  
  
    private final String host;  
    private final int port;  
    private final int firstMessageSize;  
  
    public EchoClient(String host, int port, int firstMessageSize) {  
        this.host = host;  
        this.port = port;  
        this.firstMessageSize = firstMessageSize;  
    }  
  
    public void run() throws Exception {  
        // Configure the client.  
        EventLoopGroup group = new NioEventLoopGroup();  
        try {  
            Bootstrap b = new Bootstrap();  
            b.group(group)  
             .channel(NioSocketChannel.class)  
             .option(ChannelOption.TCP_NODELAY, true)  
             .handler(new ChannelInitializer<SocketChannel>() {  
                 @Override  
                 public void initChannel(SocketChannel ch) throws Exception {  
                     ch.pipeline().addLast(  
                             //new LoggingHandler(LogLevel.INFO),  
                             new EchoClientHandler(firstMessageSize));  
                 }  
             });  
  
            // Start the client.  
            ChannelFuture f = b.connect(host, port).sync();  
  
            // Wait until the connection is closed.  
            f.channel().closeFuture().sync();  
        } finally {  
            // Shut down the event loop to terminate all threads.  
            group.shutdownGracefully();  
        }  
    }  
  
    public static void main(String[] args) throws Exception {  
        final String host = args[0];  
        final int port = Integer.parseInt(args[1]);  
        final int firstMessageSize;  
        if (args.length == 3) {  
            firstMessageSize = Integer.parseInt(args[2]);  
        } else {  
            firstMessageSize = 256;  
        }  
  
        new EchoClient(host, port, firstMessageSize).run();  
    }  
}           

EchoClientHandler 實作

public class EchoClientHandler extends ChannelInboundHandlerAdapter {  
  
    private static final Logger logger = Logger.getLogger(  
            EchoClientHandler.class.getName());  
  
    private final ByteBuf firstMessage;  
  
    /** 
     * Creates a client-side handler. 
     */  
    public EchoClientHandler(int firstMessageSize) {  
        if (firstMessageSize <= 0) {  
            throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);  
        }  
        firstMessage = Unpooled.buffer(firstMessageSize);  
        for (int i = 0; i < firstMessage.capacity(); i ++) {  
            firstMessage.writeByte((byte) i);  
        }  
    }  
  
    @Override  
    public void channelActive(ChannelHandlerContext ctx) {  
        ctx.writeAndFlush(firstMessage);  
    }  
  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        ctx.write(msg);  
    }  
  
    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
       ctx.flush();  
    }  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        // Close the connection when an exception is raised.  
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);  
        ctx.close();  
    }  
}