天天看點

來!做一個分鐘級業務監控系統【實戰】

  如何做一個實時的業務統計的監控?比如分鐘級?也就是每分鐘可以快速看到業務的變化趨勢,及可以做一些簡單的分組查詢?

  哎,你可能說很簡單了,直接從資料庫 count 就可以了! 你是對的。

  但如果不允許你使用db進行count呢?因為線上資料庫資源可是很寶貴的哦,你這一count可能會給db帶來災難了。

那不然咋整?

沒有db,我們還有其他資料源嘛,比如: 消息隊列?埋點資料? 本文将是基于該前提而行。

  

做監控,盡量不要侵入業務太多!是以有一個消息中間件是至關重要的。針對大資料系統,一般是: kafka 或者 類kafka. (如本文基礎 loghub)

  有了消息中間件,如何進行分鐘級監控? 這個應該就很簡單了吧。不過如果要自己實作,其實坑也不少的!

如果自己實作計數,那麼你可能需要做以下幾件事: 

  1. 每消費一個消息,你需要一個累加器;

  2. 每隔一個周期,你可能需要一個歸檔操作;

  3. 你可能需要考慮各種并發安全問題;

  4. 你可能需要考慮種性能問題;

  5. 你可能需要考慮各種機器故障問題;

  6. 你可能需要考慮各種邊界值問題;

  哎,其實沒那麼難。時間序列資料庫,就專門為這類事情而生!如OpenTSDB: http://opentsdb.net/overview.html

  可以說,TSDB 是這類應用場景的殺手锏。或者基于流計算架構: 如flink, 也是很輕松完成的事。但是不是本文的方向,略過!

本文是基于 loghub 的現有資料,進行分鐘級統計後,入庫 mysql 中,進而支援随時查詢。(因loghub每次查詢都是要錢的,是以,不可能直接查詢)

  loghub 資料結構如: 2019-07-10 10:01:11,billNo,userId,productCode,...

  由于loghub提供了很多強大的查詢統計功能,是以我們可以直接使用了。

  核心功能就是一個統計sql,還是比較簡單的。但是需要考慮的點也不少,接下來,将為看官們奉上一個完整的解決方案!

撸代碼去!

1. 核心統計任務實作類 MinuteBizDataCounterTask

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogContent;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.common.QueriedLog;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.GetLogsResponse;
import com.my.service.statistics.StatisticsService;
import com.my.entity.BizDataStatisticsMin;
import com.my.model.LoghubQueryCounterOffsetModel;
import com.my.util.loghub.LogHubProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

/**
 * 基于loghub 的分鐘級 統計任務
 */
@Component
@Slf4j
public class MinuteBizDataCounterTask implements Runnable {

    @Resource
    private LogHubProperties logHubProperties;

    @Resource
    private StatisticsService statisticsService;

    @Resource(name = "defaultOffsetQueryTaskCallback")
    private DefaultOffsetQueryTaskCallbackImpl defaultOffsetQueryTaskCallback;

    /**
     * loghub 用戶端
     */
    private volatile Client mClient;

    /**
     * 過濾的topic
     */
    private static final String LOGHUB_TOPIC = "topic_test";

    /**
     * 單次掃描loghub最大時間 間隔分鐘數
     */
    @Value("${loghub.offset.counter.perScanMaxMinutesGap}")
    private Integer perScanMaxMinutesGap;

    /**
     * 單次循環最大數
     */
    @Value("${loghub.offset.counter.perScanMaxRecordsLimit}")
    private Integer perScanMaxRecordsLimit;

    /**
     * 構造必要執行個體資訊
     */
    public ProposalPolicyBizDataCounterTask() {

    }

