天天看点

Netty框架源码分析

官网

官网介绍:
Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Netty是一个NIO客户端服务器框架,支持快速、简单地开发协议服务器和客户端等网络应用程序。它大大简化和
流线网络编程,如TCP和UDP套接字服务器。“快速而简单”并不意味着最终的应用程序会出现可维护性或性能问
题。Netty经过精心设计,积累了许多协议(如FTP、SMTP、HTTP)的实现经验,以及各种二进制和基于文本的遗留
协议。因此,Netty成功地找到了一种方法,在不妥协的情况下实现开发、性能、稳定性和灵活性。
           

(学习准备) socket(原理)

//客户端      
 Socket socket=new Socket("localhost",8005);
        OutputStream outputStream = socket.getOutputStream();
        OutputStreamWriter outputStreamWriter=new OutputStreamWriter(outputStream);
        outputStreamWriter.write("客户端请求了");
        outputStreamWriter.flush();//注意这里必须刷新
        InputStream inputStream = socket.getInputStream();
        byte bs[]=new byte[1024];
        int read = inputStream.read(bs);
        System.out.println("我是客户端,服务器说"+new String(bs,0,read));

//服务器端
  ServerSocket socket=new ServerSocket(8005);
        System.out.println("等你上线");
        byte bs[]=new byte[1024];
            Socket accept = socket.accept();
            System.out.println("建立了连接");
            InputStream inputStream = accept.getInputStream();
            int read = inputStream.read(bs);
            System.out.println("我是服务器,客户端说"+new String(bs,0,read));
            accept.shutdownInput();//关闭输入流
        OutputStream outputStream = accept.getOutputStream();
        PrintWriter pw = new PrintWriter(outputStream);
        pw.println("欢迎你");
        pw.flush();

-------------------------------------------------------------
网络上的两个程序通过一个双向的通信连接实现数据的交换,这个连接的一端称为一个socket。
建立网络通信连接至少要一对端口号(socket)。socket本质是编程接口(API),对TCP/IP的封装,TCP/IP也要提
供可供程序员做网络开发所用的接口,这就是Socket编程接口;
           
Netty框架源码分析
Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门
面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去
组织数据,以符合指定的协议。
           

websocket链接

起因:
HTTP 协议是一种无状态的、无连接的、单向的应用层协议。它采用了请求/响应模型。通信请求只能由客户端发
起,服务端对请求做出应答处理。
这种通信模型有一个弊端:HTTP 协议无法实现服务器主动向客户端发起消息。

这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦。大多数 Web 应用程序将
通过频繁的异步 JavaScript 和 XML(AJAX)请求实现长轮询。轮询的效率低,非常浪费资源(因为必须不停连
接,或者 HTTP 连接始终打开)。

结果:
因此,工程师们一直在思考,有没有更好的方法。WebSocket 就是这样发明的。WebSocket 连接允许客户端和服
务器之间进行全双工通信,以便任一方都可以通过建立的连接将数据推送到另一端。WebSocket 只需要建立一次
连接,就可以一直保持连接状态。这相比于轮询方式的不停建立连接显然效率要大大提高。

应用:微信扫码支付的结果由服务器主动推送到客户端
           

gradle的简单学习介绍:

gradle的安装(windows) 官网下载地址

1.运行 gradle 必须将 GRADLE_HOME/bin 加入到你的 PATH 环境变量中: %GRADLE_HOME%\bin

2.gradle -v

idea 创建一个gradle 项目

IntelliJ IDEA创建Gradle项目

gradle 编程语言:

1.可以省略括号

2.定义变量
//输出值sout(快捷键)
println "123";
//定义变量可以省略分号
def i=12
println i
def s="小明"
println s
//定义集合
def list=['a','b']
//添加集合
list<<'c';
//取出集合
println list.get(2);
//定义一个map
def map=['key1':'value1',"key2":'value2']
//向map中添加键值对
map.key3='value3'
//打印key3
println map.get("key3")//单引号双引号都行
           
闭包:
//闭包就是一段代码块
def b1={
    println "hello b1";

}
//定义一个方法
def method(Closure closure){
    closure()//执行闭包的代码
}
method(b1)//调用方法

def b2={
    v-> 
        println "hell v${v}";//带参数的闭包
}

def method2(Closure closure){
    closure("超神")//传入参数
}
method2(b2)
           
build.gradle

依赖(group,name,version)默认使用中央仓库
compile group: 'org.springframework', name: 'spring-context', version: '5.1.5.RELEASE'
//netty 依赖
compile group: 'io.netty', name: 'netty-all', version: '4.1.34.Final'
  mavenLocal()//配置本地仓库
           

netty

demo:三步骤
1.启动
2.initializer
3.handler
           
