前言
為了便于分析和排查問題,我希望可以儲存下每一條請求日志。那既然每一條都要儲存,把這個功能添加到網關服務中我覺得是比較合适的了。
但是也正因為所有的請求都要經過這個被儲存的過程,是以我希望這個過程要盡可能的簡短不費時間,不要影響我的通路速度。
正巧我學過一點 Redis 的知識,簡單了解過Redis持久化的思路,于是我參考了它的思路,給自己的網關服務寫了一個使用 緩存 + 異步 的存儲政策,當然我這裡持久化就是儲存到MySQL了。
本文所介紹的内容均來自我的開源項目校園部落格中,開源位址:stick-i/scblogs: 校園部落格,基于微服務架構且前後端分離的部落格社群系統。項目後端技術棧:SpringBoot + SpringCloud + Mybatis-Plus + Nacos + MySQL + Redis + MQ + ElasticSearch + Docker。前端主要是基于Vue2和ElementUI進行開發的。 (github.com)
基本思路
我的大緻思路是這樣的:
- 先把每一條請求資訊存到緩存中,可以直接存 List,或者使用 Redis 。
- 每隔一段時間,建立一個子線程,去讀取緩存中的資料,并将資料存入硬碟,這裡我存MySQL。
- 如果沒有新的通路記錄,那就不用去定時執行了,是以最好可以主動調節,通過請求觸發子程序的儲存。
- 程式正常退出的情況下,最好能夠再主動儲存一次緩存中的資料,這裡可以注冊一個Hook來執行,防止定時任務沒到執行時間。
由于這個功能我已經在項目中實作好了,并且已經使用一段時間了,是以下面我會直接就着已經寫好的代碼來跟大家分析講解。
代碼實作
首先介紹一下基本情況:
- 項目的網關使用的是Spring Cloud Gateway,儲存請求記錄的邏輯通過全局過濾器 GlobalFilter 來調用,也就是用一個單獨的過濾器去儲存通路記錄。
- 用于存儲到資料庫的實體類為 VisitRecord ,内容包括ip位址、uri、請求方法、請求參數、狀态碼等資訊。這些資訊可以從ServerWebExchange的對象中擷取,解析之後放到 VisitRecord的對象裡就行了。
有上面這些條件後,下面我們就可以隻關注 請求日志 的實作了,這部分的實作我放在了VisitRecordService類裡面了,源碼所在位置:scblogs/VisitRecordService.java at main · stick-i/scblogs (github.com)。
在下面的講解中,我剔除了大部分業務相關的東西,但是我保留了一部分。
是故意的還是不小心的?
入口
通過調用下面的方法,可以将經過網關的通路記錄進行儲存。
/**
* 儲存通路記錄
*
* @param exchange gateway通路合同
*/
public void add(ServerWebExchange exchange) {
// 擷取資訊
ServerHttpResponse response = exchange.getResponse();
ServerHttpRequest request = exchange.getRequest();
// 建構VisitRecord
VisitRecord visitRecord = getOrBuild(exchange);
// 列印通路情況
log.info(visitRecord.toString());
// 添加通路記錄
addRecord(visitRecord);
}
複制代碼
這段代碼很簡單,就是先拿到了要被存儲的通路記錄資訊,然後再去調用了另一個方法addRecord()。
存入緩存
我們接着上面的addRecord()方法繼續往下看:
private void addRecord(VisitRecord record) {
// 添加記錄到緩存中
visitCache.add(record);
// 執行任務,儲存資料
doTask();
}
複制代碼
這個方法也很簡單,就是往緩存裡添加了這條新的記錄,然後調用了doTask()方法去執行存儲的任務。
先看看這個visitCache是個什麼東西?
/**
* 緩存,在插入資料庫前先存入此。
* 為防止資料被重複插入,故使用Set,但不能確定100%不會被重複存儲。
*/
private HashSet<VisitRecord> visitCache = new HashSet<>();
複制代碼
其實就是個HashSet,不過我這裡用Set是有原因的:
在我的這個項目中有個Gateway專用的全局異常處理器GlobalExceptionHandler,如果發生異常的話,會被這個處理器捕獲,并且會打斷過濾器的執行。基于這個邏輯,可能會出現兩種情況:
- 已經執行了儲存的方法,在儲存請求記錄過濾器的後面抛出了異常,這樣的話不需要再重新儲存日志了。
- 還沒執行過儲存的方法,也就是在它前面抛出了異常,這樣的話肯定是需要重新儲存日志的。
于是綜合這兩種情況,我選擇了使用Set,并且在異常處理器裡加入了儲存通路記錄的邏輯(就是調用最上面那個入口方法),這樣可以保證不會出現漏掉通路記錄的情況,也可以盡量避免重複儲存的情況,但不能完全保證不會被重複儲存。
多講了幾局題外話,這個跟主題關系不大了,感興趣的朋友可以去GitHub看我的項目源碼繼續了解:連結。
執行任務
資料已經存到緩存了,我們接着上面的 doTask(); 方法看:
private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNamePrefix("visit-record-").build();
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 3, 15, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
/**
* 信号量,用于标記目前是否有任務正在執行,{@code true}表示目前無任務進行。
*/
private volatile boolean taskFinish = true;
/**
* 單次批量插入的資料量
*/
private final int BATCH_SIZE = 500;
private void doTask() {
if (taskFinish) {
// 目前沒有任務的情況下,加鎖并執行任務
synchronized (this) {
if (taskFinish) {
taskFinish = false;
threadPool.execute(() -> {
try {
// 當資料量較小時,則等待一段時間再插入資料,進而做到将資料盡可能的批量插入資料庫
if (visitCache.size() <= BATCH_SIZE) {
Thread.sleep(500);
}
batchSave();
} catch (InterruptedException e) {
log.error("休眠時發生了異常: {}", e.getMessage());
} finally {
// 任務執行完畢後修改标志位
taskFinish = true;
}
});
}
}
}
}
複制代碼
這部分就有點東西了:
- 首先是經典的雙重檢查鎖定,使用taskFinish信号量,并且被volatile修飾,估計在面試資料的單例模式寫法上見過吧。這樣可以保證第二個if裡面的東西隻會被單獨執行,而不會并發執行。
- 有一個被final修飾的線程池,并且用了線程工廠,這樣在列印日志的時候可以看到哪些日志是由這部分代碼列印的噢。線程池的核心線程數是 1 ,這跟下面的任務送出有關,每次最多隻會存在一個任務。
- 在第二個if裡面,是這個方法的核心邏輯。首先把信号量改為了false,表示目前已經有任務在執行了,然後向線程池送出了一個任務,在任務中通過 finally 保證任務執行完畢後再恢複信号量。
- 考慮到資料量比較小的時候,可能會不停的建立任務,資料儲存完之後馬上又需要儲存新的資料,而且可能每次都隻儲存了一條兩條資料,這樣有點違背了我們批量插入的初心,也浪費了性能。
- 是以我設定了一個常量BATCH_SIZE = 500,用來表示我希望單次批量插入的資料量。如果目前緩存裡的資料量小于該資料量,那麼讓線程在此等待那麼一會,再去執行真正的跟資料庫互動的操作batchSave()。
- 這裡需要考慮的是,如果我有另一個服務會去讀取并展示這些請求日志,那我肯定希望請求日志是能夠實時更新的,是以我選擇sleep 0.5秒,而不是等到資料量達到500才存入。
- 這樣既減輕了系統負擔,又可以盡量做到即時更新,可謂一舉兩得。
存入資料庫
經曆了這麼幾個步驟,終于要存資料庫了,也就是上文任務中的最後一個方法batchSave();,先來看看代碼:
/**
* 單次批量插入的資料量
*/
private final int BATCH_SIZE = 500;
/**
* 縮減因子,每次更新緩存Set時縮小的倍數,對應HashSet的擴容倍數
*/
private final float REDUCE_FACTOR = 0.5f;
private void batchSave() {
log.debug("通路記錄準備插入資料庫,目前資料量:{}", visitCache.size());
if (visitCache.size() == 0) {
return;
}
// 構造新對象來存儲資料,舊對象儲存到資料庫後不再使用
HashSet<VisitRecord> oldCache = visitCache;
visitCache = new HashSet<>((int) (oldCache.size() * REDUCE_FACTOR));
boolean isSave = false;
try {
// 存入資料庫
isSave = visitLogService.saveBatch(oldCache, BATCH_SIZE);
} finally {
if (!isSave) {
// 如果插入失敗,則重新添加所有資料
visitCache.addAll(oldCache);
}
}
}
複制代碼
這段代碼也是有亮點的,我們來分析一下:
- 首先檢查了一下緩存中的資料量,這沒什麼好說的。
- 然後将緩存對象visitCache使用了一個新的變量oldCache來引用,然後new了一個新的HashSet對象,并且讓visitCache去引用了這個新對象,再把oldCache批量插入資料庫,這裡的saveBatch是用的Mybatis-Plus的方法,就是批量插入到資料庫裡的。
這裡是有說法的:
- 為什麼我不直接儲存visitCache到資料庫,還要多建立一個新緩存對象,再去儲存舊對象?
- 結合本文存入緩存的代碼,我無法保證在把這些資料存入資料庫的期間沒有新的請求被存入緩存,也就是visitCache對象。那在visitLogService.saveBatch();執行完畢後,我就無法保證此時的visitCache全部被存到資料庫了,那我到底還要不要調用visitCache.clear()方法呢?
- 建立新對象時我是這麼寫的visitCache = new HashSet<>((int) (oldCache.size() * REDUCE_FACTOR));,為什麼我給HashSet的初始大小要使用 舊緩存的大小 * 0.5 呢?
- 首先,我不希望visitCache去慢慢擴容到合适的大小,這樣浪費性能。
- 其次,我希望它不要有過多的備援容量,如果我的初始化大小直接就是 oldCahce.size(),那它的容量永遠都不會降下來了。
- 至于為什麼是0.5,因為HashSet的底層其實就是個HashMap,而HashMap每次擴容都是上一次容量大小的兩倍,HashMap初始化容量大小的值,也必須是2的次方。如果不是2的次方,則會自動幫你調整為向上取的第一個2的次方的數,比如我給的參數是10,那它的初始容量就是16咯。
- 這裡我乘0.5,其實也不過就是給它降了一次擴容的空間罷了,聽懂掌聲。
- 其實這裡我也考慮過使用兩個HashSet去做一個滾筒的設計,就跟JVM記憶體中的from區to區一樣。但是我還是希望它的容量是可以降下來的,也算是自動調節吧,是以采用了這種方案。
注冊Hook
最後我希望程式在正常退出的情況下,能夠立馬執行一次儲存資料的任務,是以我在構造函數這裡添加一個ShutdownHook,讓它去執行存入資料庫的操作,盡量保證資料不丢失。
public VisitRecordService() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
this.batchSave();
threadPool.shutdown();
}));
}
複制代碼
後記
本文向大家分享了一種使用 緩存 + 異步 來存儲通路日志的方式,其實不止是通路日志,有其他類似場景的地方,也可以使用這種方案,我個人覺得是非常棒的。
原文連結:https://juejin.cn/post/7204731868137291813