    @Override
    public void run() {
        if(mClient == null) {
            this.mClient = new Client(logHubProperties.getEndpoint(),
                                logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey());
        }
        while (!Thread.interrupted()) {
            try {
                updateLastMinutePolicyNoCounter();
                Thread.sleep(60000);
            }
            catch (InterruptedException e) {
                log.error("【分鐘級統計task】, sleep 中斷", e);
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                // 注意此處可能有風險,發生異常後将快速死循環
                log.error("【分鐘級統計task】更新異常", e);
                try {
                    Thread.sleep(10000);
                }
                catch (InterruptedException ex) {
                    log.error("【分鐘級統計task】異常,且sleep異常", ex);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * 更新最近的資料 (分鐘級)
     *
     * @throws LogException loghub查詢異常時抛出
     */
    private void updateLastMinutePolicyNoCounter() throws LogException {
        updateMinutePolicyNoCounter(null);
    }

    /**
     * 更新最近的資料
     */
    public Integer updateMinutePolicyNoCounter(LoghubQueryCounterOffsetModel specifyOffset) throws LogException {
        // 1. 擷取偏移量
        // 2. 根據偏移量,判定是否可以一次性取完,或者多次擷取更新
        // 3. 從loghub中設定偏移量,擷取統計資料,更新
        // 4. 更新db資料統計值
        // 5. 更新偏移量
        // 6. 等待下一次更新

        // 指定offset時,可能為補資料
        final LoghubQueryCounterOffsetModel destOffset = enhanceQueryOffset(specifyOffset);
        initSharedQueryOffset(destOffset, destOffset == specifyOffset);

        Integer totalAffectNum = 0;

        while (!isScanFinishOnDestination(destOffset)) {
            // 完整掃描一次時間周期
            calcNextSharedQueryOffset(destOffset);
            while (true) {
                calcNextInnerQueryOffset();
                ArrayList<QueriedLog> logs = queryPerMinuteStatisticFromLoghubOnCurrentOffset();
                Integer affectNum = handleMiniOffsetBatchCounter(logs);
                totalAffectNum += affectNum;
                log.info("【分鐘級統計task】本次更新資料:{}, offset:{}", affectNum, getCurrentSharedQueryOffset());
                if(!hasMoreDataOffset(logs.size())) {
                    rolloverOffsetAndCommit();
                    break;
                }
            }
        }
        log.info("【分鐘級統計task】本次更新資料,總共:{}, destOffset:{}, curOffset:{}",
                            totalAffectNum, destOffset, getCurrentSharedQueryOffset());
        rolloverOffsetAndCommit();
        return totalAffectNum;
    }

    /**
     * 處理一小批的統計資料
     *
     * @param logs 小批統計loghub資料
     * @return 影響行數
     */
    private Integer handleMiniOffsetBatchCounter(ArrayList<QueriedLog> logs) {
        if (logs == null || logs.isEmpty()) {
            return 0;
        }
        List<BizDataStatisticsMin> statisticsMinList = new ArrayList<>();
        for (QueriedLog log1 : logs) {
            LogItem getLogItem = log1.GetLogItem();
            BizDataStatisticsMin statisticsMin1 = adaptStatisticsMinDbData(getLogItem);
            statisticsMin1.setEventCode(PROPOSAL_FOUR_IN_ONE_TOPIC);
            statisticsMin1.setEtlVersion(getCurrentScanTimeDuring() + ":" + statisticsMin1.getStatisticsCount());
            statisticsMinList.add(statisticsMin1);
        }
        return statisticsService.batchUpsertPremiumStatistics(statisticsMinList, getCurrentOffsetCallback());
    }


    /**
     * 擷取共享偏移資訊
     *
     * @return 偏移
     */
    private LoghubQueryCounterOffsetModel getCurrentSharedQueryOffset() {
        return defaultOffsetQueryTaskCallback.getCurrentOffset();
    }

    /**
     * 判斷本次是否掃描完成
     *
     * @param destOffset 目标偏移
     * @return true:掃描完成, false: 未完成
     */
    private boolean isScanFinishOnDestination(LoghubQueryCounterOffsetModel destOffset) {
        return defaultOffsetQueryTaskCallback.getEndTime() >= destOffset.getEndTime();
    }

    /**
     * 擷取偏移送出回調器
     *
     * @return 回調執行個體
     */
    private OffsetQueryTaskCallback getCurrentOffsetCallback() {
        return defaultOffsetQueryTaskCallback;
    }

    /**
     * 初始化共享的查詢偏移變量
     *
     * @param destOffset 目标偏移
     * @param isSpecifyOffset 是否是手動指定的偏移
     */
    private void initSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset, boolean isSpecifyOffset) {
        // 整分花時間資料
        Integer queryStartTime = destOffset.getStartTime();
        if(queryStartTime % 60 != 0) {
            queryStartTime = queryStartTime / 60 * 60;
        }
        // 将目标掃描時間終點 設定為起點,以備後續疊代
        defaultOffsetQueryTaskCallback.initCurrentOffset(queryStartTime, queryStartTime,
                                                        destOffset.getOffsetStart(), destOffset.getLimit(),
                                                        destOffset.getIsNewStep(), isSpecifyOffset);
        if(defaultOffsetQueryTaskCallback.getIsNewStep()) {
            resetOffsetDefaultSettings();
        }
    }

    /**
     * 計算下一次統計偏移時間
     *
     * @param destOffset 目标偏移值
     */
    private void calcNextSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset) {
        int perScanMaxSecondsGap = perScanMaxMinutesGap * 60;
        if(destOffset.getEndTime() - defaultOffsetQueryTaskCallback.getStartTime() > perScanMaxSecondsGap) {
            defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
            int nextExpectEndTime = defaultOffsetQueryTaskCallback.getStartTime() + perScanMaxSecondsGap;
            if(nextExpectEndTime > destOffset.getEndTime()) {
                nextExpectEndTime = destOffset.getEndTime();
            }
            defaultOffsetQueryTaskCallback.setEndTime(nextExpectEndTime);
        }
        else {
            defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
            defaultOffsetQueryTaskCallback.setEndTime(destOffset.getEndTime());
        }
        resetOffsetDefaultSettings();
    }

    /**
     * 重置偏移預設配置
     */
    private void resetOffsetDefaultSettings() {
        defaultOffsetQueryTaskCallback.setIsNewStep(true);
        defaultOffsetQueryTaskCallback.setOffsetStart(0);
        defaultOffsetQueryTaskCallback.setLimit(0);
    }

    /**
     * 計算下一次小偏移,此種情況應對 一次外部偏移未查詢完成的情況
     */
    private void calcNextInnerQueryOffset() {
        defaultOffsetQueryTaskCallback.setIsNewStep(false);
        // 第一次計算時,limit 為0, 是以得出的 offsetStart 也是0
        defaultOffsetQueryTaskCallback.setOffsetStart(
                defaultOffsetQueryTaskCallback.getOffsetStart() + defaultOffsetQueryTaskCallback.getLimit());
        defaultOffsetQueryTaskCallback.setLimit(perScanMaxRecordsLimit);
    }

    /**
     * 擷取目前循環的掃描區間
     *
     * @return 15567563433-1635345099 區間
     */
    private String getCurrentScanTimeDuring() {
        return defaultOffsetQueryTaskCallback.getStartTime() + "-" + defaultOffsetQueryTaskCallback.getEndTime();
    }

    /**
     * 從loghub查詢每分鐘的統計資訊
     *
     * @return 查詢到的統計資訊
     * @throws LogException loghub 異常時抛出
     */
    private ArrayList<QueriedLog> queryPerMinuteStatisticFromLoghubOnCurrentOffset() throws LogException {
        // 先按保單号去重,再進行計數統計
        String countSql = "* | split(bizData, ',')[5] policyNo, bizData GROUP by split(bizData, ',')[5] " +
                " | select count(1) as totalCountMin, " +
                "split(bizData, ',')[2] as productCode," +
                "split(bizData, ',')[3] as schemaCode," +
                "split(bizData, ',')[4] as channelCode," +
                "substr(split(bizData, ',')[1], 1, 16) as myDateTimeMinute " +
                "group by substr(split(bizData, ',')[1], 1, 16), split(bizData, ',')[2],split(bizData, ',')[3], split(bizData, ',')[4],split(bizData, ',')[7], split(bizData, ',')[8]";
        countSql += " limit " + defaultOffsetQueryTaskCallback.getOffsetStart() + "," + defaultOffsetQueryTaskCallback.getLimit();
        GetLogsResponse countResponse = mClient.GetLogs(logHubProperties.getProjectName(), logHubProperties.getBizCoreDataLogStore(),
                defaultOffsetQueryTaskCallback.getStartTime(), defaultOffsetQueryTaskCallback.getEndTime(),
                LOGHUB_TOPIC, countSql);
        if(!countResponse.IsCompleted()) {
            log.error("【分鐘級統計task】掃描擷取到未完整的資料,請速檢查原因,offSet:{}", getCurrentSharedQueryOffset());
        }
        return countResponse.GetLogs() == null
                    ? new ArrayList<>()
                    : countResponse.GetLogs();
    }

    /**
     * 根據上一次傳回的記錄數量,判斷是否還有更多資料
     *
     * @param lastGotRecordsCount 上次傳回的記錄數 (資料量大于最大數說明還有未取完資料)
     * @return true: 是還有更多資料應該再循環擷取, false: 無更多資料結束本期任務
     */
    private boolean hasMoreDataOffset(int lastGotRecordsCount) {
        return lastGotRecordsCount >= defaultOffsetQueryTaskCallback.getLimit();
    }

    /**
     * 加強版的 offset 優先級: 指定偏移 -> 基于緩存的偏移 -> 新生成偏移辨別
     *
     * @param specifyOffset 指定偏移(如有)
     * @return 偏移辨別
     */
    private LoghubQueryCounterOffsetModel enhanceQueryOffset(LoghubQueryCounterOffsetModel specifyOffset) {
        if(specifyOffset != null) {
            return specifyOffset;
        }
        LoghubQueryCounterOffsetModel offsetBaseOnCache = getNextOffsetBaseOnCache();
        if(offsetBaseOnCache != null) {
            return offsetBaseOnCache;
        }
        return generateNewOffset();
    }

    /**
     * 基于緩存擷取一下偏移辨別
     *
     * @return 偏移
     */
    private LoghubQueryCounterOffsetModel getNextOffsetBaseOnCache() {
        LoghubQueryCounterOffsetModel offsetFromCache = defaultOffsetQueryTaskCallback.getCurrentOffsetFromCache();
        if(offsetFromCache == null) {
            return null;
        }
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
                                                    now.getHour(), now.getMinute());
        // 如果上次仍未内部循環完成,則使用原來的
        if(offsetFromCache.getIsNewStep()) {
            offsetFromCache.setStartTime(offsetFromCache.getEndTime());
            long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
            offsetFromCache.setEndTime((int) endTime);
        }
        return offsetFromCache;
    }

    /**
     * 生成新的完整的 偏移辨別
     *
     * @return 新偏移
     */
    private LoghubQueryCounterOffsetModel generateNewOffset() {
        LoghubQueryCounterOffsetModel offsetNew = new LoghubQueryCounterOffsetModel();
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
                now.getHour(), now.getMinute());
        long startTime = nowMinTime.minusDays(1).toEpochSecond(ZoneOffset.of("+8"));
        long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
        offsetNew.setStartTime((int) startTime);
        offsetNew.setEndTime((int) endTime);
        return offsetNew;
    }
    /**
     * 将日志傳回資料 适配到資料庫記錄中
     *
     * @param logItem 日志詳情
     * @return db資料結構對應
     */
    private BizDataStatisticsMin adaptStatisticsMinDbData(LogItem logItem) {
        ArrayList<LogContent> logContents = logItem.GetLogContents();
        BizDataStatisticsMin statisticsMin1 = new BizDataStatisticsMin();
        for (LogContent logContent : logContents) {
            switch (logContent.GetKey()) {
                case "totalCountMin":
                    statisticsMin1.setStatisticsCount(Integer.valueOf(logContent.GetValue()));
                    break;
                case "productCode":
                    statisticsMin1.setProductCode(logContent.GetValue());
                    break;
                case "myDateTimeMinute":
                    String signDtMinStr = logContent.GetValue();
                    String[] dateTimeArr = signDtMinStr.split(" ");
                    String countDate = dateTimeArr[0];
                    String[] timeArr = dateTimeArr[1].split(":");
                    String countHour = timeArr[0];
                    String countMin = timeArr[1];
                    statisticsMin1.setCountDate(countDate);
                    statisticsMin1.setCountHour(countHour);
                    statisticsMin1.setCountMin(countMin);
                    break;
                default:
                    break;
            }
        }
        return statisticsMin1;
    }