http协议 :HTTP协议是无状态协议。无状态是指协议对于事务处理没有记忆能力。缺少状态意味着如果后续处理
需要前面的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息
时它的应答就较快。
WebSocket是HTML5出的东西(协议)
长连接,基于http协议
           
client sub(桩)
server skeleton(骨架)
rpc(romote process call)跨机器,跨语言
           

NIO

Buffer 缓冲区 存储基本数据的容器
capacity(容量) limit(下一个元素不该被读写的索引),position(下一个的元素被读写的索引)
 public final Buffer flip() {//反转设置
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }

Channel 通道 双向

inChannel.read(buf)!=-1 //read方法 :从这个通道的当前文件位置开始读取字节
outchannel.write(buffer);//write方法 :从给定缓冲区将字节序列写入此通道

   public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;//标记失效
        return this;
    }


  MappedByteBuffer map =
  new FileOutputStream("").getChannel().map(FileChannel.MapMode.READ_WRITE,0,1024);
//这个是直接操作内存映射的buffer
demo:
  FileChannel channel = new FileInputStream("a.txt").getChannel();
            
  MappedByteBuffer byteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
      
  FileChannel channel1 = new FileOutputStream("c.txt").getChannel();
  channel1.write(byteBuffer);
           
while(channel.read(buffer)!=-1){//一直读取字节到缓冲区 例如1024 直到读取完结
    buffer.flip();
    channel1.write(buffer);
    buffer.clear();//必需(回到初始状态,留给下次读取字节数到buffer)
}
分析:当文件读取完了,filp()后p=0,l=1024; write()后,p=l;没有clear()方法时候,无法读取字节(字节
数为0)不等于-1,出现死循环。

注意:第一次不要flip(),p=0, 以后buffer中读取写入需要flip()

           
Netty框架源码分析
ByteBuffer allocate = ByteBuffer.allocate(1024);
        ByteBuffer byteBuffer = allocate.asReadOnlyBuffer();
        System.out.println(byteBuffer.getClass());//类型class java.nio.HeapByteBufferR
//任何buffer都能转为只读buffer,但是反之则不行

ByteBuffer.allocateDirect(10)//分配堆外内存
directorbuffer本身是一个java对象,这个对象有个long类型的address,它里面的数据是在java堆外--native 
memory中,调用malloc()分配出来的内存。在操作系统中的本地内存
           
Selector
 public static Selector open() throws IOException {//打开一个方法
        return SelectorProvider.provider().openSelector();
    }
           

ServerSocketChannel  

//A selectable channel for stream-oriented listening sockets.

