天天看點

LevelDB源碼分析之十三:table

一.Table的邏輯結構

Table也叫SSTable(Sorted String Table),是資料在.sst檔案中的存儲形式。Table的邏輯結構如下所示,包括存儲資料的Block,存儲索引資訊的Block,存儲Filter的Block:

LevelDB源碼分析之十三:table

Footer:為于Table尾部,記錄指向Metaindex Block的Handle和指向Index Block的Handle。需要說明的是Table中所有的Handle是通過偏移量Offset以及Size一同來表示的,用來指明所指向的Block位置。Footer是SST檔案解析開始的地方,通過Footer中記錄的這兩個關鍵元資訊Block的位置,可以友善的開啟之後的解析工作。另外Footer種還記錄了用于驗證檔案是否為合法SST檔案的常數值Magicnum。

Index Block:記錄Data Block位置資訊的Block,其中的每一條Entry指向一個Data Block,其Key值為所指向的Data Block最後一條資料的Key,Value為指向該Data Block位置的Handle。

Metaindex Block:與Index Block類似,由一組Handle組成,不同的是這裡的Handle指向的Meta Block。

Data Block:以Key-Value的方式存儲實際資料,其中Key定義為:

DataBlock Key := UserKey + SequenceNum + Type
Type := kDelete or kValue
           

對比Memtable中的Key,可以發現Data Block中的Key并沒有拼接UserKey的長度在UserKey前,這是由于上面講到的實體結構中已經有了Key的長度資訊。

Meta Block:比較特殊的Block,用來存儲元資訊,目前LevelDB使用的僅有對布隆過濾器的存儲。寫入Data Block的資料會同時更新對應Meta Block中的過濾器。讀取資料時也會首先經過布隆過濾器(Bloom Filter)過濾,我看的源碼還未用到Bloom Filter,可參考: BloomFilter——大規模資料處理利器。Meta Block的實體結構也與其他Block有所不同:

[filter 0]
 [filter 1] 
 [filter 2] 
 ... 
 [filter N-1] 
 [offset of filter 0] : 4 bytes 
 [offset of filter 1] : 4 bytes 
 [offset of filter 2] : 4 bytes 
 ... 
 [offset of filter N-1] : 4 bytes 
 [offset of beginning of offset array] : 4 bytes 
 lg(base) : 1 byte
           

其中每個filter節對應一段Key Range,落在某個Key Range的Key需要到對應的filter節中查找自己的過濾資訊,base指定這個Range的大小。

關于Block的結構詳見:LevelDB源碼分析之十二:block

與Block類似,Table的管理也是讀寫分離的,讀取後的周遊查詢操作由table類實作,建構則由TableBuilder類實作。

二.Table的建構

leveldb通過TableBuilder類來建構每一個.sst檔案,TableBuilder類的成員變量隻有一個結構體Rep* rep_,Rep的結構為:

struct TableBuilder::Rep {
  Options options;
  Options index_block_options;
  WritableFile* file;//要生成的.sst檔案 
  uint64_t offset;//累加每個Data Block的偏移量
  Status status;
  BlockBuilder data_block;//存儲KV對的資料塊
  BlockBuilder index_block;//資料塊對應的索引塊
  std::string last_key;//上一個插入的key值,新插入的key必須比它大,保證.sst檔案中的key是從小到大排列的
  int64_t num_entries;//.sst檔案中存儲的所有記錄總數。關于記錄可以參考LevelDB源碼分析之十二:block
  bool closed;// 調用Finish()或Abandon()時,closed=true,表示Table建構結束。

  bool pending_index_entry;//當一個Data Block被寫入到.sst檔案時,為true
  BlockHandle pending_handle;  //BlockHandle隻有offset_和size_兩個變量,用來記錄每個Data Block在.sst檔案中的偏移量和大小

  std::string compressed_output;//Data Block的block_data字段壓縮後的結果

  Rep(const Options& opt, WritableFile* f)
      : options(opt),
        index_block_options(opt),
        file(f),
        offset(0),
        data_block(&options),
        index_block(&index_block_options),
        num_entries(0),
        closed(false),
        pending_index_entry(false) {
    index_block_options.block_restart_interval = 1;//Index Block的block_data字段中重新開機點的間隔
  }
};
           

可以看到Rep中不僅接管了各種Block的生成細節,而且還會記錄生成Block需要的一些統計資訊。是以我們可以認為,TableBuilder隻不過是對Block的一層淺封裝,真正做事情的是Rep。而TableBuilder中的Add函數本質上不過是對Rep中BlockBuilder的Add函數的調用。

1.Add函數

通過Add函數向.sst檔案中寫入一個Data Block。

void TableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->num_entries > 0) {
    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  }
  // 當一個Data Block被寫入到磁盤時,為true
  if (r->pending_index_entry) {
    // 說明到了新的一個Data Block
    assert(r->data_block.empty());
    // 考慮這兩個key"the quick brown fox"和"the who", 進FindShortestSeparator
    // 處理後,r->last_key=the r。這樣的話r->last_key就大于上一個Data Block的
    // 所有key,并且小于後面所有Data Block的key。
    r->options.comparator->FindShortestSeparator(&r->last_key, key);
    // 将上一個Data Block的偏移和大小編碼後作為Value存放到index_block中
    std::string handle_encoding;
    r->pending_handle.EncodeTo(&handle_encoding);
    r->index_block.Add(r->last_key, Slice(handle_encoding));
    r->pending_index_entry = false;
  }

  r->last_key.assign(key.data(), key.size());
  r->num_entries++;
  r->data_block.Add(key, value);
  
  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  // 如果Data Block的block_data字段大小滿足要求,準備寫入到磁盤
  if (estimated_block_size >= r->options.block_size) {
    Flush();
  }
}
           

2.Flush函數

當一個Data Block大小超過設定值(預設為4K)時,執行Flush()操作。

void TableBuilder::Flush() {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->data_block.empty()) return;
  assert(!r->pending_index_entry);
  WriteBlock(&r->data_block, &r->pending_handle);
  if (ok()) {
    r->pending_index_entry = true;
    // 将Data Block實時寫入到磁盤,防止緩存中的file過大
    r->status = r->file->Flush();
  }
}
           

Flush函數先調用WriteBlock向檔案添加資料,然後執行file的Flush()函數将檔案寫入磁盤。

3.WriteBlock

void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
  assert(ok());
  Rep* r = rep_;
  // 傳回完整的block_data字段
  Slice raw = block->Finish();

  Slice block_contents;
  CompressionType type = r->options.compression;
  // TODO(postrelease): Support more compression options: zlib?
  switch (type) {
    case kNoCompression:
      block_contents = raw;
      break;
    // 采用Snappy壓縮,Snappy是谷歌開源的壓縮庫
    case kSnappyCompression: {
      std::string* compressed = &r->compressed_output;
      if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
          compressed->size() < raw.size() - (raw.size() / 8u)) {
        block_contents = *compressed;
      } else {
        // Snappy not supported, or compressed less than 12.5%, so just
        // store uncompressed form
	// 如果不支援Snappy壓縮,或者壓縮比小于12.5%,那就使用原始資料
        block_contents = raw;
        type = kNoCompression;
      }
      break;
    }
  }
  // 設定Data Block的偏移和該Data Block的block_data字段的大小
  // 第一個Data Block的偏移為0
  handle->set_offset(r->offset);
  handle->set_size(block_contents.size());
  r->status = r->file->Append(block_contents);
  if (r->status.ok()) {
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
     // 為block_contents添加校驗
    uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
     // 為type也添加校驗
    crc = crc32c::Extend(crc, trailer, 1); 
    // 将校驗碼拷貝到trailer的後四個位元組
    EncodeFixed32(trailer+1, crc32c::Mask(crc));
    // 向檔案尾部添加壓縮類型和校驗碼,這樣一個完整的Block Data誕生
    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (r->status.ok()) {
      // 偏移應該包括壓縮類型和校驗碼的大小
      r->offset += block_contents.size() + kBlockTrailerSize;
    }
  }
  r->compressed_output.clear();
  // 重置block
  block->Reset();
}
           

WriteBlock函數實際上就是把block_data進行Snappy壓縮(如果支援),然後包裝成完整的Block Data,并記錄一些統計資訊。我用的Windows版LevelDB,代碼比較老,預設不支援Snappy壓縮。

4.Finish函數

