1.Netty 介紹和應用場景
1.1 介紹
- Netty 是jboss的一個開源架構
- Netty是一個異步的,基于事件驅動的網絡應用架構
- 基于nio
1.2 應用場景
- Rpc 例如dubbo
- 遊戲
- 大資料
涉及到網絡通信的應用都可以使用netty
2. i/o模型
2.1 介紹
- bio 同步并阻塞 一個連接配接對應伺服器一個線程 适用于連接配接數較少的架構 jdk1.4
- nio 同步非阻塞 伺服器一個線程處理多個連接配接 适用于連接配接數較多連接配接時間短 jdk1.4
- aio(nio.2) 異步非阻塞 适用于連接配接數多且連接配接時間長 jdk1.7
2.2 bio
blocking i/o
2.2.1 簡單demo
開發一個服務端,建立一個線程池,當用戶端發送一個請求,服務端對應建立一個線程處理,當有多個用戶端請求時,就會建立多個線程對應處理
這裡demo的用戶端用telnet模拟
public static void handler(Socket socket){
try(InputStream in = socket.getInputStream();){
System.out.println("線程資訊: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());
byte[] bytes = new byte[1024];
while (true){
int read = in.read(bytes);
if(read!=-1){
System.out.println("輸出資訊: "+new String(bytes,"UTF-8"));
}else {
break;
}
}
}catch (IOException e){
e.printStackTrace();
}
}
public static void main(String[] args) {
try(ServerSocket serverSocket = new ServerSocket(6666);) {
ExecutorService executorService = Executors.newCachedThreadPool();
System.out.println("等待連結");
final Socket socket = serverSocket.accept();
System.out.println("連結到一個用戶端");
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("線程資訊: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());
handler(socket);
}
});
} catch (IOException e) {
2.3 nio
non-blocking i/o 非阻塞
2.3.1 簡介
三大核心
- channel 通道
- buffer 緩沖區
- selector 選擇器
簡述操作原理: selector 選擇可用的channel, channel 與 buffer可以互相讀寫,應用程式并不直接對channel進行操作,而是通過對buffer進行操作,間接操作channel
一個線程中會有多個selector,一個selector中可以注冊多個channel,如果并沒有資料傳輸,線程還可以做其他事,并不會一直等待
2.4 nio與bio 的差別
- nio非阻塞 bio阻塞
- nio用塊的方式處理io bio用流的方式處理io 塊的方式比流的方式要快
- bio基于位元組流/字元流 nio基于緩沖區和通道(channel) selector監聽多個通道的事件,是以用一個線程就可以處理多個通道的資料
圖示: nio
bio
3. nio詳解
3.1 nio模型三大元件的關系
- 一個線程對應一個selector
- 一個selecor對應多個channel
- 一個channel對應一個buffer
- 一個線程對應多個channel
- channel與buffer都是雙向的,就是既可以讀也可以寫 使用flip()方法切換
- buffer就是一個記憶體塊,讀寫記憶體比較快
- selector會根據不同僚件切換不同的channel
3.2 Buffer緩沖區
3.2.1 簡介
本質是一個讀寫資料的記憶體塊,可以了解成一個提供了操作記憶體塊方法的容器對象(數組)
緩沖區中内置了一些機制,這些機制可以檢測到緩沖區的資料變化,狀态變化
channel讀寫的資料必須都經過Buffer
3.2.2 源碼分析
常用的幾個操作方法
public static void main(String[] args) {
//allocate 規定intbuffer的長度
IntBuffer buffer = IntBuffer.allocate(5);
//capacity()擷取容量
//put()寫入
for(int i = 0;i<buffer.capacity();i++){
buffer.put(i*2);
//flip()反轉 由寫轉為讀
buffer.flip();
//讀取
//get()每次讀取後 索引向後移動一位
System.out.println(buffer.get());
3.2.2.1 定義
IntBuffer中定義了一個int數組,其他類型的buffer類似
public abstract class IntBuffer
extends Buffer
implements Comparable<IntBuffer>
{
// These fields are declared here rather than in Heap-X-Buffer in order to
// reduce the number of virtual method invocations needed to access these
// values, which is especially costly when coding small buffers.
//
final int[] hb; // Non-null only for heap buffers
final int offset;
boolean isReadOnly; // Valid only for heap buffers
最頂層的Buffer類中定義了四個屬性
public abstract class Buffer {
/**
* The characteristics of Spliterators that traverse and split elements
* maintained in Buffers.
*/
static final int SPLITERATOR_CHARACTERISTICS =
Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;
// Invariants: mark <= position <= limit <= capacity
private int mark = -1; //标記
private int position = 0; //目前索引的位置,不能超過limit
private int limit;//最大能讀寫的長度
private int capacity;//容量 allocate定義的長度
3.2.2.2 反轉
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
可以看到,反轉之後,由讀變為寫,或者由寫變為讀
将索引歸0,最大讀寫長度不能超過上次操作的索引
3.2 channel 通道
- 通道類似于流/連接配接,但是流隻能寫入或者讀取,通道可以即讀取也寫入
- 通道異步讀寫資料
- 通道可以讀寫資料到緩存區
3.2.2 層級關系
當有用戶端發送請求時,服務端會建立一個ServerSocketChannel(實作類:ServerSocketChannelImpl) 再由ServerSocketChannel建立一個SocketChannel(實作類:SocketChannelImpl即真正讀寫資料的通道),這個SocketChannel就是與這個用戶端請求所對應的
3.2.3 案例剖析
3.2.3.1 FileChannle 輸出檔案流
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* @author: zhangyao
* @create:2020-08-25 14:50
**/
public class FileChannelTest {
FileOutputStream fileOutputStream = null;
try {
//檔案輸出流
fileOutputStream = new FileOutputStream("D:\\file01.txt");
//檔案輸出流包裝為FileChannel 此處FileChannel預設實作FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
//建立對應的緩沖區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//資料寫入緩沖區
byteBuffer.put("hello nio".getBytes());
//反轉,因為接下來需要從緩沖區讀取資料寫入Channel
byteBuffer.flip();
//從緩沖區寫入Channel
fileChannel.write(byteBuffer);
} catch (FileNotFoundException e) {
} finally {
//關閉檔案流
if(fileOutputStream!=null){
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
整體流程就是把資料寫入緩沖區,在讀取緩存區寫入通道Channel,在由檔案輸出流輸出
圖示如下
3.2.3.2 FileChanle 輸入檔案流
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream("D:\\file01.txt");
//擷取Channel
FileChannel channel = fileInputStream.getChannel();
//建立byteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//從channel中讀取資料寫入buffer
channel.read(byteBuffer);
//反轉 下一步需要從buffer中讀取資料輸出
byteBuffer.flip();
//輸出
byte[] array = byteBuffer.array();
System.out.println(new String(array));
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
} finally {
fileInputStream.close();
與上面的例子剛好相反,從檔案中讀取資料,通過通道寫入buffer緩沖區,在輸出
圖示
3.2.3.3 FileChannel 拷貝檔案
其實就是上面兩個例子結合,把一個檔案中的資料複制到另外一個檔案中
FileOutputStream fileOutputStream = null;
fileOutputStream = new FileOutputStream("D:\\file02.txt");
FileChannel channel1 = fileOutputStream.getChannel();
while (true){
//将byteBuffer複位
byteBuffer.clear();
int read = channel.read(byteBuffer);
if(read==-1){
break;
channel1.write(byteBuffer);
fileOutputStream.close();
這裡使用了byteBuffer.clear()方法
因為ByteBuffer緩沖區是有長度的,當讀取的檔案超過緩沖區的長度時,如果不對緩沖區進行清空,當進行下一次讀取時,就會從上一次讀取的位置開始讀取,會出現死循環的情況
3.2.3.4 FileChannel 拷貝檔案之TransferFrom
//從channel通道拷貝到 channel1通道
channel1.transferFrom(channel, 0, channel.size());
3.2.4 Buffer的分散和聚集
上面的例子都是使用單個buffer進行資料的讀寫,如果資料過大,也可用使用多個buffer(buffer數組)進行資料的讀寫,即用空間換時間
3.3 Selector選擇器
3.3.1 基本簡介
一個selector管理多個channel通道,使用異步的方式處理io
隻有讀寫真正的發生時,才會處理資料,減小了線程的壓力,不用每個請求都維護一個線程
避免了多線程之間的上下文切換導緻的開銷
3.3.2 selector的api
Selector:
- select() 阻塞
- select(Long timeout) 有逾時時間
- selectNow() 非阻塞
- wakeup() 立即喚醒selector
3.3.2 selecor的工作流程
其實是Selector SelectionKey ServerSocketChannel SorkcetChannel的工作原理
- 當用戶端連結時,通過ServerSockertChannel 得到SocketChannel 并且注冊到 Selector上
-
- 注冊源碼
public abstract SelectionKey register(Selector sel, int ops)
throws ClosedChannelException;
這是SocketChannel注冊到Selector上的方法,第一個參數為要注冊的Selector對象,第二個參數為事件驅動的類型
public abstract class SelectionKey {
public static final int OP_READ = 1;
public static final int OP_WRITE = 4;
public static final int OP_CONNECT = 8;
public static final int OP_ACCEPT = 16;
private volatile Object attachment = null;
- 當注冊完成後傳回一個Selectionkey,這個selectionKey會和SocketChannel關聯
- Selector通過select方法監聽Channel,如果有事件發生,傳回對應的selectionKey集合
-
- 源碼
public int select(long var1) throws IOException {
if (var1 < 0L) {
throw new IllegalArgumentException("Negative timeout");
} else {
return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
public int select() throws IOException {
return this.select(0L);
public int selectNow() throws IOException {
return this.lockAndDoSelect(0L);
private int lockAndDoSelect(long var1) throws IOException {
synchronized(this) {
if (!this.isOpen()) {
throw new ClosedSelectorException();
} else {
int var10000;
synchronized(this.publicKeys) {
synchronized(this.publicSelectedKeys) {
var10000 = this.doSelect(var1);
return var10000;
- 通過得到的selectionKey可以反向擷取Channel
public abstract SelectableChannel channel();
- 最後通過channel處理業務
3.3.3 案例
服務端思路:
- 建立serverSocketChannel綁定端口6666,把這個channel注冊到Selector上,注冊事件是OP_ACCEPT
- 循環監聽,判斷是否channel中是否有事件發生,如果有事件發生,判斷不同的事件類型進行不同的連結,讀/寫操作
用戶端思路
- 建立一個SocketChannel,連接配接上伺服器之後,發送消息,并保持連結不關閉
3.3.3.1 server端
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
* @create:2020-08-26 16:55
public class ServerChannel {
//生成一個ServerScoketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//設定為非阻塞的
serverSocketChannel.configureBlocking(false);
//serverSocket監聽6666端口
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//建立Selector
Selector selector = Selector.open();
//serverSocketChannel注冊到Selector
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//循環等待連結
//如果沒有事件發生,就繼續循環
if(selector.select(1000) == 0){
System.out.println("等待1s,無連接配接");
continue;
//如果有事件驅動,就需要周遊事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
//如果事件是連接配接
if(key.isAcceptable()){
try {
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
SelectionKey register = channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("連結成功");
} catch (IOException e) {
e.printStackTrace();
}
//如果是讀取資料
if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
int read = channel.read(byteBuffer);
byte[] array = byteBuffer.array();
System.out.println("讀取資料:"+ new String(byteBuffer.array()));
iterator.remove();
};
3.3.3.2 用戶端
import java.nio.channels.SocketChannel;
* @create:2020-08-26 17:24
public class ClientChannel {
//建立一個SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
if(!socketChannel.connect(inetSocketAddress)){
while (!socketChannel.finishConnect()){
System.out.println("伺服器連接配接中,線程并不阻塞,可以進行其他操作");
//連接配接成功
socketChannel.write(ByteBuffer.wrap("hello ,server".getBytes()));
System.in.read();
4.Netty
4.1 簡介
https://netty.io/官網
netty是對java nio api的封裝,簡化了nio程式的開發,jdk要求最低1.6
流行的網絡程式設計通信架構,Dubbo Elasticsearch 等架構底層的網絡通信架構都是 Netty
架構模型
版本
netty 共有 3.x 4.x 5.x三個大版本
3.x較老,5.x有重大bug,被官網廢棄 現在主要使用4.x
4.2 線程模型
有以下幾種線程模型
4.2.1 傳統I/O阻塞模型
每一個連結都需要一個對應的線程進行處理,并且當連結建立後,如果目前連結沒有資料傳輸時,此線程會被阻塞在read()方法
4.2.2 Reactor模式
原理圖示如上
主要是針對了傳統I/O模型一個連接配接會阻塞一個線程的問題進行了改進,當連接配接建立後都通過ServiceHandler調用線程池中的線程進行處理,這樣就隻用阻塞一個ServiceHandler線程,達到多路複用的目的
Reactor模式有三種實作方式
4.2.2.1單Reactor單線程
使用一個線程通過多路複用完成所有操作,包括讀寫連接配接
redis使用的就是這種模型 單線程
4.2.2.2單Reactor多線程
相對于單Reactor單線程,主線程不在進行業務處理,當有請求過來之後,具體的業務處理交與線程池中的線程處理,線程處理完成後再通過handler傳回給Client
4.2.2.3 主從Reactor多線程
相比于單Reacotr,主從Reactor将Reactor分為MainReactor和SubReactor
MainReactor中負責分發和連接配接
SubReactor中負責讀寫
一個MainReactor可以對應多個SubReactor
4.2.3 Netty模型
簡述Netty模型
- 角色
-
- BossGroup BossGroup的類型是NioEventLoopGroup,其中包含了很多NioEventLoop
- NioEventLoop nio事件循環,每個NioEventLoop中都有一個Selctor和一個任務隊列
- WorkerGroup 類型是NioEventLoopGroup,與BossGroup類似,隻不過功能不同,BossGroup隻負責與用戶端建立連接配接, WorkerGroup需要讀寫,處理業務
- PipeLine 管道,對Channel進行的封裝,具體的業務處理是通過Pipline對Channel進行處理
- 具體流程
-
- 當用戶端發送請求時,首先進入BossGroup,有NioEventLoop對請求進行事件輪詢,如果是連接配接事件就進行處理
-
-
- 處理的步驟分為三步
- 輪詢
- 注冊 這裡的注冊指的是将生成的SocketChannel注冊到workerGroup中的某個NioEventLoop中的Selector上
- 執行任務清單
-
-
- 當請求的事件是讀寫時,就有workerGroup對請求進行具體的業務處理
-
-
- 處理的步驟BossGroup類似
-
-
總結
由此可以看出,Netty的模型與主從Reactor模型類似,都是由一個主Reactor負責連接配接事件,由一個從Reactor負責讀寫事件
4.2.4 案例demo
4.2.4.1 服務端
4.2.4.1.1 NettyServer
package netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
* @create:2020-09-03 08:55
public class NettyServer {
//建立BossGroup 和 WorkerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
ChannelFuture channelFuture = null;
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new NettyServerHandler());
System.out.println("伺服器就緒.....");
//綁定端口
channelFuture = serverBootstrap.bind(6668).sync();
}catch (Exception e){
}finally {
try {
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
4.2.4.1.2 NettyServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
* @create:2020-09-03 09:12
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//讀取資料
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("用戶端發送消息:"+ buf.toString(CharsetUtil.UTF_8));
System.out.println("用戶端位址:"+ ctx.channel().remoteAddress());
//資料讀取完畢
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 用戶端",CharsetUtil.UTF_8));
//處理異常 關閉ctx
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
4.2.4.2 用戶端
4.2.4.2.1 NettyClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.socket.nio.NioSocketChannel;
* @create:2020-09-03 09:52
public class NettyClient {
EventLoopGroup executors = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(executors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
});
System.out.println("用戶端就緒........");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//關閉通道
channelFuture.channel().closeFuture().sync();
executors.shutdownGracefully().sync();
4.2.4.2.2 NettyClientHandler
* @create:2020-09-03 10:00
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//就緒時觸發
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ctx: "+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服務端", CharsetUtil.UTF_8));
//讀取資訊
//這裡讀取的是伺服器傳回的資訊
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服務端發送消息: "+ byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("服務端位址: "+ ctx.channel().remoteAddress());
//異常處理
一個簡單的TCP服務通信,用戶端發送消息,服務端接收消息并傳回消息給用戶端
4.2.4.3 案例demo源碼分析
4.2.4.3.1 NioEventGroup
public NioEventLoopGroup() {
this(0);
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor)null);
可以看到,如果使用無參的NioEventGroup,預設傳遞的是0,也可以指定線程數
一層一層找下去:
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); //NettyRuntime.availableProcessors()擷取目前計算機的核數(邏輯處理器)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
發現最後找到NioEventGroup父類的方法
如果指定了NioEventGroup的線程數,且不為0的時候,就使用指定的線程數
否則,**就使用目前計算機的核數2作為線程數
debug 看結果
電腦是12核,預設是24個線程
指定一個線程
就隻有一個線程數
4.2.5 異步模型
上文中的案例Demo中的 ChannelFuture
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
異步模型Future是相對與同步來說的
異步指的是當一個異步調用發出後,不會立刻得到結果,而是通過回調,狀态來通知調用者調用的結果
Netty 中的 connect 和 bind() sync方法就是傳回了一個異步的結果,之後再通過監聽擷取到結果
也就是 Future-Listener機制
當Future對象剛建立的時候,處于未完成的狀态,可以通過傳回的ChannelFuture檢視操作執行的狀态,也可以注冊監聽函數來執行完成後的操作
isSucess()是否成功
isDone()是否完成
isCancelable() 是否取消
cause() 失敗原因
addListener 增加監聽器
//綁定端口
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
System.out.println("監聽端口成功");
}else {
System.out.println("監聽端口失敗");
});
4.2.6 Netty Http服務
做一個簡單的demo 浏覽器(用戶端)通路伺服器端7001端口,傳回一個字元串資訊
浏覽器通路是http請求,伺服器也需要相應一個httpResponse
4.2.6.1 服務端
NettyHttpServer 啟動類
package netty.http;
* @create:2020-09-04 11:16
public class NettyHttpServer {
public static void main(String[] args) throws InterruptedException {
serverBootstrap.group(bossGroup,workerGroup)
.childHandler(new NettyHttpInitialize());
ChannelFuture channelFuture = serverBootstrap.bind(7001).sync();
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
NettyHttpInitialize 處理器類
對之前的ChannelInitialize(SocketChannel)進行封裝
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpServerCodec;
* @create:2020-09-04 11:21
public class NettyHttpInitialize extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel sc) throws Exception {
//得到管道
ChannelPipeline pipeline = sc.pipeline();
//管道中加入處理器 主要是處理Http請求的,解析請求頭之類的
pipeline.addLast("myDecoder",new HttpServerCodec());
//加入處理器
pipeline.addLast("myServerHandler",new NettyServerHandler());
NettyServerHandler 具體的處理(傳回http響應)
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import org.springframework.http.HttpStatus;
* @create:2020-09-04 14:01
public class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {
//讀資訊
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
System.out.println(httpObject);
System.out.println("用戶端位址+"+ channelHandlerContext.channel().remoteAddress());
ByteBuf byteBuf = Unpooled.copiedBuffer("hello , im 服務端", CharsetUtil.UTF_8);
//傳回用戶端資訊
FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK,byteBuf);
fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());
channelHandlerContext.writeAndFlush(fullHttpResponse);
出現的問題記錄:
-
第一次綁定6668端口,浏覽器通路失敗,換成7001就可以了
原因: 谷歌浏覽器禁用了6665-6669以及其他一些不安全的端口
- 通路後第一次請求 出現異常,可以正常傳回資料
原因: NettyServerHandler中沒有對異常處理方法進行重寫
加上這部分就可以了,報錯資訊也報的很明顯
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
4.2.6.2 Http服務的過濾
可以對一些不希望處理的請求進行過濾,其實就是在對http請求的處理過程中判斷一下請求的uri
在上文中 NettyServerHandler類中 加入以下代碼,即可攔截/favicon.ico請求
HttpRequest re = (HttpRequest) httpObject;
String uri = re.uri();
if(uri.equals("/favicon.ico")){
System.out.println("不想處理,傳回");
return;
4.3 Netty API梳理
基于上述的各種demo,對Netty常用的類和方法進行系統梳理
4.3.1 Bootstrap
-
ServerBootstrap 服務端啟動引導類
BootStrap 用戶端啟動引導類
-
- .group() 給BootStrap設定NioEventLoopGroup,可以設定多個
- .channel() 設定服務使用的通道類
- .option() 設定通道參數
- .handler() 對BossGroup進行設定
- .childrenHandler() 對workerGroup 進行設定
- .bind() 服務端綁定一個端口号,監聽端口
- connect() 用戶端用于連接配接服務端
4.3.2 Future
Netty中的io操作都是異步的,也就是不能立刻傳回結果,而是當完成了之後再通知調用者
- Future
-
ChannelFutrue
方法
-
- channel() 傳回目前正在進行IO操作的通道
- sync() 轉為同步,等待異步操作完畢
4.3.3 Channel
不同協定,不同阻塞類型都有對應的Channel
- NioSocketChannel 異步tcp協定Socket連接配接
- NioServerSocketChannel 異步tcp協定服務端連接配接
- NioDatagramChannel 異步udp連接配接
- NioSctpChannel 異步sctp用戶端連接配接
- NioSctpServerChannel 異步sctp服務端連接配接
4.3.4 Selector
Netty 基于Nio Selector對象實作多路複用,一個selector管理多個channel
4.3.5 ChannelHandler
主要是用于對資料的處理,其中有很多封裝好的方法,使用的時候繼承其子類即可
實作類
子類很多,常用的幾個
channelHandler
- ChannelInboundHandler
- ChannelOutboundHandler
- 擴充卡
-
- channelInboundHandlerAdapter
- channelOutboundHandlerAdapter
4.3.6 pipeline
結構圖如上
channel中可以建立出一個ChannelPipeline, ChannelPipeline中有維護了一個由ChannelHandlerContext組成的雙向連結清單
每個ChannelHandlerContext又對應了一個Channelhandler
常用方法:
addFirst(); 添加一個Handler到連結清單中的第一個位置
addLast(); 添加到連結清單的最後一個位置
4.3.7 channelHandlerContext
每一個channelhandlerContext包含了一個channelHandler(業務處理)
channelHandlerContext中還可以擷取到對應的channel和pipeline資訊
channelHandlerContext.channel();
channelHandlerCOntext.pipeline();
4.3.8 EventLoopGroup
netty一般提供兩個EventLoopGroup BossEventLoopGroup 和 workerEventLoopGroup
EventLoopGroup可以指定使用的核心是多少個
4.3.9 Unplooed
Netty提供的用來操作緩沖區資料的工具類
copiedBuffer(); 傳回的是Netty提供的 Bytebuf對象
4.3.10 ByteBuf
Netty的資料容器(緩沖區)
可以直接進行讀/寫 讀寫之間不需要進行flip(),原因是ByteBuf内部維護了兩個索引 readIndex writeIndex
常用方法
getByte()
readByte()
writeByte()
capacity()
4.4 Netty 心跳檢測機制
當用戶端長時間沒有讀/寫操作時,服務端需要檢測用戶端是否還處于連接配接狀态,也就是心跳檢測
Netty提供了心跳檢測的處理類 IdleStateHandler
示例代碼
package netty.hearbeat;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
* @create:2020-09-10 10:04
public class MyServer {
public static void main(String[] args) throws Exception{
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup,workerGroup)
.handler(new LoggingHandler())
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
pipeline.addLast("inleHandler",new MyHearBeatHandler());
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
import io.netty.channel.ChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
* @create:2020-09-10 16:50
public class MyHearBeatHandler extends ChannelInboundHandlerAdapter {
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
if(o instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent) o;
IdleState state = event.state();
switch (state){
case READER_IDLE:
System.out.println("讀空閑");
case WRITER_IDLE:
System.out.println("寫空閑");
case ALL_IDLE:
System.out.println("讀寫空閑");
用戶端沒有差別
4.5 Netty 之 webSocket
使用netty編寫webSocket長連接配接
4.5.1 服務端
package netty.websocket;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
* @create:2020-09-11 14:43
//添加http解碼器
pipeline.addLast(new HttpServerCodec());
//添加塊傳輸處理器
pipeline.addLast(new ChunkedWriteHandler());
//http分段傳輸,增加一個聚合處理
pipeline.addLast(new HttpObjectAggregator(8192));
//增加websocket協定處理
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
//增加自定義處理業務處理器
pipeline.addLast(new MyServerHandler());
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
4.5.2 服務端自定義的處理器
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Date;
* @create:2020-09-11 14:49
public class MyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//當用戶端關閉連接配接
System.out.println("用戶端關閉連接配接..."+ ctx.channel().id());
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//當用戶端連接配接到服務端
System.out.println("有用戶端連接配接到服務端 id為" + ctx.channel().id());
//異常關閉連接配接
//收到消息
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
//讀取消息并傳回相同的消息傳回給用戶端
String text = textWebSocketFrame.text();
System.out.println("服務端收到消息" + text);
//傳回給用戶端
Channel channel = channelHandlerContext.channel();
channel.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now()+" 服務端傳回消息:" + text));
4.5.3 頁面(webSocket用戶端)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>測試netty+webSocket</title>
</head>
<body>
<form onsubmit="return false">
<textarea id="sendMessage" style="height: 300px;width: 300px" placeholder="請輸入要發送的消息"></textarea>
<button onclick="send(document.getElementById('sendMessage').value)">發送消息</button>
<textarea id="responseMessage" style="height: 300px;width: 300px" ></textarea>
<button onclick="document.getElementById('responseMessage').value=''">清空消息</button>
</form>
</body>
<script type="application/javascript">
var websocket;
if(!window.WebSocket){
alert("浏覽器不支援webSocket")
}else {
//進行webSocket的開啟 關閉
websocket = new WebSocket("ws://localhost:7000/hello");
//webSocket 開啟事件
//給消息傳回框加入一條資料
websocket.onopen = function (ev) {
document.getElementById("responseMessage").value = '連接配接到服務端';
websocket.onclose = function (ev) {
document.getElementById("responseMessage").value += '\n 連接配接關閉';
//當服務端響應消息時觸發 将服務端傳回的消息回顯緻文本框
websocket.onmessage = function (ev) {
document.getElementById("responseMessage").value += '\n ' ;
document.getElementById("responseMessage").value += ev.data ;
//發送消息
function send (message) {
if(!window.websocket){
alert("socket還未初始化完成");
return;
if(websocket.readyState == WebSocket.OPEN){
websocket.send(message);
document.getElementById('sendMessage').value=''
</script>
</html>
4.6 Netty 編碼解碼
網絡傳輸過程中的編碼解碼過程
codec 編解碼器 包含 encoder編碼器 和 decoder解碼器
netty中提供了一些StringCodec ObjectCodec的編解碼器,但是這些編解碼器還是依賴java底層的序列化技術,java底層的序列化是比較低效的,是以需要引入新的高效的序列化技術
4.6.1 ProtoBuf
4.6.2 自定義編碼解碼器
package netty.inboundAndOutbound;
import netty.inboundAndOutbound.client.MyClientMessageToByteHandler;
* @create:2020-09-18 14:53
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//添加入棧解碼器
pipeline.addLast(new ByteToMessageHandler());
//添加出棧編碼器
pipeline.addLast(new MyClientMessageToByteHandler());
//添加自定義處理器
pipeline.addLast(new MyServerHandler());
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
* @create:2020-09-18 14:54
public class ByteToMessageHandler extends ByteToMessageDecoder {
//自定義實作的入棧解碼器
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes()>=8){
list.add(byteBuf.readLong());
package netty.inboundAndOutbound.client;
* @create:2020-09-18 16:16
public class MyClientByteToLong extends ByteToMessageDecoder {
4.6.3 handler處理機制及出棧入棧
通過上面的編碼解碼進而延申到handler的處理機制
簡單的netty出棧入棧的解釋圖
出棧和入棧是相對于而言的,當用戶端發送消息到達服務端,對于用戶端來說就是出棧,對于服務端來說就是入棧,反之亦然
4.7 Tcp沾包 拆包
4.7.1 沾包拆包介紹
tcp服務再發送消息的時候,如果發送的多個包.資料量小且包的數量比較多,Tcp就會通過算法将多個包合并為一個大的資料包發送給接受端,這樣産生的問題就是接收端無法識别出完整的包,由此産生的問題就是沾包拆包
如上圖所示,Client端向Server端發送D1 D2兩個資料包
Server端讀取的時候,可能會産生四種情況
1.分兩次讀取 分别讀取到了D1 D2兩個資料包,不存在沾包拆包現象
2.一次讀取,讀到了D1D2兩個資料包合在一起的包,出現沾包現象
3.分兩次讀取,第一次讀取到了D1和D2的一部分資料,第二次讀取到了D2的剩餘部分資料,出現了拆包現象
4.分兩次讀取,第一次讀取到了D1的一部分資料,第二次讀取到 了D1的剩餘部分資料和D2的所有資料,出現拆包現象
4.7.2 解決方案
思路:控制接收端讀取内容的長度來解決問題
方案: 通過自定義解析+編解碼器來解決拆包沾包問題