天天看点

【深入浅出leveldb】LRU与哈希表【深入浅出leveldb】LRU与哈希表

【深入浅出leveldb】LRU与哈希表

1.LRUHandle

LRUHandle

内部存储了如下东西:

  • Key:value对
  • LRU链表
  • HashTable bucket的链表
  • 引用计数及清理
struct LRUHandle {
  // value
  void* value;
  // 当refs为0时,应该清理
  void (*deleter)(const Slice&, void* value);
  LRUHandle* next_hash; // HashTable hash冲突时指向 下一个LRUHandle
  LRUHandle* next;      // LRU链表下一个指针
  LRUHandle* prev;      // LRU链表前一个指针
  // 计算LRUCache容量
  size_t charge;  // TODO(opt): Only allow uint32_t?
  size_t key_length; // key长度
  bool in_cache;     // 是否在LRUCache in_user_链表
  uint32_t refs;     // 引用计数,用于删除数据
  uint32_t hash;     // key 对应的 hash值
  char key_data[1];  // 占位符,结构体末尾,通过key_length获取真正的key

  Slice key() const {
    assert(next != this);
    return Slice(key_data, key_length);
  }
};
           

复制

注意:当

refs

为0时,需要调用

deleter

清理函数,类似于

shared_ptr

2.HandleTable

HandleTable即哈希表,根据注释leveldb的哈希表实现要比g++要快很多。

下面来看基本结构,成员、构造、析构都比较好理解,这里使用了Resize函数,桶的大小初始化为4的倍数,例如:4、8、16等等。

Resize函数保证桶的个数大于元素个数,将旧桶数据拷贝到新桶当中。

class HandleTable {
 public:
  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
  ~HandleTable() { delete[] list_; }
 private:
  uint32_t length_;    // list_数组长度
  uint32_t elems_;     // handle数量
  LRUHandle** list_;   // 哈希桶
  // 初始化大小
  void Resize() {
    uint32_t new_length = 4;
    while (new_length < elems_) {
      new_length *= 2;
    }
    LRUHandle** new_list = new LRUHandle*[new_length];
    memset(new_list, 0, sizeof(new_list[0]) * new_length);
    uint32_t count = 0;
    for (uint32_t i = 0; i < length_; i++) {
      LRUHandle* h = list_[i];
      while (h != nullptr) {
        LRUHandle* next = h->next_hash;
        uint32_t hash = h->hash;
        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
        h->next_hash = *ptr;
        *ptr = h;
        h = next;
        count++;
      }
    }
    assert(elems_ == count);
    delete[] list_;
    list_ = new_list;
    length_ = new_length;
  }
};
           

复制

  • 查询操作

哈希表查询根据key与hash查询并返回LRUHandle*,在FindPointer函数中,首先根据hash值确定在哪个桶,然后在该桶中进行单链表遍历,查找到满足key与hash的节点。

// 哈希表查询
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
  return *FindPointer(key, hash);
}
// 根据key与hash查询 并返回LRUHandle*
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
  LRUHandle** ptr = &list_[hash & (length_ - 1)];
  while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {
    ptr = &(*ptr)->next_hash;
  }
  return ptr;
}
           

复制

  • 插入操作

插入操作逻辑比较清晰:就是调用查询操作,查找到对应的key与hash旧节点,更新查找到的节点为传入的新节点(LRUHandle),如果旧节点不存在,也就是为空,表示在当前桶拉链末尾插入新节点(LRUHandle),并增加元素个数,如果超过已有的长度,进行Resize操作;否则,直接返回旧节点。

LRUHandle* Insert(LRUHandle* h) {
  // 查找不到对应的key 那么返回的ptr就是末尾的LRUHandle*
  LRUHandle** ptr = FindPointer(h->key(), h->hash);
  LRUHandle* old = *ptr;
  h->next_hash = (old == nullptr ? nullptr : old->next_hash);
  *ptr = h;
  if (old == nullptr) {
    ++elems_;
    if (elems_ > length_) {
      // Since each cache entry is fairly large, we aim for a small
      // average linked list length (<= 1).
      Resize();
    }
  }
  return old;
}
           

复制

  • 删除操作

根据传递的key与hash删除LRUHandle,首先确定要删除的节点,并返回,查找的节点如果不为空,则用链表的后一个节点替换当前节点,并减少元素个数,我想这里返回删除的节点,最终会释放内存。释放节点操作是在

LRUCache::Unref

中进行的。

