天天看點

SpringBoot內建logback異步日志

作者:小魚科技3

一、 ArrayBlockingQueue 的使用

logback 異步日志列印中 ArrayBlockingQueue 的使用:

1、異步日志列印模型概述

在高并發、高流量 并且 響應時間要求比較小的系統中 同步列印日志 已經滿足不了需求了,因為 列印日志本身是需要寫磁盤的,寫磁盤的操作 會暫時阻塞 調用列印日志的業務線程,這會造成調用線程的 rt 【響應時間】增加。

SpringBoot內建logback異步日志

同步日志列印模型的缺點,是将日志寫入磁盤的操作 是 業務線程 同步調用完成的,那麼 是否可以讓 業務線程把要列印的日志任務放入一個隊列後 直接傳回,然後使用一個線程 專門負責從隊列中擷取日志任務 并将其寫入磁盤 呢? 這樣的話,業務線程列印日志 的 耗時 僅僅是把日志任務放入隊列的耗時了,這就是 logback 提供的 異步日志列印模型。

SpringBoot內建logback異步日志

可以看到,logback 的異步日志列印模型是一個 多生産者、單消費者模型,提供隊列把 同步日志列印 轉換成了異步,業務線程隻需要通過調用異步 appender 把日志任務放入日志隊列,而 日志線程則負責使用同步的 appender 進行具體的日志列印。日志列印線程隻需要負責生産日志 并将其放入隊列,而不需要關心消費線程何時把日志具體寫入磁盤。

2、異步日志與具體實作

(1) 異步日志

先建構 Maven 項目,在 pom.xml 中導入依賴:

SpringBoot內建logback異步日志

在 resources 包下建立 logback 的 xml 配置檔案 logback-test.xml :

SpringBoot內建logback異步日志
SpringBoot內建logback異步日志