//A server-socket channel is created by invoking the {@link #open() open}
A newly-created server-socket channel is
  open but not yet bound.  An attempt to invoke the {@link #accept() accept}
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();

//bind(SocketAddress local)方法 binds the channel's socket to a local address and 
configures the socket to listen for connections.

InetSocketAddress extends SocketAddress 

 serverSocketChannel.configureBlocking(false);//切换非阻塞模式默认true
 A selector may be created by invoking the {@link #open open} method of
 this class 

 Selector selector = Selector.open();
           

Selector(解释)

这里出来一个新概念,selector,具体是一个什么样的东西?

想想一个场景:在一个养鸡场,有这么一个人,每天的工作就是不停检查几个特殊的鸡笼,如果有鸡进来,有鸡出去,有鸡生蛋,有鸡生病等等,就把相应的情况记录下来,如果鸡场的负责人想知道情况,只需要询问那个人即可。

在这里,这个人就相当Selector,每个鸡笼相当于一个SocketChannel,每个线程通过一个Selector可以管理多个SocketChannel。

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//Registers this channel with the given selector, returning a selection key.
然后一直监听,获得key的操作
while(true){
// Iterator<SelectionKey> it = selector.selectedKeys().iterator();
对于selectedKeys进行判断,如果
boolean isReadable() 检测 Channal 中读事件是否就绪
 boolean isWritable() 检测 Channal 中写事件是否就绪 
boolean isConnectable() 检测 Channel 中连接是否就绪
 boolean isAcceptable() 检测 Channel 中接收是否就绪


}

// 取消选择键 SelectionKey
				it.remove();

Selector 是非阻塞 IO 的核心。

           

java精度转化

//java由低字节向高字节自动转换,你想转化错误,强转可能丢失精度
//byte->short->int->long->float->double
        double b=12.12;
        int a=b;//错误 
        int c=12;
        double d=c;//正确
           
1整型
byte              1字节                       -128~127
short             2字节                        -32768~32767
int               4字节                  (-2的31次方到2的31次方-1)
long              8字节                     (-2的63次方到2的63次方-1)
 
2.浮点型
                                             备注
float              4字节                      float类型的数值有一个后缀F(例如:3.14F)
double             8字节                    没有后缀F的浮点数值(如3.14)默认为double类型
   
char              2字节         
boolean           1字节                     false、true
 
           
零拷贝:
零拷贝描述的是CPU不执行拷贝数据从一个存储区域到另一个存储区域的任务,这通常用于通过网络传输一个文件
时以减少CPU周期和内存带宽。
           

Netty分析

EventLoopGroup boss=new NioEventLoopGroup();//接收客户端的连接,转发给work处理,死循环
 EventLoopGroup work=new NioEventLoopGroup();//完成处理

public interface EventLoopGroup extends EventExecutorGroup {}
new NioEventLoopGroup()//主要是完成属性的赋值

 ServerBootstrap serverBootstrap=new ServerBootstrap();//服务器的端的引导类
 serverBootstrap.group(boss,work).channel(NioServerSocketChannel.class).
                    childHandler(new Myinitionlizer());

group()方法就是为两个EventLoopGroup赋值
channel(Class<? extends C> channelClass)方法 Class为了创建一个Channel对象
NioServerSocketChannel继承了ServerSocketChannel,基于 NIO选择器的实现来接受新连接

childHandler()//设置一个childHandler,被用来服务于通道的请求服务于work
Handler服务于boss

ChannelInitializer//一个特殊的通道入站处理程序,提供一种方式去初始化一个通道,一旦它被注册到它的EventLoop中。


           

 //ChannelFuture 

ChannelFuture 继承了Future<V> 继续继承了java.util.concurrent.Future
//Future(jdk 1.5)
一个Future代表了异步操作的结果,计算结果只能通过get()方法去获得
demo:开一个线程去执行搜索操作,但是这个操作比较耗时间,我们不用等待结果完成,就(注意)去执行其它的代码。
最后我们可以通过get()方法去获得执行的结果

 interface ArchiveSearcher { String search(String target); }
  class App {
    ExecutorService executor = ...
                   ArchiveSearcher searcher = ...
                   void showSearch(final String target)
        throws InterruptedException {
      Future<String> future
        = executor.submit(new Callable<String>() {
          public String call() {
              return searcher.search(target);
          }});
      displayOtherThings(); // do other things while searching
      try {
        displayText(future.get()); // use future
      } catch (ExecutionException ex) { cleanup(); return; }
    }
  }}
           

//FutureTask

FutureTask 既实现了Future,也实现了Runnable,主要用来包装Runnable和Callable
可以将FutureTask}提交Executor执行。
public interface Runnable {
    public abstract void run();
}
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
//内存一致性的影响:异步采取的操作一定实在另一个线程执行相应的Future.get()方法之前
           
netty Future
     /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     */
    Future<V> sync() throws InterruptedException;

ChannelFuture
           
Netty的reactor模式:bossgroup 负责与客户端的连接,bossgroup 监听OP_ACCEPT事件,然后通过selector
得到selctorkey的集合,然后通过accept()得到SocketChannel,然后在注册workgroup上面,workgroup上的
选择器监听op_read,以后客户端都和workgroup进行连接和数据的传递。
           
线程就是资源,我们知道服务器端的线程数量是有限的,线程的创建,就是消耗资源
服务器和客户端传递数据就是通过socket进行的
           

 Reactor模式结构

Netty框架源码分析

详解

bind()方法 
//initAndRegister()//初始化和注册

channel = channelFactory.newChannel();//channel的创建
init(channel);//channel的初始化

ChannelPipeline//channel管道,里面是一个个的过滤处理器handler,接收系统设置的hanlder,比如loghandler
用ServerBootstrapAcceptor的对象,并且等待后续客户端的连接
 p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, 
currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

config().group().register(channel);//注册 --注册 Channel到这个 EventLoop中
 public final ServerBootstrapConfig config() {
        return config;
    }
 public final EventLoopGroup group() {
        return bootstrap.group();
    }
//AbstractNioChannel.java
 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
底层是将channel 注册到slector 上,利用nio的实现原理

Waits for this future until it is done, and rethrows the cause of the failure if this future failed.
sync()方法就是确保初始化,注册已经完成。之后返回结果

 channelFuture.channel().closeFuture().sync();//服务端关闭,确保操作完成,才能关闭,所以调用sync方法
---------------------------------------------------------------
         boss.shutdownGracefully();
        worker.shutdownGracefully();//关闭两个线程组,释放资源----优雅关闭
           
ChannelOption allows to configure a ChannelConfig in a type-safe way.
           
ChannelConfig  A set of configuration properties of a Channel.
           
AttributeKey  key用于AttributeKeyMap访问Attribute
           

 Attribute   An attribute which allows to store a value reference. It may be updated atomically and so is thread-safe.

Channel 连接到具有I/O功能的网络套接字或组件的连接操作,例如读、写、连接和绑定。
           

 ChannelHandler 处理或者拦截Channel的入站和出站操作