Status TableBuilder::Finish() {
  Rep* r = rep_;
  // 為何要調用一次Flush,是因為調用Finish的時候,
  // block_data不一定大于等于block_size,是以要調用Flush
  // 将這部分block_data寫入到磁盤
  Flush();
  assert(!r->closed);
  r->closed = true;
  BlockHandle metaindex_block_handle;
  BlockHandle index_block_handle;
  if (ok()) {
    // 我看的源碼不支援Meta Block,這裡的meta_index_block也沒有實際作用
    BlockBuilder meta_index_block(&r->options);
    // metaindex_block_handle記錄了Meta Index Block的偏移和大小
    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }
  if (ok()) {
    // 最後一個Data_Block,無法進行r->last_key和key的比較,
    // 是以隻能調用FindShortSuccessor,直接取一個比r->last_key大的key
    if (r->pending_index_entry) {
      r->options.comparator->FindShortSuccessor(&r->last_key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      r->index_block.Add(r->last_key, Slice(handle_encoding));
      r->pending_index_entry = false;
    }
    // index_block_handle記錄了Index Block的偏移和大小
    WriteBlock(&r->index_block, &index_block_handle);
  }
  if (ok()) {
    // 組建Footer,并添加到檔案結尾
    Footer footer;
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::string footer_encoding;
    footer.EncodeTo(&footer_encoding);
    r->status = r->file->Append(footer_encoding);
    if (r->status.ok()) {
      r->offset += footer_encoding.size();
    }
  }
  return r->status;
}
           

分析Footer類的源碼可知,Footer更詳細的結構如下:

metaindex_handle:   char[p];      // Block handle for metaindex
index_handle:       char[q];      // Block handle for index
padding:            char[40-p-q]; // 0 bytes to make fixed length
                                  // (40==2*BlockHandle::kMaxEncodedLength)
magic:              fixed64;      // == 0xdb4775248b80fb57
           

注意,隻在Finish的時候才調用WriteBlock給Index Block添加了type和crc,但是對于Data Block,每次寫入到磁盤都會調用一次WriteBlock。

Mate Index Block、Index Block和Footer應該是在WritableFile析構時被寫入到磁盤的,WritableFile析構時會調用其Flush函數。

關于WritableFile,詳見:LevelDB源碼分析之九:env

三.Table的解析

leveldb通過Table類來解析每一個.sst檔案,Table類的成員變量也隻有一個結構體Rep* rep_,Rep的結構為:

struct Table::Rep {
  ~Rep() {
    delete index_block;
  }

  Options options;
  Status status;
  RandomAccessFile* file;
  uint64_t cache_id;//block cache的ID,用于組建block cache結點的key

  BlockHandle metaindex_handle;  //用于存儲從footer中解析出的metaindex_handle
  Block* index_block;
};
           

1.Open函數

Open函數比較簡單,就是打開一個本地.sst檔案,然後從檔案尾部讀取footer,根據footer中的index_handle,調用ReadBock函數讀取index_block。接着對結構體Rep指派,并将其當做參數傳給Table的構造函數。

Status Table::Open(const Options& options,
                   RandomAccessFile* file,
                   uint64_t size,
                   Table** table) {
  *table = NULL;
  if (size < Footer::kEncodedLength) {
    return Status::InvalidArgument("file is too short to be an sstable");
  }

  char footer_space[Footer::kEncodedLength];
  Slice footer_input;
  Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
                        &footer_input, footer_space);
  if (!s.ok()) return s;

  Footer footer;
  s = footer.DecodeFrom(&footer_input);
  if (!s.ok()) return s;

  // Read the index block
  Block* index_block = NULL;
  if (s.ok()) {
    s = ReadBlock(file, ReadOptions(), footer.index_handle(), &index_block);
  }

  if (s.ok()) {
    // We've successfully read the footer and the index block: we're
    // ready to serve requests.
    Rep* rep = new Table::Rep;
    rep->options = options;
    rep->file = file;
    rep->metaindex_handle = footer.metaindex_handle();
    rep->index_block = index_block;
    rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
    *table = new Table(rep);
  } else {
    if (index_block) delete index_block;
  }

  return s;
}
           

2.ReadBlock函數

// 根據BlockHandle從file中讀取block_data,放在*block中
Status ReadBlock(RandomAccessFile* file,
                 const ReadOptions& options,
                 const BlockHandle& handle,
                 Block** block) {
  *block = NULL;

  // n是block_data的大小
  size_t n = static_cast<size_t>(handle.size());
  // n + kBlockTrailerSize就是block_data+type+crc的大小
  char* buf = new char[n + kBlockTrailerSize];
  Slice contents;
  // 根據Block的偏移讀取指定内容
  Status s = file->Read(handle.offset(), n + kBlockTrailerSize, &contents, buf);
  if (!s.ok()) {
    delete[] buf;
    return s;
  }
  if (contents.size() != n + kBlockTrailerSize) {
    delete[] buf;
    return Status::Corruption("truncated block read");
  }

  // 從contents解析出crc并校驗,可以通過options.verify_checksums配置不校驗
  const char* data = contents.data();    
  if (options.verify_checksums) {
    const uint32_t crc = crc32c::Unmask(DecodeFixed32(data + n + 1));
    const uint32_t actual = crc32c::Value(data, n + 1);
    if (actual != crc) {
      delete[] buf;
      s = Status::Corruption("block checksum mismatch");
      return s;
    }
  }
  // data[n]實際上就是type字段
  switch (data[n]) {
    case kNoCompression:
	  //
      if (data != buf) {
        // File implementation gave us pointer to some other data.
        // Copy into buf[].
        memcpy(buf, data, n + kBlockTrailerSize);
      }

      // Ok
      break;
    // 我用的源碼比較老,不涉及Snappy壓縮
    case kSnappyCompression: {
      size_t ulength = 0;
      if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
        delete[] buf;
        return Status::Corruption("corrupted compressed block contents");
      }
      char* ubuf = new char[ulength];
      if (!port::Snappy_Uncompress(data, n, ubuf)) {
        delete[] buf;
        delete[] ubuf;
        return Status::Corruption("corrupted compressed block contents");
      }
      delete[] buf;
      buf = ubuf;
      n = ulength;
      break;
    }
    default:
      delete[] buf;
      return Status::Corruption("bad block type");
  }

  *block = new Block(buf, n);  // Block takes ownership of buf[]
  return Status::OK();
}
           