(這個 XML 的配置參考部落格 https://blog.csdn.net/qq_23132561/article/details/100924628)

寫 Java 代碼:

SpringBoot內建logback異步日志

@RestController

public class LogController {

private static Logger logger= LoggerFactory.getLogger("LogController");

@GetMapping("/log")

public String testLog(){

logger.info("info");

logger.debug("debug");

logger.warn("warn");

logger.error("error");

return JSONObject.toJSONString(logger);

}

}

運作後可以看到控制台輸出:

SpringBoot內建logback異步日志

目錄中也生成了日志包:

SpringBoot內建logback異步日志

在相應的 warn 的 log 中生成:

SpringBoot內建logback異步日志

以上代碼中,AsyncAppender 是實作異步日志的關鍵。

(2) 異步日志實作原理

類圖

SpringBoot內建logback異步日志

可以看到,AsyncAppender 繼承自 AsyncAppebderBase,AsyncAppebderBase 實作了 異步日志模型的功能,而 AsyncAppender 隻是重寫了其中一些方法。logback 中的 異步日志 是一個阻塞隊列,其實就是 有界阻塞隊列 ArrayBlockingQueue,其中 queueSize 表示有界隊列的

元素個數,預設是 256 個。之是以使用有界隊列,是考慮記憶體溢出問題。在高并發下寫日志的 QPS 會很高,如果設定為無界隊列,隊列本身會占用很大的記憶體,很可能造成 OOM。

worker 是個線程,也就是 異步日志列印模型中的 單消費者 線程。aai 是一個 appender 的裝飾器,裡面存放同步日志的 appender ,其中 appenderCount 記錄 aai 裡附加的 同步 appender 的個數。neverBlock 用來訓示 當日志隊列滿時 是否阻塞列印日志的線程, discardingThreshold 是一個門檻值,當 日志隊列裡的空閑元素個數小于該值時,新來的某些級别的日志會被直接丢棄。

① AsyncAppenderBase 的 start 方法

來看看 AsyncAppenderBase 的 start 方法,何時建立日志隊列,以及 何時啟動消費線程。 該方法在解析完配置 AsyncAppenderBase 的 xml 節點元素後被調用。

源碼:

public void start() {

if (!this.isStarted()) {

if (this.appenderCount == 0) {

this.addError("No attached appenders found.");

} else if (this.queueSize < 1) {

this.addError("Invalid queue size [" + this.queueSize + "]");

} else {

// 日志隊列為有界阻塞隊列

this.blockingQueue = new ArrayBlockingQueue(this.queueSize);

// 如果沒設定門檻值 discardingThreshold ,就設定為隊列大小的 1/5

if (this.discardingThreshold == -1) {

this.discardingThreshold = this.queueSize / 5;

}

this.addInfo("Setting discardingThreshold to " + this.discardingThreshold);

// 設定線程為守護線程

this.worker.setDaemon(true);

// 設定日志名稱

this.worker.setName("AsyncAppender-Worker-" + this.getName());

// 啟動消費線程

super.start();

this.worker.start();

}

}

}

worker 線程被設定為 守護線程 ,這意味着 當主線程運作結束 并且 目前沒有使用者線程時,該 worker 線程會随着 JVM 的退出而終止,而 不管日志隊列裡是否還有日志任務沒有被處理。另外,這裡設定了線程的名稱,這對于查找問題很有幫助,根據線程名字就可以定位線程。

② AsyncAppenderBase 的 append 方法

既然是有界隊列,那麼肯定要考慮隊列滿的問題,是 丢棄老的日志任務,還是 阻塞日志列印線程 直到隊列有空餘元素呢?

看看 具體進行日志列印的 AsyncAppenderBase 的 append 方法,源碼:

protected void append(E eventObject) {

// (一)

if (!this.isQueueBelowDiscardingThreshold() ||

// (二)調用 AsyncAppender 重寫的 isDiscardable 方法

!this.isDiscardable(eventObject)) {

this.preprocess(eventObject);

// 将日志任務放入隊列

this.put(eventObject);

}

}

(一)isQueueBelowDiscardingThreshold:

private boolean isQueueBelowDiscardingThreshold() {

return this.blockingQueue.remainingCapacity() < this.discardingThreshold;

}

(二)isDiscardable:

protected boolean isDiscardable(ILoggingEvent event) {

Level level = event.getLevel();

return level.toInt() <= 20000;

}

可以看到,如果目前日志的級别 小于等于 INFO_INT (20000),并且 目前隊列的剩餘容量 小于 discardingThreshold 則會直接丢棄這些日志任務。

③ AsyncAppenderBase 的 put 方法

private void put(E eventObject) {

if (this.neverBlock) {

// (三)

this.blockingQueue.offer(eventObject);

} else {

// (一)

this.putUninterruptibly(eventObject);

}

}

(一)putUninterruptibly:

private void putUninterruptibly(E eventObject) {

boolean interrupted = false;

try {

while(true) {

try {

// (二)

this.blockingQueue.put(eventObject);

return;

} catch (InterruptedException var7) {

// 記錄中斷标志,并不抛出異常

interrupted = true;

}

}

} finally {

if (interrupted) {

Thread.currentThread().interrupt();

}

}

}

可以看到,如果目前日志列印線程在調用 put 方法時被其他線程中斷,則 隻是記錄中斷标志,然後繼續 循環調用 blockingQueue.put 方法,嘗試把日志任務放入日志隊列。這樣的話,即使目前任務被中斷,日志任務最終也會被放入日志隊列的。

(二)put:

public void put(E e) throws InterruptedException {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == items.length)

// 如果隊列滿,則 調用 await 方法阻塞目前調用線程

notFull.await();

enqueue(e);

} finally {

lock.unlock();

}

}

可以看到,在 put 方法中,如果 neverBlock 被設定為 false(預設是 false),就會調用阻塞隊列的 put 方法,而 put 是阻塞的,也就是說,如果目前隊列滿,則 在調用 put 方法向隊列放入一個元素時,調用線程會被阻塞,直到隊列有空餘空間。而如果 neverBlock 被設定為 true ,則會調用 blockingQueue.offer 方法,而 該方法是非阻塞的,是以 如果目前隊列滿,則會直接傳回,也就是 直接丢棄目前日志任務。

(三)offer:

