如何做一個實時的業務統計的監控?比如分鐘級?也就是每分鐘可以快速看到業務的變化趨勢,及可以做一些簡單的分組查詢?
哎,你可能說很簡單了,直接從資料庫 count 就可以了! 你是對的。
但如果不允許你使用db進行count呢?因為線上資料庫資源可是很寶貴的哦,你這一count可能會給db帶來災難了。
那不然咋整?
沒有db,我們還有其他資料源嘛,比如: 消息隊列?埋點資料? 本文将是基于該前提而行。
做監控,盡量不要侵入業務太多!是以有一個消息中間件是至關重要的。針對大資料系統,一般是: kafka 或者 類kafka. (如本文基礎 loghub)
有了消息中間件,如何進行分鐘級監控? 這個應該就很簡單了吧。不過如果要自己實作,其實坑也不少的!
如果自己實作計數,那麼你可能需要做以下幾件事:
1. 每消費一個消息,你需要一個累加器;
2. 每隔一個周期,你可能需要一個歸檔操作;
3. 你可能需要考慮各種并發安全問題;
4. 你可能需要考慮種性能問題;
5. 你可能需要考慮各種機器故障問題;
6. 你可能需要考慮各種邊界值問題;
哎,其實沒那麼難。時間序列資料庫,就專門為這類事情而生!如OpenTSDB: http://opentsdb.net/overview.html
可以說,TSDB 是這類應用場景的殺手锏。或者基于流計算架構: 如flink, 也是很輕松完成的事。但是不是本文的方向,略過!
本文是基于 loghub 的現有資料,進行分鐘級統計後,入庫 mysql 中,進而支援随時查詢。(因loghub每次查詢都是要錢的,是以,不可能直接查詢)
loghub 資料結構如: 2019-07-10 10:01:11,billNo,userId,productCode,...
由于loghub提供了很多強大的查詢統計功能,是以我們可以直接使用了。
核心功能就是一個統計sql,還是比較簡單的。但是需要考慮的點也不少,接下來,将為看官們奉上一個完整的解決方案!
撸代碼去!
1. 核心統計任務實作類 MinuteBizDataCounterTask
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogContent;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.common.QueriedLog;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.GetLogsResponse;
import com.my.service.statistics.StatisticsService;
import com.my.entity.BizDataStatisticsMin;
import com.my.model.LoghubQueryCounterOffsetModel;
import com.my.util.loghub.LogHubProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
/**
* 基于loghub 的分鐘級 統計任務
*/
@Component
@Slf4j
public class MinuteBizDataCounterTask implements Runnable {
@Resource
private LogHubProperties logHubProperties;
@Resource
private StatisticsService statisticsService;
@Resource(name = "defaultOffsetQueryTaskCallback")
private DefaultOffsetQueryTaskCallbackImpl defaultOffsetQueryTaskCallback;
/**
* loghub 用戶端
*/
private volatile Client mClient;
/**
* 過濾的topic
*/
private static final String LOGHUB_TOPIC = "topic_test";
/**
* 單次掃描loghub最大時間 間隔分鐘數
*/
@Value("${loghub.offset.counter.perScanMaxMinutesGap}")
private Integer perScanMaxMinutesGap;
/**
* 單次循環最大數
*/
@Value("${loghub.offset.counter.perScanMaxRecordsLimit}")
private Integer perScanMaxRecordsLimit;
/**
* 構造必要執行個體資訊
*/
public ProposalPolicyBizDataCounterTask() {
}
@Override
public void run() {
if(mClient == null) {
this.mClient = new Client(logHubProperties.getEndpoint(),
logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey());
}
while (!Thread.interrupted()) {
try {
updateLastMinutePolicyNoCounter();
Thread.sleep(60000);
}
catch (InterruptedException e) {
log.error("【分鐘級統計task】, sleep 中斷", e);
Thread.currentThread().interrupt();
}
catch (Exception e) {
// 注意此處可能有風險,發生異常後将快速死循環
log.error("【分鐘級統計task】更新異常", e);
try {
Thread.sleep(10000);
}
catch (InterruptedException ex) {
log.error("【分鐘級統計task】異常,且sleep異常", ex);
Thread.currentThread().interrupt();
}
}
}
}
/**
* 更新最近的資料 (分鐘級)
*
* @throws LogException loghub查詢異常時抛出
*/
private void updateLastMinutePolicyNoCounter() throws LogException {
updateMinutePolicyNoCounter(null);
}
/**
* 更新最近的資料
*/
public Integer updateMinutePolicyNoCounter(LoghubQueryCounterOffsetModel specifyOffset) throws LogException {
// 1. 擷取偏移量
// 2. 根據偏移量,判定是否可以一次性取完,或者多次擷取更新
// 3. 從loghub中設定偏移量,擷取統計資料,更新
// 4. 更新db資料統計值
// 5. 更新偏移量
// 6. 等待下一次更新
// 指定offset時,可能為補資料
final LoghubQueryCounterOffsetModel destOffset = enhanceQueryOffset(specifyOffset);
initSharedQueryOffset(destOffset, destOffset == specifyOffset);
Integer totalAffectNum = 0;
while (!isScanFinishOnDestination(destOffset)) {
// 完整掃描一次時間周期
calcNextSharedQueryOffset(destOffset);
while (true) {
calcNextInnerQueryOffset();
ArrayList<QueriedLog> logs = queryPerMinuteStatisticFromLoghubOnCurrentOffset();
Integer affectNum = handleMiniOffsetBatchCounter(logs);
totalAffectNum += affectNum;
log.info("【分鐘級統計task】本次更新資料:{}, offset:{}", affectNum, getCurrentSharedQueryOffset());
if(!hasMoreDataOffset(logs.size())) {
rolloverOffsetAndCommit();
break;
}
}
}
log.info("【分鐘級統計task】本次更新資料,總共:{}, destOffset:{}, curOffset:{}",
totalAffectNum, destOffset, getCurrentSharedQueryOffset());
rolloverOffsetAndCommit();
return totalAffectNum;
}
/**
* 處理一小批的統計資料
*
* @param logs 小批統計loghub資料
* @return 影響行數
*/
private Integer handleMiniOffsetBatchCounter(ArrayList<QueriedLog> logs) {
if (logs == null || logs.isEmpty()) {
return 0;
}
List<BizDataStatisticsMin> statisticsMinList = new ArrayList<>();
for (QueriedLog log1 : logs) {
LogItem getLogItem = log1.GetLogItem();
BizDataStatisticsMin statisticsMin1 = adaptStatisticsMinDbData(getLogItem);
statisticsMin1.setEventCode(PROPOSAL_FOUR_IN_ONE_TOPIC);
statisticsMin1.setEtlVersion(getCurrentScanTimeDuring() + ":" + statisticsMin1.getStatisticsCount());
statisticsMinList.add(statisticsMin1);
}
return statisticsService.batchUpsertPremiumStatistics(statisticsMinList, getCurrentOffsetCallback());
}
/**
* 擷取共享偏移資訊
*
* @return 偏移
*/
private LoghubQueryCounterOffsetModel getCurrentSharedQueryOffset() {
return defaultOffsetQueryTaskCallback.getCurrentOffset();
}
/**
* 判斷本次是否掃描完成
*
* @param destOffset 目标偏移
* @return true:掃描完成, false: 未完成
*/
private boolean isScanFinishOnDestination(LoghubQueryCounterOffsetModel destOffset) {
return defaultOffsetQueryTaskCallback.getEndTime() >= destOffset.getEndTime();
}
/**
* 擷取偏移送出回調器
*
* @return 回調執行個體
*/
private OffsetQueryTaskCallback getCurrentOffsetCallback() {
return defaultOffsetQueryTaskCallback;
}
/**
* 初始化共享的查詢偏移變量
*
* @param destOffset 目标偏移
* @param isSpecifyOffset 是否是手動指定的偏移
*/
private void initSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset, boolean isSpecifyOffset) {
// 整分花時間資料
Integer queryStartTime = destOffset.getStartTime();
if(queryStartTime % 60 != 0) {
queryStartTime = queryStartTime / 60 * 60;
}
// 将目标掃描時間終點 設定為起點,以備後續疊代
defaultOffsetQueryTaskCallback.initCurrentOffset(queryStartTime, queryStartTime,
destOffset.getOffsetStart(), destOffset.getLimit(),
destOffset.getIsNewStep(), isSpecifyOffset);
if(defaultOffsetQueryTaskCallback.getIsNewStep()) {
resetOffsetDefaultSettings();
}
}
/**
* 計算下一次統計偏移時間
*
* @param destOffset 目标偏移值
*/
private void calcNextSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset) {
int perScanMaxSecondsGap = perScanMaxMinutesGap * 60;
if(destOffset.getEndTime() - defaultOffsetQueryTaskCallback.getStartTime() > perScanMaxSecondsGap) {
defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
int nextExpectEndTime = defaultOffsetQueryTaskCallback.getStartTime() + perScanMaxSecondsGap;
if(nextExpectEndTime > destOffset.getEndTime()) {
nextExpectEndTime = destOffset.getEndTime();
}
defaultOffsetQueryTaskCallback.setEndTime(nextExpectEndTime);
}
else {
defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
defaultOffsetQueryTaskCallback.setEndTime(destOffset.getEndTime());
}
resetOffsetDefaultSettings();
}
/**
* 重置偏移預設配置
*/
private void resetOffsetDefaultSettings() {
defaultOffsetQueryTaskCallback.setIsNewStep(true);
defaultOffsetQueryTaskCallback.setOffsetStart(0);
defaultOffsetQueryTaskCallback.setLimit(0);
}
/**
* 計算下一次小偏移,此種情況應對 一次外部偏移未查詢完成的情況
*/
private void calcNextInnerQueryOffset() {
defaultOffsetQueryTaskCallback.setIsNewStep(false);
// 第一次計算時,limit 為0, 是以得出的 offsetStart 也是0
defaultOffsetQueryTaskCallback.setOffsetStart(
defaultOffsetQueryTaskCallback.getOffsetStart() + defaultOffsetQueryTaskCallback.getLimit());
defaultOffsetQueryTaskCallback.setLimit(perScanMaxRecordsLimit);
}
/**
* 擷取目前循環的掃描區間
*
* @return 15567563433-1635345099 區間
*/
private String getCurrentScanTimeDuring() {
return defaultOffsetQueryTaskCallback.getStartTime() + "-" + defaultOffsetQueryTaskCallback.getEndTime();
}
/**
* 從loghub查詢每分鐘的統計資訊
*
* @return 查詢到的統計資訊
* @throws LogException loghub 異常時抛出
*/
private ArrayList<QueriedLog> queryPerMinuteStatisticFromLoghubOnCurrentOffset() throws LogException {
// 先按保單号去重,再進行計數統計
String countSql = "* | split(bizData, ',')[5] policyNo, bizData GROUP by split(bizData, ',')[5] " +
" | select count(1) as totalCountMin, " +
"split(bizData, ',')[2] as productCode," +
"split(bizData, ',')[3] as schemaCode," +
"split(bizData, ',')[4] as channelCode," +
"substr(split(bizData, ',')[1], 1, 16) as myDateTimeMinute " +
"group by substr(split(bizData, ',')[1], 1, 16), split(bizData, ',')[2],split(bizData, ',')[3], split(bizData, ',')[4],split(bizData, ',')[7], split(bizData, ',')[8]";
countSql += " limit " + defaultOffsetQueryTaskCallback.getOffsetStart() + "," + defaultOffsetQueryTaskCallback.getLimit();
GetLogsResponse countResponse = mClient.GetLogs(logHubProperties.getProjectName(), logHubProperties.getBizCoreDataLogStore(),
defaultOffsetQueryTaskCallback.getStartTime(), defaultOffsetQueryTaskCallback.getEndTime(),
LOGHUB_TOPIC, countSql);
if(!countResponse.IsCompleted()) {
log.error("【分鐘級統計task】掃描擷取到未完整的資料,請速檢查原因,offSet:{}", getCurrentSharedQueryOffset());
}
return countResponse.GetLogs() == null
? new ArrayList<>()
: countResponse.GetLogs();
}
/**
* 根據上一次傳回的記錄數量,判斷是否還有更多資料
*
* @param lastGotRecordsCount 上次傳回的記錄數 (資料量大于最大數說明還有未取完資料)
* @return true: 是還有更多資料應該再循環擷取, false: 無更多資料結束本期任務
*/
private boolean hasMoreDataOffset(int lastGotRecordsCount) {
return lastGotRecordsCount >= defaultOffsetQueryTaskCallback.getLimit();
}
/**
* 加強版的 offset 優先級: 指定偏移 -> 基于緩存的偏移 -> 新生成偏移辨別
*
* @param specifyOffset 指定偏移(如有)
* @return 偏移辨別
*/
private LoghubQueryCounterOffsetModel enhanceQueryOffset(LoghubQueryCounterOffsetModel specifyOffset) {
if(specifyOffset != null) {
return specifyOffset;
}
LoghubQueryCounterOffsetModel offsetBaseOnCache = getNextOffsetBaseOnCache();
if(offsetBaseOnCache != null) {
return offsetBaseOnCache;
}
return generateNewOffset();
}
/**
* 基于緩存擷取一下偏移辨別
*
* @return 偏移
*/
private LoghubQueryCounterOffsetModel getNextOffsetBaseOnCache() {
LoghubQueryCounterOffsetModel offsetFromCache = defaultOffsetQueryTaskCallback.getCurrentOffsetFromCache();
if(offsetFromCache == null) {
return null;
}
LocalDateTime now = LocalDateTime.now();
LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
now.getHour(), now.getMinute());
// 如果上次仍未内部循環完成,則使用原來的
if(offsetFromCache.getIsNewStep()) {
offsetFromCache.setStartTime(offsetFromCache.getEndTime());
long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
offsetFromCache.setEndTime((int) endTime);
}
return offsetFromCache;
}
/**
* 生成新的完整的 偏移辨別
*
* @return 新偏移
*/
private LoghubQueryCounterOffsetModel generateNewOffset() {
LoghubQueryCounterOffsetModel offsetNew = new LoghubQueryCounterOffsetModel();
LocalDateTime now = LocalDateTime.now();
LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
now.getHour(), now.getMinute());
long startTime = nowMinTime.minusDays(1).toEpochSecond(ZoneOffset.of("+8"));
long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
offsetNew.setStartTime((int) startTime);
offsetNew.setEndTime((int) endTime);
return offsetNew;
}
/**
* 将日志傳回資料 适配到資料庫記錄中
*
* @param logItem 日志詳情
* @return db資料結構對應
*/
private BizDataStatisticsMin adaptStatisticsMinDbData(LogItem logItem) {
ArrayList<LogContent> logContents = logItem.GetLogContents();
BizDataStatisticsMin statisticsMin1 = new BizDataStatisticsMin();
for (LogContent logContent : logContents) {
switch (logContent.GetKey()) {
case "totalCountMin":
statisticsMin1.setStatisticsCount(Integer.valueOf(logContent.GetValue()));
break;
case "productCode":
statisticsMin1.setProductCode(logContent.GetValue());
break;
case "myDateTimeMinute":
String signDtMinStr = logContent.GetValue();
String[] dateTimeArr = signDtMinStr.split(" ");
String countDate = dateTimeArr[0];
String[] timeArr = dateTimeArr[1].split(":");
String countHour = timeArr[0];
String countMin = timeArr[1];
statisticsMin1.setCountDate(countDate);
statisticsMin1.setCountHour(countHour);
statisticsMin1.setCountMin(countMin);
break;
default:
break;
}
}
return statisticsMin1;
}
/**
* 重置預設值,同時送出目前 (滾動到下一個偏移點)
*/
private void rolloverOffsetAndCommit() {
resetOffsetDefaultSettings();
commitOffsetSync();
}
/**
* 送出偏移量
*
*/
private void commitOffsetSync() {
defaultOffsetQueryTaskCallback.commit();
}
}
主要實作邏輯如下:
1. 每隔一分鐘進行一個查詢;
2. 發生異常後,容錯繼續查詢;
3. 對于一個新統計,預設倒推一天範圍進行統計;
4. 統計時間範圍間隔可設定,避免一次查詢數量太大,費用太高且查詢傳回數量有限;
5. 對于每次小批量查詢,支援分布操作,直到取完資料;
6. 小批量資料完成後,自動送出查詢偏移;
7. 後續查詢将基礎送出的偏移進行;
8. 支援斷點查詢;
2. 偏移送出管理器 OffsetQueryTaskCallback
主任務中,隻管進行資料統計查詢,送出偏移操作由其他類進行;
/**
* 普通任務回調接口定義, 考慮到多種類型的統計任務偏移操作方式可能不一,定義一個通用型偏移接口
*
*/
public interface OffsetQueryTaskCallback {
/**
* 回調方法入口, 送出偏移
*/
public void commit();
/**
* 設定初始化綁定目前偏移(期間不得改變)
*
* @param startTime 偏移開始時間
* @param endTime 偏移結束時間
* @param offsetStart 偏移開始值(分頁)
* @param limit 單次取值最大數(分頁)
* @param isNewStep 是否是新的查詢
* @param isSpecifyOffset 是否是指定的偏移
*/
public void initCurrentOffset(Integer startTime, Integer endTime,
Integer offsetStart, Integer limit,
Boolean isNewStep, Boolean isSpecifyOffset);
/**
* 從目前環境中擷取目前偏移資訊
*
* @return 偏移變量執行個體
*/
public LoghubQueryCounterOffsetModel getCurrentOffset();
}
import com.alibaba.fastjson.JSONObject;
import com.my.util.constants.RedisKeysConstantEnum;
import com.my.util.redis.RedisPoolUtil;
import com.my.model.LoghubQueryCounterOffsetModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 預設偏移回調實作
*
*/
@Component("defaultOffsetQueryTaskCallback")
@Slf4j
public class DefaultOffsetQueryTaskCallbackImpl implements OffsetQueryTaskCallback {
@Resource
private RedisPoolUtil redisPoolUtil;
/**
* 目前偏移資訊
*/
private ThreadLocal<LoghubQueryCounterOffsetModel> currentOffsetHolder = new ThreadLocal<>();
@Override
public void commit() {
if(!currentOffsetHolder.get().getIsSpecifyOffset()) {
redisPoolUtil.set(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey(),
JSONObject.toJSONString(currentOffsetHolder.get()));
}
}
@Override
public void initCurrentOffset(Integer startTime, Integer endTime,
Integer offsetStart, Integer limit,
Boolean isNewStep, Boolean isSpecifyOffset) {
LoghubQueryCounterOffsetModel currentOffset = new LoghubQueryCounterOffsetModel();
currentOffset.setStartTime(startTime);
currentOffset.setEndTime(endTime);
currentOffset.setOffsetStart(offsetStart);
currentOffset.setIsNewStep(isNewStep);
currentOffset.setIsSpecifyOffset(isSpecifyOffset);
currentOffsetHolder.set(currentOffset);
}
@Override
public LoghubQueryCounterOffsetModel getCurrentOffset() {
return currentOffsetHolder.get();
}
/**
* 從緩存中擷取目前偏移資訊
*
* @return 緩存偏移或者 null
*/
public LoghubQueryCounterOffsetModel getCurrentOffsetFromCache() {
String offsetCacheValue = redisPoolUtil.get(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey());
if (StringUtils.isBlank(offsetCacheValue)) {
return null;
}
return JSONObject.parseObject(offsetCacheValue, LoghubQueryCounterOffsetModel.class);
}
public Integer getStartTime() {
return currentOffsetHolder.get().getStartTime();
}
public void setStartTime(Integer startTime) {
currentOffsetHolder.get().setStartTime(startTime);
}
public Integer getEndTime() {
return currentOffsetHolder.get().getEndTime();
}
public void setEndTime(Integer endTime) {
currentOffsetHolder.get().setEndTime(endTime);
}
public Integer getOffsetStart() {
return currentOffsetHolder.get().getOffsetStart();
}
public void setOffsetStart(Integer offsetStart) {
currentOffsetHolder.get().setOffsetStart(offsetStart);
}
public Integer getLimit() {
return currentOffsetHolder.get().getLimit();
}
public void setLimit(Integer limit) {
currentOffsetHolder.get().setLimit(limit);
}
public Boolean getIsNewStep() {
return currentOffsetHolder.get().getIsNewStep();
}
public void setIsNewStep(Boolean isNewStep) {
currentOffsetHolder.get().setIsNewStep(isNewStep);
}
}
/**
* loghub 查詢偏移量 資料容器
*
*/
@Data
public class LoghubQueryCounterOffsetModel implements Serializable {
private static final long serialVersionUID = -3749552331349228045L;
/**
* 開始時間
*/
private Integer startTime;
/**
* 結束時間
*/
private Integer endTime;
/**
* 起始偏移
*/
private Integer offsetStart = 0;
/**
* 每次查詢的 條數限制, 都需要進行設定後才可用, 否則查無資料
*/
private Integer limit = 0;
/**
* 是否新的偏移循環,如未完成,應繼續子循環 limit
*
* true: 是, offsetStart,limit 失效, false: 否, 需借助 offsetStart,limit 進行limit相加
*/
private Boolean isNewStep = true;
/**
* 是否是手動指定的偏移,如果是說明是在手動被資料,偏移量将不會被更新
*
* 此變量是瞬時值,将不會被持久化到偏移辨別中
*/
private transient Boolean isSpecifyOffset;
}
3. 批量更新統計結果資料庫的實作
因每次統計的資料量是不确定的,因盡可能早的送出一次統計結果,防止一次送出太多,或者 機器故障時所有統計白費,是以需要分小事務進行。
@Service
public class StatisticsServiceImpl implements StatisticsService {
/**
* 批量更新統計分鐘級資料 (事務型送出)
*
* @param statisticsMinList 新統計資料
* @return 影響行數
*/
@Transactional(isolation = Isolation.READ_COMMITTED, rollbackFor = Throwable.class)
public Integer batchUpsertPremiumStatistics(List<BizProposalPolicyStatisticsMin> statisticsMinList,
OffsetQueryTaskCallback callback) {
AtomicInteger updateCount = new AtomicInteger(0);
statisticsMinList.forEach(item -> {
int affectNum = 0;
BizProposalPolicyStatisticsMin oldStatistics = bizProposalPolicyStasticsMinMapper.selectOneByCond(item);
if (oldStatistics == null) {
item.setEtlVersion(item.getEtlVersion() + ":0");
affectNum = bizProposalPolicyStasticsMinMapper.insert(item);
} else {
oldStatistics.setStatisticsCount(oldStatistics.getStatisticsCount() + item.getStatisticsCount());
String versionFull = versionKeeperFilter(oldStatistics.getEtlVersion(), item.getEtlVersion());
oldStatistics.setEtlVersion(versionFull + ":" + oldStatistics.getStatisticsCount());
// todo: 優化更新版本号問題
affectNum = bizProposalPolicyStasticsMinMapper.updateByPrimaryKey(oldStatistics);
}
updateCount.addAndGet(affectNum);
});
callback.commit();
return updateCount.get();
}
/**
* 版本号過濾器(組裝版本資訊)
*
* @param oldVersion 老版本資訊
* @param currentVersion 目前版本号
* @return 可用的版本資訊
*/
private String versionKeeperFilter(String oldVersion, String currentVersion) {
String versionFull = oldVersion + "," + currentVersion;
if (versionFull.length() >= 500) {
// 從150以後,第一版本号開始保留
versionFull = versionFull.substring(versionFull.indexOf(',', 150));
}
return versionFull;
}
}
4. 你需要一個啟動任務的地方
/**
* 啟動時運作的任務排程服務
*
*/
@Service
@Slf4j
public class TaskAutoRunScheduleService {
@Resource
private MinuteBizDataCounterTask minuteBizDataCounterTask;
@PostConstruct
public void bizDataAutoRun() {
log.info("============= bizDataAutoRun start =================");
ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("Biz-data-counter-%d"));
executorService.submit(minuteBizDataCounterTask);
}
}
5. 将每分鐘的資料從db查詢出來展示到頁面
以上将資料統計後以分鐘級彙總到資料,接下來,監控頁面就隻需從db中進行簡單聚合就可以了,咱們就不費那精力去展示了。
6. 待完善的地方
1. 叢集環境的任務運作将會出問題,解決辦法是:加一個分布式鎖即可。 你可以的!
2. 針對重試執行統計問題,還得再考慮考慮。(幂等性)
唠叨: 踩坑不一定是壞事!
不要害怕今日的苦,你要相信明天,更苦!