    /**
     * 重置預設值,同時送出目前 (滾動到下一個偏移點)
     */
    private void rolloverOffsetAndCommit() {
        resetOffsetDefaultSettings();
        commitOffsetSync();
    }

    /**
     * 送出偏移量
     *
     */
    private void commitOffsetSync() {
        defaultOffsetQueryTaskCallback.commit();
    }

}      

  主要實作邏輯如下:

    1. 每隔一分鐘進行一個查詢;

    2. 發生異常後,容錯繼續查詢;

    3. 對于一個新統計,預設倒推一天範圍進行統計;

    4. 統計時間範圍間隔可設定,避免一次查詢數量太大,費用太高且查詢傳回數量有限;

    5. 對于每次小批量查詢,支援分布操作,直到取完資料;

    6. 小批量資料完成後,自動送出查詢偏移;

    7. 後續查詢将基礎送出的偏移進行;

    8. 支援斷點查詢;

2. 偏移送出管理器 OffsetQueryTaskCallback

  主任務中,隻管進行資料統計查詢,送出偏移操作由其他類進行;

/**
 * 普通任務回調接口定義, 考慮到多種類型的統計任務偏移操作方式可能不一,定義一個通用型偏移接口
 *
 */
public interface OffsetQueryTaskCallback {

    /**
     * 回調方法入口, 送出偏移
     */
    public void commit();

