天天看点

IO, WebSocket ,netty实现客户端,服务端案例

Netty是一个高性能、异步事件驱动的NIO框架,提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

在IO通信中,BIO一般是由一个独立的accepter负责监听客户端的链接,它接受到客户端的链接之后,对每一个客户端都创建一个新的线程进行链路处理,处理完成之后通过输出流返回给应答给客户端。此时线程销毁,这就是典型的一请求一应答。但是缺点是当客户端并发访问量增加以后,服务端的线程个数和客户端的并发访问量成1:1正比,线程是java虚拟机很宝贵的资源,当线程膨胀之后,服务器性能急剧下降,随着并发访问量的继续增加,系统将会发生线程堆栈溢出,创建新线程溢出的情况,最后会导致机器宕机,不能对外提供服务。

伪异步IO通信,当有新的客户端通信是将客户端的socket封装投递到后端的线程池中进行处理,jdk它的线程池维护消息队列和N个活跃的线程,对消息队列中的任务进行相关的处理,当有M个客户端请求的时候,服务端就会创建具有N个线程的线程池进行对客户端的回应,由于线程池可以设置消息队列的大小和最大线程数,所以它的资源占用是可控的,无论多少个客户端访问都不会导致服务器的宕机和资源耗尽。但是缺点是当有大量的客户端接入的时候,随着并发访问量不断的增加,伪异步IO通信会导致线程的阻塞。

NIO核心组成:

1 、Buffer

2、Channel

3、Selector

缓冲区buffer它是一个对象,它包含一些要写入或者读出的数据,在NIO类库中加入buffer对象体现了新库和原IO的一个区别,在面向流的IO中可以将数据直接写入或者直接读出,在NIO中所有的数据都是通过NIO处理的,任何时候访问NIO都是通过缓冲区操作。

在NIO把它支持的I/O对象抽象为Channel,Channel又称“通道”,类似于原I/O中的流(Stream),但有所区别:

1、流是单向的,通道是双向的,可读可写。

2、流读写是阻塞的,通道可以异步读写。

3、流中的数据可以选择性的先读到缓存中,通道的数据总是要先读到一个缓存中,或从缓存中写入。

在NIO中多路复用器Selector,Selector会不断的轮询Channel,如果某个Channel上发生读或者写的操作,这个Channel就会处于就绪状态,会被Selector轮询出来,然后通过SelectorsKey可以获得就绪Channel的集合进行后续的IO操作,而jdk使用了其他方法代替了Selector,所以并没有最大链接数的限制,可以接入更多客户端。

AIO通信,它是链接注册读写事件和回调函数,读写方法异步,同时它是主动通知程序的,AIO通过二种方式获取结果,第一种就是通过filter来获取结果,第二种就是通过header的实现类来完成操作的回调,AIO是真正的非阻塞异步IO,它不需要多路复用器Selector对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO。

什么是webSocket?

是H5提出的一个协议规范,webSocket通过一个握手机制客户端和服务器就能创建一个tcp,从而方便客户端和服务器端的通信, 在webSocket出现之前,一般都是使用HTTP长链接或者短连接。

webSocket是为了解决客户端和服务器端实时通信二产生的技术,webSocket本质上是一个基于tcp协议,先通过一个HTTP或HTTPS发起一条特殊的请求,进行握手后创建一个交换数据的tcp链接,此后呢,客户端和服务端通过这个tcp协议进行实时的通信。

注意,当webSocket的客户端和服务器端进行通信以后,此时就不需要之前进行握手请求的HTTP协议参与。

webSocker的优点:

1、节省通信开销

以前的webServer实现推送和及时通信技术都是使用轮寻,在特定的时间间隔,比如一秒钟,由流量器主动发起请求,将服务器的消息主动啦回来,在这种情况下,我们需要不断的向服务器发起请求,而HTTP request header一般都是很长的,里面包含的数据可能只是一个很小的值,这样会占用很多的服务器资源。

2、服务器主动传送数据给客户端,客户端也可以随时发送数据给服务器,客户端交换的header信息也是很小的。

3、实时通信,webSocker不只只仅限于ajax请求,而webSocker可以彼此发送信息,从而实时通信。