LRUHandle* Remove(const Slice& key, uint32_t hash) {
  LRUHandle** ptr = FindPointer(key, hash);
  LRUHandle* result = *ptr;
  if (result != nullptr) {
    *ptr = result->next_hash;
    --elems_;
  }
  return result;
}
           

复制

3.LRUCache

哈希表讲解完毕后,便可以非常快的理解LRUCache。

在cache.h中有Cache抽象类。

class LEVELDB_EXPORT Cache {
};
           

复制

LRUCache基本结构如下:

class LRUCache {
 public:
  LRUCache();
  ~LRUCache();
  // 设置容量
  void SetCapacity(size_t capacity) { capacity_ = capacity; }
 private:
  // 缓存容量
  size_t capacity_;
  // mutex_ protects the following state.
  mutable port::Mutex mutex_;
  // 缓存已经使用的容量
  size_t usage_ GUARDED_BY(mutex_);
  // 头节点 lru双向循环链表 prev是最先访问 next是最后访问
  // lru_保存 refs==1并且in_cache==true的handle 
  // lru_是最旧节点
  LRUHandle lru_ GUARDED_BY(mutex_);
  // 头节点 保存refs>=2并且 in_cache==true的节点
  LRUHandle in_use_ GUARDED_BY(mutex_);
  // 哈希表,用于快读查找
  HandleTable table_ GUARDED_BY(mutex_);
};
           

复制

来了解一下构造与析构:

  • 构造

容量与使用大小均设置为0。

创建空的双向循环链表。

LRUCache::LRUCache() : capacity_(0), usage_(0) {
  // Make empty circular linked lists.
  lru_.next = &lru_;
  lru_.prev = &lru_;
  in_use_.next = &in_use_;
  in_use_.prev = &in_use_;
}
           

复制

  • 析构

首先使用断言检查

in_use_

为空,即:所有节点已经从

in_use

插入到了

lru_

上,由于是循环链表所以判断条件是

e != &lru_

,如果最红回到了头节点,表示循环结束了。在释放节点时,设置节点不在缓存中(in_cache=false),此时引用计数也必须是1,才会正常释放。

LRUCache::~LRUCache() {
  assert(in_use_.next == &in_use_);  // Error if caller has an unreleased handle
  for (LRUHandle* e = lru_.next; e != &lru_;) {
    LRUHandle* next = e->next;
    assert(e->in_cache);
    e->in_cache = false;
    assert(e->refs == 1);  // Invariant of lru_ list.
    Unref(e);
    e = next;
  }
}
           

复制

继续看释放操作Unref函数:释放外部引用计数,根据当前Handle(节点)的引用计数进行判断

  • 是0,调用deleter删除Handle内容,并释放内存。
  • 在缓存中且引用计数是1,LRU_Remove删除当前节点,在

    lru_

    前面插入当前节点。
void LRUCache::Unref(LRUHandle* e) {
  assert(e->refs > 0);
  e->refs--;
  if (e->refs == 0) {  // Deallocate.
    assert(!e->in_cache);
    (*e->deleter)(e->key(), e->value);
    free(e);
  } else if (e->in_cache && e->refs == 1) {
    // No longer in use; move to lru_ list.
    LRU_Remove(e);
    LRU_Append(&lru_, e);  
  }
}
// 双向链表删除节点
void LRUCache::LRU_Remove(LRUHandle* e) {
  e->next->prev = e->prev;
  e->prev->next = e->next;
}

// 在list之前加入最新节点
void LRUCache::LRU_Append(LRUHandle* list, LRUHandle* e) {
  // Make "e" newest entry by inserting just before *list
  e->next = list;
  e->prev = list->prev;
  e->prev->next = e;
  e->next->prev = e;
}
           

复制

LRUCache查询:

直接根据key与hash调用内部哈希表的查询函数。访问了某个节点,需要更新链表中该节点的位置,将其放在

lru_

前面,保证是最新访问的节点,并更新引用计数。

Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  LRUHandle* e = table_.Lookup(key, hash);
  if (e != nullptr) {
    Ref(e);
  }
  return reinterpret_cast<Cache::Handle*>(e);
}

void LRUCache::Ref(LRUHandle* e) {
  if (e->refs == 1 && e->in_cache) {  // If on lru_ list, move to in_use_ list.
    LRU_Remove(e);
    LRU_Append(&in_use_, e);
  }
  e->refs++;
}
           

复制

LRUCache删除:

删除哈希表中节点,前面提到过哈希表删除返回的是待删除节点,那在哪里释放内存呢?便是在这里!我们再来看一下FinishErase函数。可以看到通过LRU_Remove删除双向循环链表中目标节点,并通过Unref释放刚刚哈希表待删除节点的内存。