    /**
     * 設定初始化綁定目前偏移(期間不得改變)
     *
     * @param startTime 偏移開始時間
     * @param endTime 偏移結束時間
     * @param offsetStart 偏移開始值(分頁)
     * @param limit 單次取值最大數(分頁)
     * @param isNewStep 是否是新的查詢
     * @param isSpecifyOffset 是否是指定的偏移
     */
    public void initCurrentOffset(Integer startTime, Integer endTime,
                                  Integer offsetStart, Integer limit,
                                  Boolean isNewStep, Boolean isSpecifyOffset);

    /**
     * 從目前環境中擷取目前偏移資訊
     *
     * @return 偏移變量執行個體
     */
    public LoghubQueryCounterOffsetModel getCurrentOffset();

}


import com.alibaba.fastjson.JSONObject;
import com.my.util.constants.RedisKeysConstantEnum;
import com.my.util.redis.RedisPoolUtil;
import com.my.model.LoghubQueryCounterOffsetModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 預設偏移回調實作
 *
 */
@Component("defaultOffsetQueryTaskCallback")
@Slf4j
public class DefaultOffsetQueryTaskCallbackImpl implements OffsetQueryTaskCallback {

    @Resource
    private RedisPoolUtil redisPoolUtil;

    /**
     * 目前偏移資訊
     */
    private ThreadLocal<LoghubQueryCounterOffsetModel> currentOffsetHolder = new ThreadLocal<>();


