天天看點

【深入淺出leveldb】LRU與哈希表【深入淺出leveldb】LRU與哈希表

【深入淺出leveldb】LRU與哈希表

1.LRUHandle

LRUHandle

内部存儲了如下東西:

  • Key:value對
  • LRU連結清單
  • HashTable bucket的連結清單
  • 引用計數及清理
struct LRUHandle {
  // value
  void* value;
  // 當refs為0時,應該清理
  void (*deleter)(const Slice&, void* value);
  LRUHandle* next_hash; // HashTable hash沖突時指向 下一個LRUHandle
  LRUHandle* next;      // LRU連結清單下一個指針
  LRUHandle* prev;      // LRU連結清單前一個指針
  // 計算LRUCache容量
  size_t charge;  // TODO(opt): Only allow uint32_t?
  size_t key_length; // key長度
  bool in_cache;     // 是否在LRUCache in_user_連結清單
  uint32_t refs;     // 引用計數,用于删除資料
  uint32_t hash;     // key 對應的 hash值
  char key_data[1];  // 占位符,結構體末尾,通過key_length擷取真正的key

  Slice key() const {
    assert(next != this);
    return Slice(key_data, key_length);
  }
};
           

複制

注意:當

refs

為0時,需要調用

deleter

清理函數,類似于

shared_ptr

2.HandleTable

HandleTable即哈希表,根據注釋leveldb的哈希表實作要比g++要快很多。

下面來看基本結構,成員、構造、析構都比較好了解,這裡使用了Resize函數,桶的大小初始化為4的倍數,例如:4、8、16等等。

Resize函數保證桶的個數大于元素個數,将舊桶資料拷貝到新桶當中。

class HandleTable {
 public:
  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
  ~HandleTable() { delete[] list_; }
 private:
  uint32_t length_;    // list_數組長度
  uint32_t elems_;     // handle數量
  LRUHandle** list_;   // 哈希桶
  // 初始化大小
  void Resize() {
    uint32_t new_length = 4;
    while (new_length < elems_) {
      new_length *= 2;
    }
    LRUHandle** new_list = new LRUHandle*[new_length];
    memset(new_list, 0, sizeof(new_list[0]) * new_length);
    uint32_t count = 0;
    for (uint32_t i = 0; i < length_; i++) {
      LRUHandle* h = list_[i];
      while (h != nullptr) {
        LRUHandle* next = h->next_hash;
        uint32_t hash = h->hash;
        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
        h->next_hash = *ptr;
        *ptr = h;
        h = next;
        count++;
      }
    }
    assert(elems_ == count);
    delete[] list_;
    list_ = new_list;
    length_ = new_length;
  }
};
           

複制

  • 查詢操作

哈希表查詢根據key與hash查詢并傳回LRUHandle*,在FindPointer函數中,首先根據hash值确定在哪個桶,然後在該桶中進行單連結清單周遊,查找到滿足key與hash的節點。

// 哈希表查詢
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
  return *FindPointer(key, hash);
}
// 根據key與hash查詢 并傳回LRUHandle*
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
  LRUHandle** ptr = &list_[hash & (length_ - 1)];
  while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {
    ptr = &(*ptr)->next_hash;
  }
  return ptr;
}
           

複制

  • 插入操作

插入操作邏輯比較清晰:就是調用查詢操作,查找到對應的key與hash舊節點,更新查找到的節點為傳入的新節點(LRUHandle),如果舊節點不存在,也就是為空,表示在目前桶拉鍊末尾插入新節點(LRUHandle),并增加元素個數,如果超過已有的長度,進行Resize操作;否則,直接傳回舊節點。

LRUHandle* Insert(LRUHandle* h) {
  // 查找不到對應的key 那麼傳回的ptr就是末尾的LRUHandle*
  LRUHandle** ptr = FindPointer(h->key(), h->hash);
  LRUHandle* old = *ptr;
  h->next_hash = (old == nullptr ? nullptr : old->next_hash);
  *ptr = h;
  if (old == nullptr) {
    ++elems_;
    if (elems_ > length_) {
      // Since each cache entry is fairly large, we aim for a small
      // average linked list length (<= 1).
      Resize();
    }
  }
  return old;
}
           

複制

  • 删除操作

根據傳遞的key與hash删除LRUHandle,首先确定要删除的節點,并傳回,查找的節點如果不為空,則用連結清單的後一個節點替換目前節點,并減少元素個數,我想這裡傳回删除的節點,最終會釋放記憶體。釋放節點操作是在

LRUCache::Unref

中進行的。

