天天看點

Redis 漫談 —— 分布式布隆過濾及記憶體使用問題分析

背景介紹

公司消息推送系統每日會對旗下百萬級體量 app 使用者進行消息推送;同時,為了防止過度打擾使用者,隻允許一天之中對同一個使用者裝置推送最多 4 條營運類消息 —— 我們選擇使用基于 redis 的「分布式布隆過濾」政策實作該限制,具體工具為 github 開源項目 Orestes-Bloomfilter。

布隆過濾

Redis 漫談 —— 分布式布隆過濾及記憶體使用問題分析

參考上圖,布隆過濾器的資料結構為元素初始值均為 0 的「比特數組」;當向布隆過濾器「添加」某個元素(如上圖 x、y 或 z)時,需要對該元素進行多次不同算法類型的哈希運算,将運算結果根據布隆過濾器長度取餘,得到一批對應的數組位置,并将數組中這些位置的值置為 1;當「校驗」某個元素是否已添加至布隆過濾器時,我們進行與「添加」元素時相同的計算得到一批數組位置,然後檢查數組中這些對應位置上的值是否為 1 —— 若「均為」 1,可以認為被校驗的元素已經存在于布隆過濾器中,否則表示被校驗的元素尚不存在。

可以看到布隆過濾器是一種非常緊湊的資料格式;雖然存在一定「誤判率」,但是相對于傳統的将元素直接存放在集合中用于校驗的方式,布隆過濾是一種很好的「以微弱的時間代價換空間」的做法。本文不對該算法做深入詳盡的介紹,感興趣的讀者可以查閱相關資料。

工程實踐

google 的 guava 包中提供了基于記憶體的布隆過濾器,但是對于分布式應用而言,存在單點問題和過濾狀态維護的不便;于是我們對基于 redis 的分布式過濾進行調研,比較了 ReBloom 和 Orestes-Bloomfilter 兩款開源工具;前者需要以子產品(module)的方式內建至 redis 中;後者直接使用 setbit 指令與 redis 互動進行布隆過濾器的指派,并且對并發批量操作做了較好的封裝,于是我們優先對後者進行了測試:記憶體占用方面,一個預期以百萬分之一的誤判率對 200 萬元素進行過濾或者說校驗的布隆過濾器約占 7 mb 空間;過濾性能方面,4 個線程并發對 25 萬元素(即總量 100 萬元素)進行批量校驗總耗時約 45 秒 —— 綜合評估在可接受的範圍,于是我們将這個方案落地實施:

public void containAndPut(final List<String> keys) {
    // bloomFilterList 包含 4 個布隆過濾器用戶端
    bloomFilterList.forEach(bf -> {
        // 嘗試批量添加目标裝置資訊
        List<Boolean> results = bf.addAll(keys);
        for (int i = results.size() - 1; i >= 0; --i)
            // 将成功添加的裝置元素從清單中移除
            if (results.get(i))
                keys.remove(i);
    });
}           

上述代碼為核心邏輯:由于在「背景介紹」中提到的每天 4 條營運類消息推送的限制,我們需要周遊一組共 4 個布隆過濾器,調用 addAll 方法嘗試批量添加目标裝置 keys,并在每次操作後将被成功添加的裝置元素從清單中移除,最後剩下的便是已經達到發送次數上限的裝置清單。

奇怪現象

該方案在生産環境運作穩定,但是在監測觀察中我們也發現了一個在測試時疏忽了的奇怪現象 —— 在進行大量資料的并發過濾校驗時,redis 的記憶體占用(used_memory_rss)會一度飙升,而後快速回落至預期水準:

Redis 漫談 —— 分布式布隆過濾及記憶體使用問題分析

排除了 bgsave 因素後我們猜測是 redis 在進行布隆過濾操作時占用了大量記憶體;于是我們仔細分析了 Orestes-Bloomfilter 中相關代碼邏輯:

public List<Boolean> addAll(Collection<T> elements) {
    List<Boolean> added = new ArrayList<>();
    // 使用事務模式
    List<Boolean> results = pool.transactionallyDo(p -> {
        //  周遊所有帶校驗元素
        for (T value : elements) {
            // 進行多次哈希計算
            for (int position : hash(toBytes(value))) {
                // 将 redis 「布隆過濾器」對應位置的值置為 1
                bloom.set(p, position, true);
            }
        }
    });

    // 判斷各個元素是否「初次」添加至布隆過濾器
    boolean wasAdded = false;
    int numProcessed = 0;
    for (Boolean item : results) {
        // 若 item 為 0,表示該位置元素先前非 1,判斷為初次添加
        if (!item) wasAdded = true;
        // config().hashes() 傳回整數 n,表示對某元素進行了 n 次不同的哈希計算
        if ((numProcessed + 1) % config().hashes() == 0) {
            added.add(wasAdded);
            wasAdded = false;
        }
        numProcessed++;
    }
    return added;
}           

在消息推送時我們使用上述方法對使用者裝置進行批量校驗;基本思路是對使用者裝置資訊進行多次不同的哈希計算,并嘗試将 redis 中比特數組對應位置的值置為 1,隻需其中任一位置操作成功便可判定該裝置資訊先前不存在,否則表示該使用者裝置已存在布隆過濾器中;程式與 redis 進行互動時使用的具體指令是

SETBIT key offset value

,用于對 key 對應的字元串中 offset 位置指派 value(0或1),并傳回該位置先前的值。

繼續分析 transactionallyDo 的方法邏輯:

public <T> List<T> transactionallyDo(Consumer<Pipeline> f, String... watch) {
    return (List<T>) safelyReturn(jedis -> {
        Pipeline p = jedis.pipelined();
        if (watch.length != 0) {
            p.watch(watch);
        }
        // 标記 redis 事務開始
        p.multi();
        // 執行調用方(此處為 addAll 方法)指定操作
        f.accept(p);
        // 執行 redis 事務
        Response<List<Object>> exec = p.exec();
        // 擷取操作結果
        p.sync();
        return exec.get();
    });
}           

可以看到代碼中為了保證對大量元素進行布隆過濾操作的性能,與 redis 互動時使用了「管道」 —— 即在單次互動中批量發送操作指令以減少網絡 I/O;同時,為了保證并發安全性,每一批次的管道指令操作都在一個「事務」中進行。查閱相關文檔,可以發現無論是管道模式還是事務模式都需要消耗 redis 記憶體:

While the client sends commands using pipelining, the server will be forced to queue the replies, using memory.

To perform a transaction in Redis, we first call MULTI, followed by any sequence of commands we intend to execute, followed by EXEC. When seeing MULTI, Redis will queue up commands from that same connection until it sees an EXEC, at which point Redis will execute the queued commands sequentially without interruption.

在管道模式下,redis 在回應用戶端請求前需要緩存所有操作指令對應的結果;在事務模式下,redis 在真正執行該事務前需要緩存相應的操作指令;與此同時,我們的消息推送應用以并發的方式進行分布式布隆過濾 —— 綜合考慮這些因素就可以了解為何 redis 程序會出現短暫的記憶體占用高峰了。

問題解決

雖然在記憶體占用達到極值(maxmemory)後,redis 仍可以繼續使用記憶體進行操作指令緩存等非資料存儲工作,但是出于穩定性考慮我們可能希望對這種情況做一定的限制;由于查明了記憶體快速上升的具體原因,相應的控制措施也就比較明了了 —— 可以降低進行布隆過濾的并發數或減少了管道模式下批量操作指令數來到達效果。

參考資料

繼續閱讀