void LRUCache::Erase(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  FinishErase(table_.Remove(key, hash));
}
bool LRUCache::FinishErase(LRUHandle* e) {
  if (e != nullptr) {
    assert(e->in_cache);
    LRU_Remove(e);
    e->in_cache = false;
    usage_ -= e->charge;
    Unref(e);
  }
  return e != nullptr;
}
           

复制

LRUCache插入:

插入操作传递了一大堆参数,主要是用来构造LRUHandle,初始化时引用计数为1。

当插入时,当起节点便是最新的,那么需要更新引用计数。所以在后面看到引用计数加1不必奇怪。

如果缓存中的容量大于0,增加缓存引用计数、设置在缓存中、添加Handle到

in_user_

链表前面,增加已使用容量,释放旧节点。

如果缓存小于等于0,不进行操作。

如果缓存满了,那就清除掉最旧的节点。

ache::Handle* LRUCache::Insert(const Slice& key, uint32_t hash, void* value,
                                size_t charge,
                                void (*deleter)(const Slice& key,
                                                void* value)) {
  MutexLock l(&mutex_);

  LRUHandle* e =
      reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle) - 1 + key.size()));
  e->value = value;
  e->deleter = deleter;
  e->charge = charge;
  e->key_length = key.size();
  e->hash = hash;
  e->in_cache = false;
  e->refs = 1;  // for the returned handle.
  std::memcpy(e->key_data, key.data(), key.size());

  if (capacity_ > 0) {
    e->refs++;  // for the cache's reference.
    e->in_cache = true;   // 设置在缓存中
    LRU_Append(&in_use_, e);   // 将节点e插入到链表in_use_前面
    usage_ += charge;  // 增加已使用容量
    FinishErase(table_.Insert(e));  // 插入新节点到哈希表并释放旧节点
  } else {  // don't cache. (capacity_==0 is supported and turns off caching.)
    // next is read by key() in an assert, so it must be initialized
    e->next = nullptr;
  }
  // 缓存满了,移除掉最旧的元素
  while (usage_ > capacity_ && lru_.next != &lru_) {
    LRUHandle* old = lru_.next;
    assert(old->refs == 1);
    bool erased = FinishErase(table_.Remove(old->key(), old->hash));
    if (!erased) {  // to avoid unused variable when compiled NDEBUG
      assert(erased);
    }
  }

  return reinterpret_cast<Cache::Handle*>(e);
}
           

复制

其他操作:

1)更新缓存中的节点(handle)

void LRUCache::Release(Cache::Handle* handle) {
  MutexLock l(&mutex_);
  Unref(reinterpret_cast<LRUHandle*>(handle));
}
           

复制

  1. 删除

    lru_

    中所有引用计数为1的节点。
void LRUCache::Prune() {
  MutexLock l(&mutex_);
  while (lru_.next != &lru_) {
    LRUHandle* e = lru_.next;
    assert(e->refs == 1);
    bool erased = FinishErase(table_.Remove(e->key(), e->hash));
    if (!erased) {  // to avoid unused variable when compiled NDEBUG
      assert(erased);
    }
  }
}
           

复制

小结:

1)插入操作时,会把Handle插入到

in_use_

链表中,引用计数增加,使用完毕后需要手动通过Release函数进行释放(Unref),使得节点能够进入

lru_

或者释放内存。插入具体过程是:根据容量进行插入,如果容量充足,则插入

in_use_

中,容量刚好为0,表示不操作,使用容量已超过给定容量,则删除

lru_

中的节点。(

lur_

是最后被访问的节点,前面是最近被访问的节点,构成双向链表),删除时从后面依次删除。

2)查询操作时,会先从哈希表中查询,并返回查询的节点,判断该节点在

lru_

还是

in_use_

,如果在

lru

中,需要将其删除掉并更新到

in_use_

中,引用计数都会增加,使用完毕后需要手动通过Release函数进行释放(Unref),使得节点能够进入

lru_

或者释放内存。

3)删除操作时,会先从哈希表中删除,并返回待删除的节点,只要返回的节点不为空,说明节点删除成功,那么此时已经从哈希表中删除,此时直接根据双向链表性质,删除该节点,并设置不在缓存中,自动释放(Unref)引用与更新。

4.ShardedLRUCache

LRUCache接口都被加锁保护,为了减少锁持有时间,提高缓存命中率,通过ShardedLRUCache管理16个LRUCache。使用hash 的方式将一块cache 缓冲区 划分为多个小块的缓冲区。Shard函数用位运算方式拿到取模的结果,返回值总在0~kNumShards区间。