LRUHandle* Remove(const Slice& key, uint32_t hash) {
  LRUHandle** ptr = FindPointer(key, hash);
  LRUHandle* result = *ptr;
  if (result != nullptr) {
    *ptr = result->next_hash;
    --elems_;
  }
  return result;
}
           

複制

3.LRUCache

哈希表講解完畢後,便可以非常快的了解LRUCache。

在cache.h中有Cache抽象類。

class LEVELDB_EXPORT Cache {
};
           

複制

LRUCache基本結構如下:

class LRUCache {
 public:
  LRUCache();
  ~LRUCache();
  // 設定容量
  void SetCapacity(size_t capacity) { capacity_ = capacity; }
 private:
  // 緩存容量
  size_t capacity_;
  // mutex_ protects the following state.
  mutable port::Mutex mutex_;
  // 緩存已經使用的容量
  size_t usage_ GUARDED_BY(mutex_);
  // 頭節點 lru雙向循環連結清單 prev是最先通路 next是最後通路
  // lru_儲存 refs==1并且in_cache==true的handle 
  // lru_是最舊節點
  LRUHandle lru_ GUARDED_BY(mutex_);
  // 頭節點 儲存refs>=2并且 in_cache==true的節點
  LRUHandle in_use_ GUARDED_BY(mutex_);
  // 哈希表,用于快讀查找
  HandleTable table_ GUARDED_BY(mutex_);
};
           

複制

來了解一下構造與析構:

  • 構造

容量與使用大小均設定為0。

建立空的雙向循環連結清單。

LRUCache::LRUCache() : capacity_(0), usage_(0) {
  // Make empty circular linked lists.
  lru_.next = &lru_;
  lru_.prev = &lru_;
  in_use_.next = &in_use_;
  in_use_.prev = &in_use_;
}
           

複制

  • 析構

首先使用斷言檢查

in_use_

為空,即:所有節點已經從

in_use

插入到了

lru_

上,由于是循環連結清單是以判斷條件是

e != &lru_

,如果最紅回到了頭節點,表示循環結束了。在釋放節點時,設定節點不在緩存中(in_cache=false),此時引用計數也必須是1,才會正常釋放。

LRUCache::~LRUCache() {
  assert(in_use_.next == &in_use_);  // Error if caller has an unreleased handle
  for (LRUHandle* e = lru_.next; e != &lru_;) {
    LRUHandle* next = e->next;
    assert(e->in_cache);
    e->in_cache = false;
    assert(e->refs == 1);  // Invariant of lru_ list.
    Unref(e);
    e = next;
  }
}
           

複制

繼續看釋放操作Unref函數:釋放外部引用計數,根據目前Handle(節點)的引用計數進行判斷

  • 是0,調用deleter删除Handle内容,并釋放記憶體。
  • 在緩存中且引用計數是1,LRU_Remove删除目前節點,在

    lru_

    前面插入目前節點。
void LRUCache::Unref(LRUHandle* e) {
  assert(e->refs > 0);
  e->refs--;
  if (e->refs == 0) {  // Deallocate.
    assert(!e->in_cache);
    (*e->deleter)(e->key(), e->value);
    free(e);
  } else if (e->in_cache && e->refs == 1) {
    // No longer in use; move to lru_ list.
    LRU_Remove(e);
    LRU_Append(&lru_, e);  
  }
}
// 雙向連結清單删除節點
void LRUCache::LRU_Remove(LRUHandle* e) {
  e->next->prev = e->prev;
  e->prev->next = e->next;
}

// 在list之前加入最新節點
void LRUCache::LRU_Append(LRUHandle* list, LRUHandle* e) {
  // Make "e" newest entry by inserting just before *list
  e->next = list;
  e->prev = list->prev;
  e->prev->next = e;
  e->next->prev = e;
}
           

複制

LRUCache查詢:

直接根據key與hash調用内部哈希表的查詢函數。通路了某個節點,需要更新連結清單中該節點的位置,将其放在

lru_

前面,保證是最新通路的節點,并更新引用計數。

Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  LRUHandle* e = table_.Lookup(key, hash);
  if (e != nullptr) {
    Ref(e);
  }
  return reinterpret_cast<Cache::Handle*>(e);
}

void LRUCache::Ref(LRUHandle* e) {
  if (e->refs == 1 && e->in_cache) {  // If on lru_ list, move to in_use_ list.
    LRU_Remove(e);
    LRU_Append(&in_use_, e);
  }
  e->refs++;
}
           

複制

LRUCache删除:

删除哈希表中節點,前面提到過哈希表删除傳回的是待删除節點,那在哪裡釋放記憶體呢?便是在這裡!我們再來看一下FinishErase函數。可以看到通過LRU_Remove删除雙向循環連結清單中目标節點,并通過Unref釋放剛剛哈希表待删除節點的記憶體。