    @Override
    public void commit() {
        if(!currentOffsetHolder.get().getIsSpecifyOffset()) {
            redisPoolUtil.set(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey(),
                    JSONObject.toJSONString(currentOffsetHolder.get()));
        }
    }

    @Override
    public void initCurrentOffset(Integer startTime, Integer endTime,
                                  Integer offsetStart, Integer limit,
                                  Boolean isNewStep, Boolean isSpecifyOffset) {
        LoghubQueryCounterOffsetModel currentOffset = new LoghubQueryCounterOffsetModel();
        currentOffset.setStartTime(startTime);
        currentOffset.setEndTime(endTime);
        currentOffset.setOffsetStart(offsetStart);
        currentOffset.setIsNewStep(isNewStep);
        currentOffset.setIsSpecifyOffset(isSpecifyOffset);
        currentOffsetHolder.set(currentOffset);
    }

    @Override
    public LoghubQueryCounterOffsetModel getCurrentOffset() {
        return currentOffsetHolder.get();
    }

    /**
     * 從緩存中擷取目前偏移資訊
     *
     * @return 緩存偏移或者 null
     */
    public LoghubQueryCounterOffsetModel getCurrentOffsetFromCache() {
        String offsetCacheValue = redisPoolUtil.get(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey());
        if (StringUtils.isBlank(offsetCacheValue)) {
            return null;
        }
        return JSONObject.parseObject(offsetCacheValue, LoghubQueryCounterOffsetModel.class);
    }

    public Integer getStartTime() {
        return currentOffsetHolder.get().getStartTime();
    }

    public void setStartTime(Integer startTime) {
        currentOffsetHolder.get().setStartTime(startTime);
    }

    public Integer getEndTime() {
        return currentOffsetHolder.get().getEndTime();
    }

    public void setEndTime(Integer endTime) {
        currentOffsetHolder.get().setEndTime(endTime);
    }

    public Integer getOffsetStart() {
        return currentOffsetHolder.get().getOffsetStart();
    }

    public void setOffsetStart(Integer offsetStart) {
        currentOffsetHolder.get().setOffsetStart(offsetStart);
    }

    public Integer getLimit() {
        return currentOffsetHolder.get().getLimit();
    }

    public void setLimit(Integer limit) {
        currentOffsetHolder.get().setLimit(limit);
    }

    public Boolean getIsNewStep() {
        return currentOffsetHolder.get().getIsNewStep();
    }

    public void setIsNewStep(Boolean isNewStep) {
        currentOffsetHolder.get().setIsNewStep(isNewStep);
    }

}

