天天看點

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

1.Netty 介紹和應用場景

1.1 介紹

  1. Netty 是jboss的一個開源架構
  2. Netty是一個異步的,基于事件驅動的網絡應用架構
  3. 基于nio

1.2 應用場景

  1. Rpc 例如dubbo
  2. 遊戲
  3. 大資料

涉及到網絡通信的應用都可以使用netty

2. i/o模型

2.1 介紹

  1. bio 同步并阻塞 一個連接配接對應伺服器一個線程   适用于連接配接數較少的架構 jdk1.4
  2. nio 同步非阻塞 伺服器一個線程處理多個連接配接   适用于連接配接數較多連接配接時間短 jdk1.4
  3. aio(nio.2) 異步非阻塞  适用于連接配接數多且連接配接時間長 jdk1.7

2.2 bio

blocking i/o

2.2.1 簡單demo

開發一個服務端,建立一個線程池,當用戶端發送一個請求,服務端對應建立一個線程處理,當有多個用戶端請求時,就會建立多個線程對應處理

這裡demo的用戶端用telnet模拟

public static void handler(Socket socket){

        try(InputStream in = socket.getInputStream();){

            System.out.println("線程資訊: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());

            byte[] bytes = new byte[1024];

            while (true){

                int read = in.read(bytes);

                if(read!=-1){

                    System.out.println("輸出資訊: "+new String(bytes,"UTF-8"));

                }else {

                    break;

                }

            }

        }catch (IOException e){

            e.printStackTrace();

        }

    }

    public static void main(String[] args) {

        try(ServerSocket serverSocket = new ServerSocket(6666);) {

            ExecutorService executorService = Executors.newCachedThreadPool();

                System.out.println("等待連結");

                final Socket socket = serverSocket.accept();

                System.out.println("連結到一個用戶端");

                executorService.execute(new Runnable() {

                    @Override

                    public void run() {

                        System.out.println("線程資訊: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());

                        handler(socket);

                    }

                });

        } catch (IOException e) {

2.3 nio

non-blocking i/o 非阻塞

2.3.1 簡介

三大核心

  1. channel 通道
  2. buffer 緩沖區
  3. selector 選擇器

簡述操作原理: selector 選擇可用的channel, channel 與 buffer可以互相讀寫,應用程式并不直接對channel進行操作,而是通過對buffer進行操作,間接操作channel

一個線程中會有多個selector,一個selector中可以注冊多個channel,如果并沒有資料傳輸,線程還可以做其他事,并不會一直等待

2.4 nio與bio 的差別

  1. nio非阻塞 bio阻塞
  2. nio用塊的方式處理io  bio用流的方式處理io 塊的方式比流的方式要快
  3. bio基于位元組流/字元流  nio基于緩沖區和通道(channel) selector監聽多個通道的事件,是以用一個線程就可以處理多個通道的資料

圖示: nio

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

bio

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

3. nio詳解

3.1 nio模型三大元件的關系

  1. 一個線程對應一個selector
  2. 一個selecor對應多個channel
  3. 一個channel對應一個buffer
  4. 一個線程對應多個channel
  5. channel與buffer都是雙向的,就是既可以讀也可以寫 使用flip()方法切換
  6. buffer就是一個記憶體塊,讀寫記憶體比較快
  7. selector會根據不同僚件切換不同的channel

3.2 Buffer緩沖區

3.2.1 簡介

本質是一個讀寫資料的記憶體塊,可以了解成一個提供了操作記憶體塊方法的容器對象(數組)

緩沖區中内置了一些機制,這些機制可以檢測到緩沖區的資料變化,狀态變化

channel讀寫的資料必須都經過Buffer

3.2.2 源碼分析

常用的幾個操作方法

public static void main(String[] args) {

        //allocate 規定intbuffer的長度

        IntBuffer buffer = IntBuffer.allocate(5);

        //capacity()擷取容量

        //put()寫入

        for(int i = 0;i<buffer.capacity();i++){

            buffer.put(i*2);

        //flip()反轉 由寫轉為讀

        buffer.flip();

        //讀取

        //get()每次讀取後 索引向後移動一位

            System.out.println(buffer.get());

3.2.2.1 定義

IntBuffer中定義了一個int數組,其他類型的buffer類似

public abstract class IntBuffer

    extends Buffer

    implements Comparable<IntBuffer>

{

    // These fields are declared here rather than in Heap-X-Buffer in order to

    // reduce the number of virtual method invocations needed to access these

    // values, which is especially costly when coding small buffers.

    //

    final int[] hb;                  // Non-null only for heap buffers

    final int offset;

    boolean isReadOnly;                 // Valid only for heap buffers

最頂層的Buffer類中定義了四個屬性

public abstract class Buffer {

    /**

     * The characteristics of Spliterators that traverse and split elements

     * maintained in Buffers.

     */

    static final int SPLITERATOR_CHARACTERISTICS =

        Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;

    // Invariants: mark <= position <= limit <= capacity

    private int mark = -1; //标記

    private int position = 0; //目前索引的位置,不能超過limit

    private int limit;//最大能讀寫的長度

    private int capacity;//容量 allocate定義的長度

3.2.2.2 反轉

   public final Buffer flip() {

        limit = position;

        position = 0;

        mark = -1;

        return this;

可以看到,反轉之後,由讀變為寫,或者由寫變為讀

将索引歸0,最大讀寫長度不能超過上次操作的索引

3.2 channel 通道

  1. 通道類似于流/連接配接,但是流隻能寫入或者讀取,通道可以即讀取也寫入
  2. 通道異步讀寫資料
  3. 通道可以讀寫資料到緩存區

3.2.2 層級關系

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

當有用戶端發送請求時,服務端會建立一個ServerSocketChannel(實作類:ServerSocketChannelImpl) 再由ServerSocketChannel建立一個SocketChannel(實作類:SocketChannelImpl即真正讀寫資料的通道),這個SocketChannel就是與這個用戶端請求所對應的

3.2.3 案例剖析

3.2.3.1 FileChannle 輸出檔案流

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.FileChannel;

/**

 * @author: zhangyao

 * @create:2020-08-25 14:50

 **/

public class FileChannelTest {

        FileOutputStream fileOutputStream = null;

        try {

            //檔案輸出流

            fileOutputStream = new FileOutputStream("D:\\file01.txt");

   //檔案輸出流包裝為FileChannel 此處FileChannel預設實作FileChannelImpl

            FileChannel fileChannel = fileOutputStream.getChannel();

            //建立對應的緩沖區

            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            //資料寫入緩沖區

            byteBuffer.put("hello nio".getBytes());

            //反轉,因為接下來需要從緩沖區讀取資料寫入Channel

            byteBuffer.flip();

            //從緩沖區寫入Channel

            fileChannel.write(byteBuffer);

        } catch (FileNotFoundException e) {

        } finally {

            //關閉檔案流

            if(fileOutputStream!=null){

                try {

                    fileOutputStream.close();

                } catch (IOException e) {

                    e.printStackTrace();

}

整體流程就是把資料寫入緩沖區,在讀取緩存區寫入通道Channel,在由檔案輸出流輸出

圖示如下

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

3.2.3.2 FileChanle 輸入檔案流

    FileInputStream fileInputStream = null;

    try {

        fileInputStream = new FileInputStream("D:\\file01.txt");

        //擷取Channel

        FileChannel channel = fileInputStream.getChannel();

        //建立byteBuffer

        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        //從channel中讀取資料寫入buffer

        channel.read(byteBuffer);

        //反轉  下一步需要從buffer中讀取資料輸出

        byteBuffer.flip();

        //輸出

        byte[] array = byteBuffer.array();

        System.out.println(new String(array));

    } catch (FileNotFoundException e) {

        e.printStackTrace();

    } catch (IOException e) {

    } finally {

            fileInputStream.close();

與上面的例子剛好相反,從檔案中讀取資料,通過通道寫入buffer緩沖區,在輸出

圖示

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

3.2.3.3 FileChannel 拷貝檔案

其實就是上面兩個例子結合,把一個檔案中的資料複制到另外一個檔案中

    FileOutputStream fileOutputStream = null;

        fileOutputStream = new FileOutputStream("D:\\file02.txt");

        FileChannel channel1 = fileOutputStream.getChannel();

        while (true){

            //将byteBuffer複位

            byteBuffer.clear();

            int read = channel.read(byteBuffer);

            if(read==-1){

                break;

            channel1.write(byteBuffer);

            fileOutputStream.close();

這裡使用了byteBuffer.clear()方法

因為ByteBuffer緩沖區是有長度的,當讀取的檔案超過緩沖區的長度時,如果不對緩沖區進行清空,當進行下一次讀取時,就會從上一次讀取的位置開始讀取,會出現死循環的情況

3.2.3.4 FileChannel 拷貝檔案之TransferFrom

        //從channel通道拷貝到 channel1通道

        channel1.transferFrom(channel, 0, channel.size());

3.2.4 Buffer的分散和聚集

上面的例子都是使用單個buffer進行資料的讀寫,如果資料過大,也可用使用多個buffer(buffer數組)進行資料的讀寫,即用空間換時間

3.3 Selector選擇器

3.3.1 基本簡介

一個selector管理多個channel通道,使用異步的方式處理io

隻有讀寫真正的發生時,才會處理資料,減小了線程的壓力,不用每個請求都維護一個線程

避免了多線程之間的上下文切換導緻的開銷

3.3.2 selector的api

Selector:

  1. select() 阻塞
  2. select(Long timeout) 有逾時時間
  3. selectNow() 非阻塞
  4. wakeup() 立即喚醒selector

3.3.2 selecor的工作流程

其實是Selector SelectionKey ServerSocketChannel SorkcetChannel的工作原理

  1. 當用戶端連結時,通過ServerSockertChannel 得到SocketChannel 并且注冊到 Selector上
    1. 注冊源碼

public abstract SelectionKey register(Selector sel, int ops)

    throws ClosedChannelException;

這是SocketChannel注冊到Selector上的方法,第一個參數為要注冊的Selector對象,第二個參數為事件驅動的類型

public abstract class SelectionKey {

    public static final int OP_READ = 1;

    public static final int OP_WRITE = 4;

    public static final int OP_CONNECT = 8;

    public static final int OP_ACCEPT = 16;

    private volatile Object attachment = null;

  1. 當注冊完成後傳回一個Selectionkey,這個selectionKey會和SocketChannel關聯
  2. Selector通過select方法監聽Channel,如果有事件發生,傳回對應的selectionKey集合
    1. 源碼

public int select(long var1) throws IOException {

        if (var1 < 0L) {

            throw new IllegalArgumentException("Negative timeout");

        } else {

            return this.lockAndDoSelect(var1 == 0L ? -1L : var1);

    public int select() throws IOException {

        return this.select(0L);

    public int selectNow() throws IOException {

        return this.lockAndDoSelect(0L);

 private int lockAndDoSelect(long var1) throws IOException {

        synchronized(this) {

            if (!this.isOpen()) {

                throw new ClosedSelectorException();

            } else {

                int var10000;

                synchronized(this.publicKeys) {

                    synchronized(this.publicSelectedKeys) {

                        var10000 = this.doSelect(var1);

                return var10000;

  1. 通過得到的selectionKey可以反向擷取Channel

    public abstract SelectableChannel channel();

  1. 最後通過channel處理業務

3.3.3 案例

服務端思路:

  1. 建立serverSocketChannel綁定端口6666,把這個channel注冊到Selector上,注冊事件是OP_ACCEPT
  2. 循環監聽,判斷是否channel中是否有事件發生,如果有事件發生,判斷不同的事件類型進行不同的連結,讀/寫操作

用戶端思路

  1. 建立一個SocketChannel,連接配接上伺服器之後,發送消息,并保持連結不關閉

3.3.3.1 server端

import java.net.InetSocketAddress;

import java.nio.channels.*;

import java.util.Iterator;

import java.util.Set;

 * @create:2020-08-26 16:55

public class ServerChannel {

            //生成一個ServerScoketChannel

            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

            //設定為非阻塞的

            serverSocketChannel.configureBlocking(false);

            //serverSocket監聽6666端口

            serverSocketChannel.socket().bind(new InetSocketAddress(6666));

            //建立Selector

            Selector selector = Selector.open();

            //serverSocketChannel注冊到Selector

            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            //循環等待連結

                //如果沒有事件發生,就繼續循環

                if(selector.select(1000) == 0){

                    System.out.println("等待1s,無連接配接");

                    continue;

                //如果有事件驅動,就需要周遊事件

                Set<SelectionKey> selectionKeys = selector.selectedKeys();

                Iterator<SelectionKey> iterator = selectionKeys.iterator();

                while (iterator.hasNext()){

                    SelectionKey key = iterator.next();

                    //如果事件是連接配接

                    if(key.isAcceptable()){

                        try {

                            SocketChannel channel = serverSocketChannel.accept();

                            channel.configureBlocking(false);

                            SelectionKey register = channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

                            System.out.println("連結成功");

                        } catch (IOException e) {

                            e.printStackTrace();

                        }

                    //如果是讀取資料

                    if(key.isReadable()){

                        SocketChannel channel = (SocketChannel) key.channel();

                        ByteBuffer byteBuffer = (ByteBuffer) key.attachment();

                            int read = channel.read(byteBuffer);

                            byte[] array = byteBuffer.array();

                            System.out.println("讀取資料:"+ new String(byteBuffer.array()));

                    iterator.remove();

                };

3.3.3.2 用戶端

import java.nio.channels.SocketChannel;

 * @create:2020-08-26 17:24

public class ClientChannel {

        //建立一個SocketChannel

            SocketChannel socketChannel = SocketChannel.open();

            socketChannel.configureBlocking(false);

            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);

            if(!socketChannel.connect(inetSocketAddress)){

                while (!socketChannel.finishConnect()){

                    System.out.println("伺服器連接配接中,線程并不阻塞,可以進行其他操作");

            //連接配接成功

            socketChannel.write(ByteBuffer.wrap("hello ,server".getBytes()));

            System.in.read();

4.Netty

4.1 簡介

https://netty.io/

官網

netty是對java nio api的封裝,簡化了nio程式的開發,jdk要求最低1.6

流行的網絡程式設計通信架構,Dubbo Elasticsearch 等架構底層的網絡通信架構都是 Netty

架構模型

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

版本

netty 共有 3.x 4.x 5.x三個大版本

3.x較老,5.x有重大bug,被官網廢棄  現在主要使用4.x

4.2 線程模型

有以下幾種線程模型

4.2.1 傳統I/O阻塞模型

每一個連結都需要一個對應的線程進行處理,并且當連結建立後,如果目前連結沒有資料傳輸時,此線程會被阻塞在read()方法

4.2.2 Reactor模式

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

原理圖示如上

主要是針對了傳統I/O模型一個連接配接會阻塞一個線程的問題進行了改進,當連接配接建立後都通過ServiceHandler調用線程池中的線程進行處理,這樣就隻用阻塞一個ServiceHandler線程,達到多路複用的目的

Reactor模式有三種實作方式

4.2.2.1單Reactor單線程

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

使用一個線程通過多路複用完成所有操作,包括讀寫連接配接

redis使用的就是這種模型 單線程

4.2.2.2單Reactor多線程

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

相對于單Reactor單線程,主線程不在進行業務處理,當有請求過來之後,具體的業務處理交與線程池中的線程處理,線程處理完成後再通過handler傳回給Client

4.2.2.3 主從Reactor多線程

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

相比于單Reacotr,主從Reactor将Reactor分為MainReactor和SubReactor

MainReactor中負責分發和連接配接

SubReactor中負責讀寫

一個MainReactor可以對應多個SubReactor

4.2.3 Netty模型

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

簡述Netty模型

  1. 角色
    1. BossGroup BossGroup的類型是NioEventLoopGroup,其中包含了很多NioEventLoop
    2. NioEventLoop nio事件循環,每個NioEventLoop中都有一個Selctor和一個任務隊列
    3. WorkerGroup 類型是NioEventLoopGroup,與BossGroup類似,隻不過功能不同,BossGroup隻負責與用戶端建立連接配接, WorkerGroup需要讀寫,處理業務
    4. PipeLine 管道,對Channel進行的封裝,具體的業務處理是通過Pipline對Channel進行處理
  1. 具體流程
    1. 當用戶端發送請求時,首先進入BossGroup,有NioEventLoop對請求進行事件輪詢,如果是連接配接事件就進行處理
      1. 處理的步驟分為三步
      2. 輪詢
      3. 注冊 這裡的注冊指的是将生成的SocketChannel注冊到workerGroup中的某個NioEventLoop中的Selector上
      4. 執行任務清單
    1. 當請求的事件是讀寫時,就有workerGroup對請求進行具體的業務處理
      1. 處理的步驟BossGroup類似
  1. 總結

    由此可以看出,Netty的模型與主從Reactor模型類似,都是由一個主Reactor負責連接配接事件,由一個從Reactor負責讀寫事件

4.2.4 案例demo

4.2.4.1 服務端

4.2.4.1.1 NettyServer

package netty;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

 * @create:2020-09-03 08:55

public class NettyServer {

        //建立BossGroup 和 WorkerGroup

        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        ChannelFuture channelFuture = null;

        serverBootstrap.group(bossGroup, workerGroup)

                .channel(NioServerSocketChannel.class)

                .option(ChannelOption.SO_BACKLOG, 128)

                .childOption(ChannelOption.SO_KEEPALIVE, true)

                .childHandler(new ChannelInitializer<SocketChannel>() {

                    protected void initChannel(SocketChannel socketChannel) {

                        socketChannel.pipeline().addLast(new NettyServerHandler());

        System.out.println("伺服器就緒.....");

        //綁定端口

            channelFuture = serverBootstrap.bind(6668).sync();

        }catch (Exception e){

        }finally {

            try {

                channelFuture.channel().closeFuture().sync();

                bossGroup.shutdownGracefully();

                workerGroup.shutdownGracefully();

            } catch (InterruptedException e) {

                e.printStackTrace();

4.2.4.1.2 NettyServerHandler

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.util.CharsetUtil;

 * @create:2020-09-03 09:12

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //讀取資料

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf buf = (ByteBuf) msg;

        System.out.println("用戶端發送消息:"+ buf.toString(CharsetUtil.UTF_8));

        System.out.println("用戶端位址:"+ ctx.channel().remoteAddress());

    //資料讀取完畢

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        ctx.writeAndFlush(Unpooled.copiedBuffer("hello 用戶端",CharsetUtil.UTF_8));

    //處理異常 關閉ctx

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        ctx.close();

4.2.4.2 用戶端

4.2.4.2.1 NettyClient

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.socket.nio.NioSocketChannel;

 * @create:2020-09-03 09:52

public class NettyClient {

        EventLoopGroup executors = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();

            bootstrap.group(executors)

                    .channel(NioSocketChannel.class)

                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override

                        protected void initChannel(SocketChannel socketChannel) throws Exception {

                            socketChannel.pipeline().addLast(new NettyClientHandler());

                    });

            System.out.println("用戶端就緒........");

            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();

            //關閉通道

            channelFuture.channel().closeFuture().sync();

                executors.shutdownGracefully().sync();

4.2.4.2.2 NettyClientHandler

 * @create:2020-09-03 10:00

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //就緒時觸發

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("ctx: "+ctx);

        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服務端", CharsetUtil.UTF_8));

    //讀取資訊

    //這裡讀取的是伺服器傳回的資訊

        ByteBuf byteBuf = (ByteBuf) msg;

        System.out.println("服務端發送消息: "+ byteBuf.toString(CharsetUtil.UTF_8));

        System.out.println("服務端位址: "+ ctx.channel().remoteAddress());

    //異常處理

一個簡單的TCP服務通信,用戶端發送消息,服務端接收消息并傳回消息給用戶端

4.2.4.3 案例demo源碼分析

4.2.4.3.1 NioEventGroup

public NioEventLoopGroup() {

    this(0);

public NioEventLoopGroup(int nThreads) {

    this(nThreads, (Executor)null);

可以看到,如果使用無參的NioEventGroup,預設傳遞的是0,也可以指定線程數

一層一層找下去:

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); //NettyRuntime.availableProcessors()擷取目前計算機的核數(邏輯處理器)

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {

    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);

發現最後找到NioEventGroup父類的方法

如果指定了NioEventGroup的線程數,且不為0的時候,就使用指定的線程數

否則,**就使用目前計算機的核數2作為線程數

debug 看結果

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

電腦是12核,預設是24個線程

指定一個線程

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty
Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

就隻有一個線程數

4.2.5 異步模型

上文中的案例Demo中的 ChannelFuture

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();

異步模型Future是相對與同步來說的

異步指的是當一個異步調用發出後,不會立刻得到結果,而是通過回調,狀态來通知調用者調用的結果

Netty 中的 connect 和 bind() sync方法就是傳回了一個異步的結果,之後再通過監聽擷取到結果

也就是 Future-Listener機制

當Future對象剛建立的時候,處于未完成的狀态,可以通過傳回的ChannelFuture檢視操作執行的狀态,也可以注冊監聽函數來執行完成後的操作

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

isSucess()是否成功

isDone()是否完成

isCancelable() 是否取消

cause() 失敗原因

addListener 增加監聽器

//綁定端口

            channelFuture.addListener(new ChannelFutureListener() {

                @Override

                public void operationComplete(ChannelFuture channelFuture) throws Exception {

                    if(channelFuture.isSuccess()){

                        System.out.println("監聽端口成功");

                    }else {

                        System.out.println("監聽端口失敗");

            });

4.2.6 Netty Http服務

做一個簡單的demo  浏覽器(用戶端)通路伺服器端7001端口,傳回一個字元串資訊

浏覽器通路是http請求,伺服器也需要相應一個httpResponse

4.2.6.1 服務端

NettyHttpServer 啟動類

package netty.http;

 * @create:2020-09-04 11:16

public class NettyHttpServer {

    public static void main(String[] args) throws InterruptedException {

        serverBootstrap.group(bossGroup,workerGroup)

                .childHandler(new NettyHttpInitialize());

        ChannelFuture channelFuture = serverBootstrap.bind(7001).sync();

        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();

        workerGroup.shutdownGracefully();

NettyHttpInitialize 處理器類

對之前的ChannelInitialize(SocketChannel)進行封裝

import io.netty.channel.Channel;

import io.netty.channel.ChannelPipeline;

import io.netty.handler.codec.http.HttpServerCodec;

 * @create:2020-09-04 11:21

public class NettyHttpInitialize extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel sc) throws Exception {

        //得到管道

        ChannelPipeline pipeline = sc.pipeline();

        //管道中加入處理器 主要是處理Http請求的,解析請求頭之類的

        pipeline.addLast("myDecoder",new HttpServerCodec());

        //加入處理器

        pipeline.addLast("myServerHandler",new NettyServerHandler());

NettyServerHandler 具體的處理(傳回http響應)

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.*;

import org.springframework.http.HttpStatus;

 * @create:2020-09-04 14:01

public class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {

    //讀資訊

    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {

        System.out.println(httpObject);

        System.out.println("用戶端位址+"+ channelHandlerContext.channel().remoteAddress());

        ByteBuf byteBuf = Unpooled.copiedBuffer("hello , im 服務端", CharsetUtil.UTF_8);

        //傳回用戶端資訊

        FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK,byteBuf);

        fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");

        fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());

        channelHandlerContext.writeAndFlush(fullHttpResponse);

出現的問題記錄:

  1. 第一次綁定6668端口,浏覽器通路失敗,換成7001就可以了

    原因: 谷歌浏覽器禁用了6665-6669以及其他一些不安全的端口

  2. 通路後第一次請求 出現異常,可以正常傳回資料
    Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

    原因: NettyServerHandler中沒有對異常處理方法進行重寫

    加上這部分就可以了,報錯資訊也報的很明顯

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

    ctx.close();

4.2.6.2 Http服務的過濾

可以對一些不希望處理的請求進行過濾,其實就是在對http請求的處理過程中判斷一下請求的uri

在上文中 NettyServerHandler類中 加入以下代碼,即可攔截/favicon.ico請求

HttpRequest re  = (HttpRequest) httpObject;

String uri = re.uri();

if(uri.equals("/favicon.ico")){

    System.out.println("不想處理,傳回");

    return;

4.3 Netty API梳理

基于上述的各種demo,對Netty常用的類和方法進行系統梳理

4.3.1 Bootstrap

  1. ServerBootstrap 服務端啟動引導類

    BootStrap 用戶端啟動引導類

    1. .group() 給BootStrap設定NioEventLoopGroup,可以設定多個
    2. .channel() 設定服務使用的通道類
    3. .option() 設定通道參數
    4. .handler() 對BossGroup進行設定
    5. .childrenHandler() 對workerGroup 進行設定
    6. .bind() 服務端綁定一個端口号,監聽端口
    7. connect() 用戶端用于連接配接服務端

4.3.2 Future

Netty中的io操作都是異步的,也就是不能立刻傳回結果,而是當完成了之後再通知調用者

  1. Future
  2. ChannelFutrue

    方法

    1. channel() 傳回目前正在進行IO操作的通道
    2. sync() 轉為同步,等待異步操作完畢

4.3.3 Channel

不同協定,不同阻塞類型都有對應的Channel

  1. NioSocketChannel 異步tcp協定Socket連接配接
  2. NioServerSocketChannel 異步tcp協定服務端連接配接
  3. NioDatagramChannel 異步udp連接配接
  4. NioSctpChannel 異步sctp用戶端連接配接
  5. NioSctpServerChannel 異步sctp服務端連接配接

4.3.4 Selector

Netty 基于Nio Selector對象實作多路複用,一個selector管理多個channel

4.3.5 ChannelHandler

主要是用于對資料的處理,其中有很多封裝好的方法,使用的時候繼承其子類即可

實作類

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

子類很多,常用的幾個

channelHandler

  1. ChannelInboundHandler
  2. ChannelOutboundHandler
  3. 擴充卡
    1. channelInboundHandlerAdapter
    2. channelOutboundHandlerAdapter

4.3.6 pipeline

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

結構圖如上

channel中可以建立出一個ChannelPipeline, ChannelPipeline中有維護了一個由ChannelHandlerContext組成的雙向連結清單

每個ChannelHandlerContext又對應了一個Channelhandler

常用方法:

addFirst(); 添加一個Handler到連結清單中的第一個位置

addLast(); 添加到連結清單的最後一個位置

4.3.7 channelHandlerContext

每一個channelhandlerContext包含了一個channelHandler(業務處理)

channelHandlerContext中還可以擷取到對應的channel和pipeline資訊

channelHandlerContext.channel();

channelHandlerCOntext.pipeline();

4.3.8 EventLoopGroup

netty一般提供兩個EventLoopGroup BossEventLoopGroup 和 workerEventLoopGroup

EventLoopGroup可以指定使用的核心是多少個

4.3.9 Unplooed

Netty提供的用來操作緩沖區資料的工具類

copiedBuffer(); 傳回的是Netty提供的 Bytebuf對象

4.3.10 ByteBuf

Netty的資料容器(緩沖區)

可以直接進行讀/寫 讀寫之間不需要進行flip(),原因是ByteBuf内部維護了兩個索引 readIndex writeIndex

常用方法

getByte()

readByte()

writeByte()

capacity()

4.4 Netty 心跳檢測機制

當用戶端長時間沒有讀/寫操作時,服務端需要檢測用戶端是否還處于連接配接狀态,也就是心跳檢測

Netty提供了心跳檢測的處理類  IdleStateHandler

示例代碼

package netty.hearbeat;

import io.netty.channel.socket.ServerSocketChannel;

import io.netty.handler.logging.LoggingHandler;

import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

 * @create:2020-09-10 10:04

public class MyServer {

    public static void main(String[] args) throws Exception{

        ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup,workerGroup)

                .handler(new LoggingHandler())

                    protected void initChannel(SocketChannel socketChannel) throws Exception {

                        ChannelPipeline pipeline = socketChannel.pipeline();

                        pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));

                        pipeline.addLast("inleHandler",new MyHearBeatHandler());

        ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();

import io.netty.channel.ChannelInboundHandler;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

 * @create:2020-09-10 16:50

public class MyHearBeatHandler extends ChannelInboundHandlerAdapter {

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

        if(o instanceof IdleStateEvent){

            IdleStateEvent event = (IdleStateEvent) o;

            IdleState state = event.state();

            switch (state){

                case READER_IDLE:

                    System.out.println("讀空閑");

                case WRITER_IDLE:

                    System.out.println("寫空閑");

                case ALL_IDLE:

                    System.out.println("讀寫空閑");

用戶端沒有差別

4.5 Netty 之 webSocket

使用netty編寫webSocket長連接配接

4.5.1 服務端

package netty.websocket;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

import io.netty.handler.stream.ChunkedWriteHandler;

 * @create:2020-09-11 14:43

                        //添加http解碼器

                        pipeline.addLast(new HttpServerCodec());

                        //添加塊傳輸處理器

                        pipeline.addLast(new ChunkedWriteHandler());

                        //http分段傳輸,增加一個聚合處理

                        pipeline.addLast(new HttpObjectAggregator(8192));

                        //增加websocket協定處理

                        pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));

                        //增加自定義處理業務處理器

                        pipeline.addLast(new MyServerHandler());

        ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();

4.5.2 服務端自定義的處理器

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDate;

import java.time.LocalDateTime;

import java.util.Date;

 * @create:2020-09-11 14:49

public class MyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        //當用戶端關閉連接配接

        System.out.println("用戶端關閉連接配接..."+ ctx.channel().id());

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

        //當用戶端連接配接到服務端

        System.out.println("有用戶端連接配接到服務端 id為" + ctx.channel().id());

        //異常關閉連接配接

    //收到消息

    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

        //讀取消息并傳回相同的消息傳回給用戶端

        String text = textWebSocketFrame.text();

        System.out.println("服務端收到消息" + text);

        //傳回給用戶端

        Channel channel = channelHandlerContext.channel();

        channel.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now()+"  服務端傳回消息:" + text));

4.5.3 頁面(webSocket用戶端)

<!DOCTYPE html>

<html lang="en">

<head>

    <meta charset="UTF-8">

    <title>測試netty+webSocket</title>

</head>

<body>

    <form onsubmit="return false">

        <textarea id="sendMessage" style="height: 300px;width: 300px" placeholder="請輸入要發送的消息"></textarea>

        <button onclick="send(document.getElementById('sendMessage').value)">發送消息</button>

        <textarea id="responseMessage" style="height: 300px;width: 300px" ></textarea>

        <button onclick="document.getElementById('responseMessage').value=''">清空消息</button>

    </form>

</body>

<script type="application/javascript">

    var websocket;

    if(!window.WebSocket){

        alert("浏覽器不支援webSocket")

    }else {

        //進行webSocket的開啟 關閉

        websocket = new WebSocket("ws://localhost:7000/hello");

        //webSocket 開啟事件

        //給消息傳回框加入一條資料

        websocket.onopen = function (ev) {

            document.getElementById("responseMessage").value = '連接配接到服務端';

        websocket.onclose = function (ev) {

            document.getElementById("responseMessage").value += '\n 連接配接關閉';

        //當服務端響應消息時觸發  将服務端傳回的消息回顯緻文本框

        websocket.onmessage = function (ev) {

            document.getElementById("responseMessage").value += '\n ' ;

            document.getElementById("responseMessage").value += ev.data ;

    //發送消息

    function send (message) {

        if(!window.websocket){

            alert("socket還未初始化完成");

            return;

        if(websocket.readyState == WebSocket.OPEN){

            websocket.send(message);

            document.getElementById('sendMessage').value=''

</script>

</html>

4.6 Netty 編碼解碼

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

網絡傳輸過程中的編碼解碼過程

codec 編解碼器 包含 encoder編碼器 和 decoder解碼器

netty中提供了一些StringCodec ObjectCodec的編解碼器,但是這些編解碼器還是依賴java底層的序列化技術,java底層的序列化是比較低效的,是以需要引入新的高效的序列化技術

4.6.1 ProtoBuf

4.6.2 自定義編碼解碼器

package netty.inboundAndOutbound;

import netty.inboundAndOutbound.client.MyClientMessageToByteHandler;

 * @create:2020-09-18 14:53

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {

        ChannelPipeline pipeline = socketChannel.pipeline();

        //添加入棧解碼器

        pipeline.addLast(new ByteToMessageHandler());

        //添加出棧編碼器

        pipeline.addLast(new MyClientMessageToByteHandler());

        //添加自定義處理器

        pipeline.addLast(new MyServerHandler());

import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

 * @create:2020-09-18 14:54

public class ByteToMessageHandler extends ByteToMessageDecoder {

    //自定義實作的入棧解碼器

    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

        if(byteBuf.readableBytes()>=8){

            list.add(byteBuf.readLong());

package netty.inboundAndOutbound.client;

 * @create:2020-09-18 16:16

public class MyClientByteToLong extends ByteToMessageDecoder {

4.6.3 handler處理機制及出棧入棧

通過上面的編碼解碼進而延申到handler的處理機制

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

簡單的netty出棧入棧的解釋圖

出棧和入棧是相對于而言的,當用戶端發送消息到達服務端,對于用戶端來說就是出棧,對于服務端來說就是入棧,反之亦然

4.7 Tcp沾包 拆包

4.7.1 沾包拆包介紹

tcp服務再發送消息的時候,如果發送的多個包.資料量小且包的數量比較多,Tcp就會通過算法将多個包合并為一個大的資料包發送給接受端,這樣産生的問題就是接收端無法識别出完整的包,由此産生的問題就是沾包拆包

Netty深入淺出學習1.Netty 介紹和應用場景2. i/o模型3. nio詳解4.Netty

如上圖所示,Client端向Server端發送D1 D2兩個資料包

Server端讀取的時候,可能會産生四種情況

1.分兩次讀取 分别讀取到了D1 D2兩個資料包,不存在沾包拆包現象

2.一次讀取,讀到了D1D2兩個資料包合在一起的包,出現沾包現象

3.分兩次讀取,第一次讀取到了D1和D2的一部分資料,第二次讀取到了D2的剩餘部分資料,出現了拆包現象

4.分兩次讀取,第一次讀取到了D1的一部分資料,第二次讀取到 了D1的剩餘部分資料和D2的所有資料,出現拆包現象

4.7.2 解決方案

思路:控制接收端讀取内容的長度來解決問題

方案: 通過自定義解析+編解碼器來解決拆包沾包問題