void LRUCache::Erase(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  FinishErase(table_.Remove(key, hash));
}
bool LRUCache::FinishErase(LRUHandle* e) {
  if (e != nullptr) {
    assert(e->in_cache);
    LRU_Remove(e);
    e->in_cache = false;
    usage_ -= e->charge;
    Unref(e);
  }
  return e != nullptr;
}
           

複制

LRUCache插入:

插入操作傳遞了一大堆參數,主要是用來構造LRUHandle,初始化時引用計數為1。

當插入時,當起節點便是最新的,那麼需要更新引用計數。是以在後面看到引用計數加1不必奇怪。

如果緩存中的容量大于0,增加緩存引用計數、設定在緩存中、添加Handle到

in_user_

連結清單前面,增加已使用容量,釋放舊節點。

如果緩存小于等于0,不進行操作。

如果緩存滿了,那就清除掉最舊的節點。

ache::Handle* LRUCache::Insert(const Slice& key, uint32_t hash, void* value,
                                size_t charge,
                                void (*deleter)(const Slice& key,
                                                void* value)) {
  MutexLock l(&mutex_);

  LRUHandle* e =
      reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle) - 1 + key.size()));
  e->value = value;
  e->deleter = deleter;
  e->charge = charge;
  e->key_length = key.size();
  e->hash = hash;
  e->in_cache = false;
  e->refs = 1;  // for the returned handle.
  std::memcpy(e->key_data, key.data(), key.size());

  if (capacity_ > 0) {
    e->refs++;  // for the cache's reference.
    e->in_cache = true;   // 設定在緩存中
    LRU_Append(&in_use_, e);   // 将節點e插入到連結清單in_use_前面
    usage_ += charge;  // 增加已使用容量
    FinishErase(table_.Insert(e));  // 插入新節點到哈希表并釋放舊節點
  } else {  // don't cache. (capacity_==0 is supported and turns off caching.)
    // next is read by key() in an assert, so it must be initialized
    e->next = nullptr;
  }
  // 緩存滿了,移除掉最舊的元素
  while (usage_ > capacity_ && lru_.next != &lru_) {
    LRUHandle* old = lru_.next;
    assert(old->refs == 1);
    bool erased = FinishErase(table_.Remove(old->key(), old->hash));
    if (!erased) {  // to avoid unused variable when compiled NDEBUG
      assert(erased);
    }
  }

  return reinterpret_cast<Cache::Handle*>(e);
}
           

複制

其他操作:

1)更新緩存中的節點(handle)

void LRUCache::Release(Cache::Handle* handle) {
  MutexLock l(&mutex_);
  Unref(reinterpret_cast<LRUHandle*>(handle));
}
           

複制

  1. 删除

    lru_

    中所有引用計數為1的節點。
void LRUCache::Prune() {
  MutexLock l(&mutex_);
  while (lru_.next != &lru_) {
    LRUHandle* e = lru_.next;
    assert(e->refs == 1);
    bool erased = FinishErase(table_.Remove(e->key(), e->hash));
    if (!erased) {  // to avoid unused variable when compiled NDEBUG
      assert(erased);
    }
  }
}
           

複制

小結:

1)插入操作時,會把Handle插入到

in_use_

連結清單中,引用計數增加,使用完畢後需要手動通過Release函數進行釋放(Unref),使得節點能夠進入

lru_

或者釋放記憶體。插入具體過程是:根據容量進行插入,如果容量充足,則插入

in_use_

中,容量剛好為0,表示不操作,使用容量已超過給定容量,則删除

lru_

中的節點。(

lur_

是最後被通路的節點,前面是最近被通路的節點,構成雙向連結清單),删除時從後面依次删除。

2)查詢操作時,會先從哈希表中查詢,并傳回查詢的節點,判斷該節點在

lru_

還是

in_use_

,如果在

lru

中,需要将其删除掉并更新到

in_use_

中,引用計數都會增加,使用完畢後需要手動通過Release函數進行釋放(Unref),使得節點能夠進入

lru_

或者釋放記憶體。

3)删除操作時,會先從哈希表中删除,并傳回待删除的節點,隻要傳回的節點不為空,說明節點删除成功,那麼此時已經從哈希表中删除,此時直接根據雙向連結清單性質,删除該節點,并設定不在緩存中,自動釋放(Unref)引用與更新。

4.ShardedLRUCache

LRUCache接口都被加鎖保護,為了減少鎖持有時間,提高緩存命中率,通過ShardedLRUCache管理16個LRUCache。使用hash 的方式将一塊cache 緩沖區 劃分為多個小塊的緩沖區。Shard函數用位運算方式拿到取模的結果,傳回值總在0~kNumShards區間。