public boolean offer(E e) {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lock();

try {

if (count == items.length)

return false;

else {

enqueue(e);

return true;

}

} finally {

lock.unlock();

}

④ AsyncAppenderBase 的 addAppender 方法

源碼:

public void addAppender(Appender<E> newAppender) {

if (this.appenderCount == 0) {

++this.appenderCount;

this.addInfo("Attaching appender named [" + newAppender.getName() + "] to AsyncAppender.");

this.aai.addAppender(newAppender);

} else {

this.addWarn("One and only one appender may be attached to AsyncAppender.");

this.addWarn("Ignoring additional appender named [" + newAppender.getName() + "]");

}

}

可以看到,一個異步 appender 隻能綁定一個同步 appender,這個 appender 會被放到 AppenderAttachableImpl 的 appendList 清單中。

以上是日志生産線程 把 日志任務 放入日志隊列的實作,接下來看看 消費線程 如何 從隊列中 消費日志任務 并将其寫入磁盤。

⑤ worker 的 run 方法

源碼:

class Worker extends Thread {

Worker() {

}

public void run() {

AsyncAppenderBase<E> parent = AsyncAppenderBase.this;

AppenderAttachableImpl aai = parent.aai;

// 直到線程被中斷才退出循環

while(parent.isStarted()) {

try {

// 從阻塞隊列中擷取元素

E e = parent.blockingQueue.take();

aai.appendLoopOnAppenders(e);

} catch (InterruptedException var5) {

break;

}

}

AsyncAppenderBase.this.addInfo("Worker thread will flush remaining events before exiting. ");

//走到這裡說明線程被中斷,則 把隊列裡的剩餘日志任務 重新整理到磁盤

Iterator i$ = parent.blockingQueue.iterator();

while(i$.hasNext()) {

E ex = i$.next();

aai.appendLoopOnAppenders(ex);

parent.blockingQueue.remove(ex);

}

aai.detachAndStopAllAppenders();

}

}

可以看到,run 方法中,先使用 take 方法 擷取日志任務 ,如果目前隊列為空,則 目前線程阻塞,直到隊列不為空才傳回。擷取到日志任務後,調用 AppenderAttachableImpl 的 appendLoopOnAppenders 方法 ,該方法會循環調用通過 addAppender 注入的同步日志,appener 具體實作把日志列印到磁盤。

總結:

以上是 logback 中異步日志的實作,包括并發元件 ArrayBlockingQueue 的使用,包括 put、offer 方法的使用場景 以及 它們之間的差別,take 方法的使用,而且 ArrayBlockingQueue 實作的是 多生産者-單消費者模型。使用 ArrayBlockingQueue 時需要注意合理設定隊列的大小 以免造成 OOM,隊列滿 或者 剩餘元素比較少時,要根據具體場景制定一些抛棄政策,以避免隊列滿時線程被阻塞。

二、Tomcat 的 NIOEndPoint 中 ConcurrentLinkedQueue 的使用

首先介紹 Tomcat 的容器結構 :

SpringBoot內建logback異步日志

其中,Connector 是一個橋梁,它把 Server 和 Engine 連接配接起來,Connector 的作用是 接受用戶端的請求 ,然後 把請求委托給 Engine 容器處理。在Connector 内部 具體使用 EndPoint 進行處理。根據處理方式的不同,EndPoint 分為 NioEndpoint、JIoEndpoint、AprEndpoint。

NioEndpoint 的三大元件的關系:

SpringBoot內建logback異步日志
  • Acceptor 是套接字接收線程(Socket acceptor thread),用來接受使用者的請求,并把請求封裝成事件任務 放入 Poller 隊列,一個 Connector 裡隻有一個 Acceptor 。
  • Poller 是套接字處理線程(Socker poller thread),每個 poller 内部都有一個獨有的隊列,Poller 線程從自己的隊列裡擷取具體的事件任務,然後将其交給 Worker 進行處理。Poller 線程的個數 與 處理器的核數有關。

    源碼:

protected int pollerThreadCount =

Math.min(2,Runtime.getRuntime().availableProcessors());

這裡,最多有 2 個 Poller 線程。

  • Worker 是實際處理請求的線程,Worker 隻是元件名字,真正做事情的是 SocketProcessor,它是 Poller 線程從自己的隊列擷取任務後的真正任務執行者。

可以看到,Tomcat 使用隊列把接受請求 與 處理請求操作進行解耦,實作異步處理,其實 Tomcat 中 每個 Poller 裡 都維護了一個 ConcurrentLinkedQueue ,用來緩存請求任務,其本身也是一個 多生産者-但消費者模型。

1、 生産者——Acceptor 線程

Acceptor 線程的作用是接受用戶端發來的連接配接請求 并将其放入 Poller 的事件隊列,Acceptor 處理請求的簡明時序圖:

SpringBoot內建logback異步日志

接下來看源碼,Accepter 如何把接受的套接字連接配接放入隊列:

protected class Acceptor extends AbstractEndpoint.Acceptor{

@Override

public void run(){

int errorDelay = 0;

// 一直循環直到接收到 shutdown 指令

while(running){

...

if (! running) {

break;

}

state = AcceptorState.RUNNING;

try{

// 如果達到 max connections 個請求 則目前請求被挂起

countUpOrAwaitConnection();

SocketChannel socket = null;

try{

// 從 TCP 緩存擷取一個完成三次握手的套接字,沒有則阻塞挂起

socket = serverSocket.accept();

} catch (IOException ioe) {

...

}

errorDelay = 0;

if( running && !paused){

// (一)設定套接字參數 并 封裝套接字為事件任務,

// 然後放入 Poller 隊列

if( ! setSocketOptions(socket)){

countDownConnection();

closeSocket(socket);

}

} else {

countDownConnection();

closeSocket(socket);

}

...

} catch (SocketTimeOutException sx) {

...

}

state = AcceptorState.ENDED;

}

}

(一)setSocketOptions:

protected boolean setSocketOptions(SocketChannel socket){

// 處理來南京

try{

...

// (二)封裝連接配接套接字為 channel 并注冊到 poller 隊列

getPoller0().register(channel);

} catch (Throwable t){

...

return false;

}

return true;

(二)register:

public void register(final NioChannel socket){

...

PollerEvent r = eventCache.poll();

ka.interestOps(SelectionKey.OP_read);

if(r == null ) r = new PollerEvent(socket,ka,OP_ REGISTER);

else r.reset(socket,ka,OP_ REGISTER);

//(三)

addEvent(r);

}

(三)addEvent:

public void addEvent(Runnable event){

events.offer(event);

...

}

其中,events 的定義:

protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable> ();

可以看到,events 是一個無界隊列 ConcurrentLinkedQueue,之前說 異步日志列印時 要注意設定隊列大小,否則會導緻 OOM,Tomcat 的 NIOEndpoint 也是如此,Tomcat 會讓使用者配置一個最大連接配接數 ,超過這個數,請求就會被挂起。

2、 消費者——Poller 線程

Poller 線程的作用是從事件隊列中 擷取事件并進行處理。先看看它的簡明時序圖:

SpringBoot內建logback異步日志

Poller 線程的 run 方法源碼:

public void run(){

while(true) {

try {

...

if(close) {

...

} else {

// (一)從事件隊列擷取事件

hasEvents = events();

}

try{

...

} catch (NullPointerException x){ ...

}

Iterator<SelectionKey> iterator = keyCount > 0? selector.selectedKeys().iterator() : null;

// (二)周遊所有注冊的 channel 并對感興趣的事情進行處理

while( iterator != null && iterator.hasNext()){

SelectionKey sk = iterator.next();

KeyAttachment attachment = (KeyAttachment) sk.attachment();

if(attachment == null) {

iterator.remove();

}else {

attachment.access();

iterator.remove();

// 具體調用 SocketProcessor 進行處理

processKey(sk, attachment);

}

} // while

...

} catch (OutOfMemoryError oom) {

...

}

} // while

...

}

(一)events():

public boolean events(){

boolean result = false;

// 從隊列擷取任務并執行

Runnable r = null;

while((r = events.poll())!= null) {

result = true;

try{

r.run();

...

} catch (Throwable x){

...

}

}

return result;

(二)processSocket:

public boolean processSocket(NioChannel socket, SocketStatus status,boolean dispatch){

try{

...

SocketProcessor sc = processorCache.poll();

if(sc == null) sc = new SocketProcessor(socket,status);

else sc.reset(socket,status);

if(dispatch && getExecutor() != null) getExecutor().execute(sc);

else sc.run();

} catch(RejectedException rx){

...

} catch(Throwable t){

...

return false;

}

return true;

總結與補充:

Tomcat 使用 Connector 處理連接配接,一個Tomcat 可以配置多個 Connector,分别監聽不同端口,或處理不同協定。

8.5 以後的 Tomcat 的 start 方法中,會自動配置一個 非阻塞 IO 的 connector ,可以 指定 Protocol,初始化相應的 Endpoint,我們分析的是 NioEndpoint:

(1)init 過程:

調用 NioEndpoint 的 bind 監視操作;

在 bind() 中會通過 ServerSocketChannel.open() 開啟 ServerSocketChannel,并設定 acceptor 線程數為1 ,poller 線程數為2(單核 CPU 為 1,多核為 2) 。

(2)start 過程:

啟動 worker 線程池,啟動 1 個 Acceptor 線程 和 2 個 Poller 線程,當然它們都是預設值,可配;

(3)Acceptor

Acceptor 循環調用 ServerSocketChannel 的 accept() 方法擷取新的連接配接,就會建立一個 SocketChannel 執行個體,然後getPoller0() 擷取其中一個 Poller,然後把這個 SocketChannel 注冊 register 到 Poller 中;

(4)Poller

Poller 内部有個SynchronizedQueue類型的 events 隊列,events() 方法取出目前隊列中的 PollerEvent 對象,逐個執行 run() ,run() 方法主要将這個新連接配接 SocketChannel 注冊到該 poller 的 Selector 中,(每個 poller 會關聯一個 selector)監聽 OP_READ 事件,一旦該 SocketChannel 是 readable 的狀态,那麼就會進入到 processKey 方法,會建立 SocketProcessor 執行個體,把執行個體送出到線程池中。

github位址:https://github.com/fishbaby003/fish-springboot