3.NewIterator函數

NewIterator用于建立Table的疊代器,此疊代器是一個雙層疊代器,詳見:LevelDB源碼分析之十四:TwoLevelIterator

Iterator* Table::NewIterator(const ReadOptions& options) const {
  return NewTwoLevelIterator(
      rep_->index_block->NewIterator(rep_->options.comparator),
      &Table::BlockReader, const_cast<Table*>(this), options);
}
           

傳入的參數包括Index Block中block_data字段的疊代器和函數BlockReader的指針,該函數用于建立Data Block中block_data字段的疊代器。

4.BlockReader

// arg:Table的指針
// index_value:Data Block中block_data字段的疊代器的value值,
//              也就是Data Block的偏移和該Data Block的block_data字段大小
//			    編碼後的結果             
// return:Data Block中block_data字段的疊代器
Iterator* Table::BlockReader(void* arg,
                             const ReadOptions& options,
                             const Slice& index_value) {
  Table* table = reinterpret_cast<Table*>(arg);
  Cache* block_cache = table->rep_->options.block_cache;
  Block* block = NULL;
  Cache::Handle* cache_handle = NULL;

  BlockHandle handle;
  Slice input = index_value;
  // 解碼得到Data Block的偏移和該Data Block的block_data字段大小
  Status s = handle.DecodeFrom(&input);

  if (s.ok()) {
	// 1.如果Block Cache不為NULL,先去Block Cache中查找結點,
	//   如果沒找到,再去檔案中讀取Data Block的block_data字段,
	//   并将該block_data插入到Block Cache
	// 2.如果Block Cache為NULL,直接去檔案裡讀
    if (block_cache != NULL) {
	  // 組建key
      char cache_key_buffer[16];
      EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
      EncodeFixed64(cache_key_buffer+8, handle.offset());
      Slice key(cache_key_buffer, sizeof(cache_key_buffer));
      cache_handle = block_cache->Lookup(key);
      if (cache_handle != NULL) {
        block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
      } else {
        s = ReadBlock(table->rep_->file, options, handle, &block);
        if (s.ok() && options.fill_cache) {
          cache_handle = block_cache->Insert(
              key, block, block->size(), &DeleteCachedBlock);
        }
      }
    } else {
      s = ReadBlock(table->rep_->file, options, handle, &block);
    }
  }

  Iterator* iter;
  // 如果block讀取成功
  if (block != NULL) {
    iter = block->NewIterator(table->rep_->options.comparator);
	// iter->RegisterCleanup函數實作會有點繞,被它注冊的函數會在iter析構時被調用
	// 如果block_cache為NULL,說明block不在緩存中,iter析構時調用DeleteBlock删除這個block。
	// 否則調用ReleaseBlock使block_cache的cache_handle結點減少一個引用計數
    if (cache_handle == NULL) {
      iter->RegisterCleanup(&DeleteBlock, block, NULL);
    } else {
      iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
    }
  } else {
	// 否則傳回錯誤
    iter = NewErrorIterator(s);
  }
  return iter;
}
           

5.ApproximateOffsetOf

這個函數用于估算key值所在記錄的偏移,不準确。代碼中的注釋是我的個人了解,也不知對不對。隻有看到了調用該函數的代碼,才能更深入的了解。

// 估算key的偏移,不準确
uint64_t Table::ApproximateOffsetOf(const Slice& key) const {
  Iterator* index_iter =
      rep_->index_block->NewIterator(rep_->options.comparator);
  index_iter->Seek(key);
  uint64_t result;
  if (index_iter->Valid()) {
    BlockHandle handle;
    Slice input = index_iter->value();
    Status s = handle.DecodeFrom(&input);
	// 
    if (s.ok()) {
	  // result比key的真實偏移小
      result = handle.offset();
    } else {
      // Strange: we can't decode the block handle in the index block.
      // We'll just return the offset of the metaindex block, which is
      // close to the whole file size for this case.
	  // result比key的真實偏移大
      result = rep_->metaindex_handle.offset();
    }
  } else {
    // key is past the last key in the file.  Approximate the offset
    // by returning the offset of the metaindex block (which is
    // right near the end of the file).
	// result比key的真實偏移小
    result = rep_->metaindex_handle.offset();
  }
  delete index_iter;
  return result;
}
           

參考連結:http://catkang.github.io/2017/01/17/leveldb-data.html