static const int kNumShardBits = 4;
static const int kNumShards = 1 << kNumShardBits;//16,即16個分片,二進制定義的好處是取模時>>

class ShardedLRUCache : public Cache {
 private:
  // 每個LRUCache對象使用自己的鎖
  LRUCache shard_[kNumShards];
  // 這個鎖隻保護對應id
  port::Mutex id_mutex_;
  uint64_t last_id_;

  static inline uint32_t HashSlice(const Slice& s) {
    return Hash(s.data(), s.size(), 0);
  }
 
  // 0-kNumShards 剛好取模結果
  static uint32_t Shard(uint32_t hash) { return hash >> (32 - kNumShardBits); }

 public:
  explicit ShardedLRUCache(size_t capacity) : last_id_(0) {
    // 向上取整,為每個LRUCache 配置設定容量
    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
    for (int s = 0; s < kNumShards; s++) {
      shard_[s].SetCapacity(per_shard);
    }
  }
  ~ShardedLRUCache() override {}
};

           

複制

其餘核心函數,比較好了解,根據key,調用HashSlice生成hash值,直接調用前面的LRUCache方法即可。

Handle* Insert(const Slice& key, void* value, size_t charge,
               void (*deleter)(const Slice& key, void* value)) override {
  const uint32_t hash = HashSlice(key);
  return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
Handle* Lookup(const Slice& key) override {
  const uint32_t hash = HashSlice(key);
  return shard_[Shard(hash)].Lookup(key, hash);
}
void Release(Handle* handle) override {
  LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
  shard_[Shard(h->hash)].Release(handle);
}
void Erase(const Slice& key) override {
  const uint32_t hash = HashSlice(key);
  shard_[Shard(hash)].Erase(key, hash);
}
void* Value(Handle* handle) override {
  return reinterpret_cast<LRUHandle*>(handle)->value;
}
uint64_t NewId() override {
  MutexLock l(&id_mutex_);
  return ++(last_id_);
}
void Prune() override {
  for (int s = 0; s < kNumShards; s++) {
    shard_[s].Prune();
  }
}
size_t TotalCharge() const override {
  size_t total = 0;
  for (int s = 0; s < kNumShards; s++) {
    total += shard_[s].TotalCharge();
  }
  return total;
}
           

複制

最後便是建立LRUCache。

LEVELDB_EXPORT Cache* NewLRUCache(size_t capacity);
Cache* NewLRUCache(size_t capacity) { return new ShardedLRUCache(capacity); }
           

複制

5.使用

最後給出使用方法,簡單的測試一下。

首先插入key:value對,有三對,分别是:hello:100,world:101,test:201。

插入之後所有元素均在

in_use_

連結清單上,調用Release釋放引用,此時都在

lru_

連結清單上,當我們檢視test時,test被放在了

in_use_

連結清單上,引用計數+1,需要再次調用Release釋放引用,此時都在

lru_

連結清單上,最終調用delete,此時會觸發deleter自定義函數。

#include "leveldb/cache.h"

#include <iostream>
#include <vector>

void deleter(const leveldb::Slice& key, void* value) {
  std::cout << "deleter key:" << key.ToString()
            << " value:" << (*static_cast<int*>(value)) << std::endl;
}
int main() {
  leveldb::Cache* cache = leveldb::NewLRUCache(8);

  std::vector<std::string> orignal_keys{"hello", "world", "test"};
  std::vector<int> orignal_values{100, 101, 201};
  std::vector<leveldb::Cache::Handle*> handles;

  for (size_t i = 0; i < orignal_keys.size(); ++i) {
    handles.push_back(cache->Insert(
        orignal_keys[i], static_cast<void*>(&orignal_values[i]), 1, deleter));
    std::cout << "Insert key:" << orignal_keys[i]
              << " value:" << *static_cast<int*>(cache->Value(handles[i]))
              << std::endl;
    ;
  }

  for (size_t i = 0; i < handles.size(); ++i) {
    cache->Release(handles[i]);
  }

  leveldb::Cache::Handle* handle = cache->Lookup("test");
  std::cout << "Lookup key:test value:" << *static_cast<int*>(cache->Value(handle))
            << std::endl;
  cache->Release(handle);
  delete cache;

  return 0;
}
           

複制

注:編譯源碼的代碼如下:

g++-11 cache_test.cpp -lleveldb -lpthread -I/leveldb/include -L/leveldb/leveldb/build -std=c++11
           

複制

記得更換自己的目錄。

本節完~