/**
 * loghub 查詢偏移量 資料容器
 *
 */
@Data
public class LoghubQueryCounterOffsetModel implements Serializable {

    private static final long serialVersionUID = -3749552331349228045L;

    /**
     * 開始時間
     */
    private Integer startTime;

    /**
     * 結束時間
     */
    private Integer endTime;

    /**
     * 起始偏移
     */
    private Integer offsetStart = 0;

    /**
     * 每次查詢的 條數限制, 都需要進行設定後才可用, 否則查無資料
     */
    private Integer limit = 0;

    /**
     * 是否新的偏移循環,如未完成,應繼續子循環 limit
     *
     * true: 是, offsetStart,limit 失效, false: 否, 需借助 offsetStart,limit 進行limit相加
     */
    private Boolean isNewStep = true;

    /**
     * 是否是手動指定的偏移,如果是說明是在手動被資料,偏移量将不會被更新
     *
     *      此變量是瞬時值,将不會被持久化到偏移辨別中
     */
    private transient Boolean isSpecifyOffset;

}      

3. 批量更新統計結果資料庫的實作 

  因每次統計的資料量是不确定的,因盡可能早的送出一次統計結果,防止一次送出太多,或者 機器故障時所有統計白費,是以需要分小事務進行。

@Service
public class StatisticsServiceImpl implements StatisticsService {
    /**
     * 批量更新統計分鐘級資料 (事務型送出)
     *
     * @param statisticsMinList 新統計資料
     * @return 影響行數
     */
    @Transactional(isolation = Isolation.READ_COMMITTED, rollbackFor = Throwable.class)
    public Integer batchUpsertPremiumStatistics(List<BizProposalPolicyStatisticsMin> statisticsMinList,
            OffsetQueryTaskCallback callback) {
        AtomicInteger updateCount = new AtomicInteger(0);
        statisticsMinList.forEach(item -> {
            int affectNum = 0;
            BizProposalPolicyStatisticsMin oldStatistics = bizProposalPolicyStasticsMinMapper.selectOneByCond(item);
            if (oldStatistics == null) {
                item.setEtlVersion(item.getEtlVersion() + ":0");
                affectNum = bizProposalPolicyStasticsMinMapper.insert(item);
            } else {
                oldStatistics.setStatisticsCount(oldStatistics.getStatisticsCount() + item.getStatisticsCount());
                String versionFull = versionKeeperFilter(oldStatistics.getEtlVersion(), item.getEtlVersion());
                oldStatistics.setEtlVersion(versionFull + ":" + oldStatistics.getStatisticsCount());
                // todo: 優化更新版本号問題
                affectNum = bizProposalPolicyStasticsMinMapper.updateByPrimaryKey(oldStatistics);
            }
            updateCount.addAndGet(affectNum);
        });
        callback.commit();
        return updateCount.get();
    }

    /**
     * 版本号過濾器(組裝版本資訊)
     *
     * @param oldVersion     老版本資訊
     * @param currentVersion 目前版本号
     * @return 可用的版本資訊
     */
    private String versionKeeperFilter(String oldVersion, String currentVersion) {
        String versionFull = oldVersion + "," + currentVersion;
        if (versionFull.length() >= 500) {
            // 從150以後,第一版本号開始保留
            versionFull = versionFull.substring(versionFull.indexOf(',', 150));
        }
        return versionFull;
    }

}      

4. 你需要一個啟動任務的地方

/**
 * 啟動時運作的任務排程服務
 *
 */
@Service
@Slf4j
public class TaskAutoRunScheduleService {

    @Resource
    private MinuteBizDataCounterTask minuteBizDataCounterTask;

    @PostConstruct
    public void bizDataAutoRun() {
        log.info("============= bizDataAutoRun start =================");
        ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("Biz-data-counter-%d"));
        executorService.submit(minuteBizDataCounterTask);
    }

}      

5. 将每分鐘的資料從db查詢出來展示到頁面

  以上将資料統計後以分鐘級彙總到資料,接下來,監控頁面就隻需從db中進行簡單聚合就可以了,咱們就不費那精力去展示了。

6. 待完善的地方

  1. 叢集環境的任務運作将會出問題,解決辦法是:加一個分布式鎖即可。 你可以的!

  2. 針對重試執行統計問題,還得再考慮考慮。(幂等性)

唠叨: 踩坑不一定是壞事!

不要害怕今日的苦,你要相信明天,更苦!