ChannelPipeline 实现了高级的拦截过滤器模式,让用户完全控制如何处理事件,以及管道中的 ChannelHandler如何相互交互。
ChannelInitializer 一个特殊的 ChannelInboundHandler,它提供了一个简单的方法来初始化通道,一旦它
注册到了它的EventLoop中。
ChannelHandlerContext 是channelhanler和channlepipeline进行交互的桥梁
addLast()
           
一个eventloopgroup 包含多个eventloop,eventloop与channle多对一,一个eventloop在生命周期内只会与
一个线程绑定。
           
注意:在handler 方法中不要指向耗时的任务

解决办法:

1.使用自己定义的业务线程池。

ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(Runnable);

2.默认情况下,channel Handler中的回调方法都是由i/o线程执行,如果调用如下方法:
 ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
那么ChannelHandler中的回调方法都是由参数中group线程组来执行
           
SimpleChannelInboundHandler //处理特定类型的消息--根据所给的泛型
           
ChannelInboundHandlerAdapter需要进行转换
           
ctx.channel().writeAndFlush(msg)//消息从channelpipeline末尾进行流动,写道channel()中,
ctx.writeAndFlush(msg)从channelpipeline中的下一个channelhanlder开始流动。可以提高新能
           
ByteBuf byteBuf= Unpooled.buffer(10);//推荐使用Unpooled 创建buffer
        byteBuf.writeByte(11);
        byteBuf.writeByte(111);
        System.out.println(byteBuf.readByte());//11
        System.out.println(byteBuf.readByte());//111 

 while (buffer.maxWritableBytes() >= 4) {//一个整数四个字节,如果>4这个buffer空间就可以写入字节
     buffer.writeInt(random.nextInt());
  }
ByteBuf 里面支持两个索引,一个读索引,和一个写索引:根据索引来访问Byte时候,不会改变真实的读锁引和写索引。
           

对比

nio bytebuffer   final byte[] hb;    决定好空间就不能改变,并且只有一个position的指针来标识信
息,使用的时候需要调用flip()方法或是rewind()方法。

netty bytebuf 存储字节数据组是动态的,写字节的时候,如果空间不足,自动扩容。writeBytes()方法

           

引用计数

如果一个对象实现了引用计数,如果应用计数为0,这个对象就不能被访问了,被回收
初始化为1
retain()加1
release()减一
retain for(;;) 死循环 --自旋锁的实现(自己去不断的循环) 一定要加1实现 ,然后break 退出循环;

AtomicIntegerFieldUpdater//jdk1.5 全局的,netty使用的只需要创建一次。调用newUpdater()
           

处理器

ChannelInboundHandler  //入站处理器
ChannelOutboundHandler //出站处理器
数据处理是的编解码器本之上都是处理器
编解码器:无论我们在网络中写入什么类型,数据在网络中传输的都是以字节流形式体现的
数据转换字节流就是编码encode----将字节转换为原本的形式就是解码decode


编码encode是将数据转换为网络中传输的字节流,是从我们的程序传输到网络中,是一种出站处理器
解码decode将网络中字节转换为我们程序所需要的数据,本质上是一种入站处理器
过程:客户端--网络--服务端

ByteToMessageDecoder 最顶层的解码器 extends ChannelDuplexHandler:自定义解码器可以继承
decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)方法
三个参数的含义
     * @param ctx           the {@link ChannelHandlerContext} which this {@link 
                            ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added

LineBasedFrameDecoder  Both {@code "\n"} and {@code "\r\n"} are handled.
FixedLengthFrameDecoder//固定长度
DelimiterBasedFrameDecoder 分割符解码器

LengthFieldBasedFrameDecoder 源码分析

 ReplayingDecoder<S> extends ByteToMessageDecoder {

int len=bytebuf.readint()//读取一个整数类型 len=4----字节的长度为4
}


public class HanlerToMyByte extends MessageToByteEncoder<Long> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        out.writeLong(msg);//使用long型(写入)转化为ByteBuf 形式
    }
}

     * @param ctx              the {@link ChannelHandlerContext} which this {@link M 
                             essageToByteEncoder} belongs to
     * @param msg            the message to encode
     * @param out            the {@link ByteBuf} into which the encoded message will be 
                             written
  
  encode(ChannelHandlerContext ctx, I msg, ByteBuf out) 

客户端重载方法channelInactive(){
ctx.writeandflush()//像网络中发出消息---编码器encoder被触发,然后服务端接收消息解码,解码器被
触发,然后服务端向客户端发消息,编码器被触发,服务端接收消息,解码器被处罚。
}
//注意:bytebuf是netty在网络中传输的数据类型,也是我们消息在网络中转换的类型
           

netty粘包问题:向客户端连续发送十条数据(for实现),客户端只会读取一次将数据全部读出。

拆包:自定义协议(就是定义数据的格式),告诉服务端传递的数据的长度,和实际数据

继续阅读