netty实现WebSocket通信的案例:

以下是server和client的示例代码,其中使用的是 Netty 4.x,先看看如何实现,后续会针对各个模块进行深入分析。

server 代码实现

public class EchoServer {
    private final int port;
    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup();  // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(port).sync(); // (5)

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new EchoServer(port).run();
    }
}           

EchoServerHandler 实现

public class EchoServerHandler extends ChannelInboundHandlerAdapter {  
  
    private static final Logger logger = Logger.getLogger(  
            EchoServerHandler.class.getName());  
  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        ctx.write(msg);  
    }  
  
    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
        ctx.flush();  
    }  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        // Close the connection when an exception is raised.  
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);  
        ctx.close();  
    }  
}           

1、NioEventLoopGroup 是用来处理I/O操作的线程池,Netty对 EventLoopGroup 接口针对不同的传输协议提供了不同的实现。在本例子中,需要实例化两个NioEventLoopGroup,通常第一个称为“boss”,用来accept客户端连接,另一个称为“worker”,处理客户端数据的读写操作。

2、ServerBootstrap 是启动服务的辅助类,有关socket的参数可以通过ServerBootstrap进行设置。

3、这里指定NioServerSocketChannel类初始化channel用来接受客户端请求。

4、通常会为新SocketChannel通过添加一些handler,来设置ChannelPipeline。ChannelInitializer 是一个特殊的handler,其中initChannel方法可以为SocketChannel 的pipeline添加指定handler。

5、通过绑定端口(自定义),就可以对外提供服务了。

client 代码实现

public class EchoClient {  
  
    private final String host;  
    private final int port;  
    private final int firstMessageSize;  
  
    public EchoClient(String host, int port, int firstMessageSize) {  
        this.host = host;  
        this.port = port;  
        this.firstMessageSize = firstMessageSize;  
    }  
  
    public void run() throws Exception {  
        // Configure the client.  
        EventLoopGroup group = new NioEventLoopGroup();  
        try {  
            Bootstrap b = new Bootstrap();  
            b.group(group)  
             .channel(NioSocketChannel.class)  
             .option(ChannelOption.TCP_NODELAY, true)  
             .handler(new ChannelInitializer<SocketChannel>() {  
                 @Override  
                 public void initChannel(SocketChannel ch) throws Exception {  
                     ch.pipeline().addLast(  
                             //new LoggingHandler(LogLevel.INFO),  
                             new EchoClientHandler(firstMessageSize));  
                 }  
             });  
  
            // Start the client.  
            ChannelFuture f = b.connect(host, port).sync();  
  
            // Wait until the connection is closed.  
            f.channel().closeFuture().sync();  
        } finally {  
            // Shut down the event loop to terminate all threads.  
            group.shutdownGracefully();  
        }  
    }  
  
    public static void main(String[] args) throws Exception {  
        final String host = args[0];  
        final int port = Integer.parseInt(args[1]);  
        final int firstMessageSize;  
        if (args.length == 3) {  
            firstMessageSize = Integer.parseInt(args[2]);  
        } else {  
            firstMessageSize = 256;  
        }  
  
        new EchoClient(host, port, firstMessageSize).run();  
    }  
}           

EchoClientHandler 实现

public class EchoClientHandler extends ChannelInboundHandlerAdapter {  
  
    private static final Logger logger = Logger.getLogger(  
            EchoClientHandler.class.getName());  
  
    private final ByteBuf firstMessage;  
  
    /** 
     * Creates a client-side handler. 
     */  
    public EchoClientHandler(int firstMessageSize) {  
        if (firstMessageSize <= 0) {  
            throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);  
        }  
        firstMessage = Unpooled.buffer(firstMessageSize);  
        for (int i = 0; i < firstMessage.capacity(); i ++) {  
            firstMessage.writeByte((byte) i);  
        }  
    }  
  
    @Override  
    public void channelActive(ChannelHandlerContext ctx) {  
        ctx.writeAndFlush(firstMessage);  
    }  
  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        ctx.write(msg);  
    }  
  
    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
       ctx.flush();  
    }  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        // Close the connection when an exception is raised.  
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);  
        ctx.close();  
    }  
}