static const int kNumShardBits = 4;
static const int kNumShards = 1 << kNumShardBits;//16,即16个分片,二进制定义的好处是取模时>>

class ShardedLRUCache : public Cache {
 private:
  // 每个LRUCache对象使用自己的锁
  LRUCache shard_[kNumShards];
  // 这个锁只保护对应id
  port::Mutex id_mutex_;
  uint64_t last_id_;

  static inline uint32_t HashSlice(const Slice& s) {
    return Hash(s.data(), s.size(), 0);
  }
 
  // 0-kNumShards 刚好取模结果
  static uint32_t Shard(uint32_t hash) { return hash >> (32 - kNumShardBits); }

 public:
  explicit ShardedLRUCache(size_t capacity) : last_id_(0) {
    // 向上取整,为每个LRUCache 分配容量
    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
    for (int s = 0; s < kNumShards; s++) {
      shard_[s].SetCapacity(per_shard);
    }
  }
  ~ShardedLRUCache() override {}
};

           

复制

其余核心函数,比较好理解,根据key,调用HashSlice生成hash值,直接调用前面的LRUCache方法即可。

Handle* Insert(const Slice& key, void* value, size_t charge,
               void (*deleter)(const Slice& key, void* value)) override {
  const uint32_t hash = HashSlice(key);
  return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
Handle* Lookup(const Slice& key) override {
  const uint32_t hash = HashSlice(key);
  return shard_[Shard(hash)].Lookup(key, hash);
}
void Release(Handle* handle) override {
  LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
  shard_[Shard(h->hash)].Release(handle);
}
void Erase(const Slice& key) override {
  const uint32_t hash = HashSlice(key);
  shard_[Shard(hash)].Erase(key, hash);
}
void* Value(Handle* handle) override {
  return reinterpret_cast<LRUHandle*>(handle)->value;
}
uint64_t NewId() override {
  MutexLock l(&id_mutex_);
  return ++(last_id_);
}
void Prune() override {
  for (int s = 0; s < kNumShards; s++) {
    shard_[s].Prune();
  }
}
size_t TotalCharge() const override {
  size_t total = 0;
  for (int s = 0; s < kNumShards; s++) {
    total += shard_[s].TotalCharge();
  }
  return total;
}
           

复制

最后便是创建LRUCache。

LEVELDB_EXPORT Cache* NewLRUCache(size_t capacity);
Cache* NewLRUCache(size_t capacity) { return new ShardedLRUCache(capacity); }
           

复制

5.使用

最后给出使用方法,简单的测试一下。

首先插入key:value对,有三对,分别是:hello:100,world:101,test:201。

插入之后所有元素均在

in_use_

链表上,调用Release释放引用,此时都在

lru_

链表上,当我们查看test时,test被放在了

in_use_

链表上,引用计数+1,需要再次调用Release释放引用,此时都在

lru_

链表上,最终调用delete,此时会触发deleter自定义函数。

#include "leveldb/cache.h"

#include <iostream>
#include <vector>

void deleter(const leveldb::Slice& key, void* value) {
  std::cout << "deleter key:" << key.ToString()
            << " value:" << (*static_cast<int*>(value)) << std::endl;
}
int main() {
  leveldb::Cache* cache = leveldb::NewLRUCache(8);

  std::vector<std::string> orignal_keys{"hello", "world", "test"};
  std::vector<int> orignal_values{100, 101, 201};
  std::vector<leveldb::Cache::Handle*> handles;

  for (size_t i = 0; i < orignal_keys.size(); ++i) {
    handles.push_back(cache->Insert(
        orignal_keys[i], static_cast<void*>(&orignal_values[i]), 1, deleter));
    std::cout << "Insert key:" << orignal_keys[i]
              << " value:" << *static_cast<int*>(cache->Value(handles[i]))
              << std::endl;
    ;
  }

  for (size_t i = 0; i < handles.size(); ++i) {
    cache->Release(handles[i]);
  }

  leveldb::Cache::Handle* handle = cache->Lookup("test");
  std::cout << "Lookup key:test value:" << *static_cast<int*>(cache->Value(handle))
            << std::endl;
  cache->Release(handle);
  delete cache;

  return 0;
}
           

复制

注:编译源码的代码如下:

g++-11 cache_test.cpp -lleveldb -lpthread -I/leveldb/include -L/leveldb/leveldb/build -std=c++11
           

复制

记得更换自己的目录。

本节完~