天天看點

Rocksdb 的compaction_filter和table_properties_collector 用法 及 其底層實作

文章目錄

  • ​​前言​​
  • ​​Compaction Filter​​
  • ​​compaction_filter 使用​​
  • ​​compaction_filter_factory 使用​​
  • ​​compaction filter實作​​
  • ​​Table Collector​​
  • ​​Table Collector 使用​​
  • ​​Table Collector 實作​​
  • ​​總結​​

前言

Rocksdb 因為out of-place update 的方式,導緻很多背景運作的合并效率較低,尤其是使用者在delete-aware場景想要快速回收key版本,但因為背景操作,不是很友善。同時使用者想要删除/保留一批自定義方式的key,這個時候需要由使用者來控制compaction過程的一些行為。

是以Rocksdb針對這樣的場景對使用者暴露了一些接口,支援使用者自定義過濾指定類型的key,保留/删除(當然主要是删除),這也是compaciton_filter的接口産生源頭。

Table_collector 也是一種類似的接口,Rocksdb為使用者暴露了能夠通路到底層sst的key-value接口,進而支援一些針對相關key的一些定制化行為(比如統計某種使用者定義的key的數量,根據某種key數量的比重加速compaction等等)。

本文将簡單介紹一下這兩種使用者接口的用法以及其如何在rocksdb 底層源代碼中的實作原理。

相關的rocksdb代碼版本是6.19

Compaction Filter

compaction_filter 使用

  1. 自定義compaction filter,聲明如下:
class RemoveEmptyValueCompactionFilter : public CompactionFilter {
 public:
    const char* Name() const override;
    bool Filter(int level,
        const Slice& key,
        const Slice& existing_value,
        std::string* new_value,
        bool* value_changed) const override;
};      
  1. 定義如下:
const char* RemoveEmptyValueCompactionFilter::Name() const {
  return "RemoveEmptyValueCompactionFilter";
}

bool RemoveEmptyValueCompactionFilter::Filter(int /*level*/,
                                              const Slice& /*key*/,
                                              const Slice& existing_value,
                                              std::string* /*new_value*/,
                                              bool* /*value_changed*/) const {
  // remove kv pairs that have empty values
  return existing_value.empty();
}      

主體是Filter函數,這裡底層傳入的參數都能夠被使用者看到包括:

  • Level 目前key的輸入level
  • key user_key内容
  • existing_value user_key的value
  • new_value 如果要修改這個existing_value,則這個new_value将被傳出
  • value_changed 确定要修改existing_value,需要将這個配置置為true,也會被傳出
  1. 增加compaction_filter的配置項
options.compaction_filter = new rocksdb::RemoveEmptyValueCompactionFilter();      
  1. User Filter 監控

    通過增加​

    ​rocksdb.compaction.key.drop.user​

    ​ 來檢視被compaction_filter過濾掉的key的數目

compaction_filter_factory 使用

上一節介紹的是compaction_filter的基本使用,使用者主體隻需要直接實作一個Filter函數。而通過compaction_filter_factory 實作自己的filter工廠,能夠使用者自定義何時建立自己的filter;有多個Filter是能夠通過compaction_filter_factory 來疊加使用。一般​

​compaction_filter​

​​和​

​compaction_filter_factory​

​隻需要使用一個就可以了。

如下filter_factory的用例是rocksdb自己的ttl compaction功能,目的是建立一個filter,使用者隻需要傳入一個過期時間,由filter自行在compaction過程中針對過期的key進行清理(當然,前提是key的寫入也需要按照ttldb的方式寫入),同時factory也定義了保留其他的使用者filter。

  1. 定義compaction_filter_factory
class TtlCompactionFilterFactory : public CompactionFilterFactory {
 public:
  TtlCompactionFilterFactory(
      int32_t ttl, SystemClock* clock,
      std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
      : ttl_(ttl),
        clock_(clock),
        user_comp_filter_factory_(comp_filter_factory) {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( // 必須要實作
      const CompactionFilter::Context& context) override {
    std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
        nullptr;
    // 保留其他的使用者filter
    if (user_comp_filter_factory_) {
      user_comp_filter_from_factory =
          user_comp_filter_factory_->CreateCompactionFilter(context);
    }

    // 建立一個ttl filter,并将其他已經配置的filter傳入
    return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
        ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
  }

  void SetTtl(int32_t ttl) {
    ttl_ = ttl;
  }

  virtual const char* Name() const override { // 必須要實作的
    return "TtlCompactionFilterFactory";
  }

 private:
  int32_t ttl_;
  SystemClock* clock_;
  std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};      
ps:為什麼Name()和CreateCompactionFilter必須要實作,可以看看​

​CompactionFilterFactory​

​,這兩個函數其實是純虛函數,必須要基類實作。

class CompactionFilterFactory {

public:

virtual ~CompactionFilterFactory() {}

virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(

const CompactionFilter::Context& context) = 0;

// Returns a name that identifies this compaction filter factory.

virtual const char* Name() const = 0;

};

  1. 基于factory 定義自己的filter
// LayeredCompactionFilterBase 是用來實作filter疊加使用的
class TtlCompactionFilter : public LayeredCompactionFilterBase {
 public:
  TtlCompactionFilter(int32_t ttl, SystemClock* clock,
                      const CompactionFilter* _user_comp_filter,
                      std::unique_ptr<const CompactionFilter>
                          _user_comp_filter_from_factory = nullptr)
      : LayeredCompactionFilterBase(_user_comp_filter,
                                    std::move(_user_comp_filter_from_factory)),
        ttl_(ttl),
        clock_(clock) {}

  // ttl filter,用來判斷key是否過期,進而決定是否清理
  virtual bool Filter(int level, const Slice& key, const Slice& old_val,
                      std::string* new_val, bool* value_changed) const
      override {
    if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
      return true;
    }
    if (user_comp_filter() == nullptr) {
      return false;
    }
    assert(old_val.size() >= DBWithTTLImpl::kTSLength);
    Slice old_val_without_ts(old_val.data(),
                             old_val.size() - DBWithTTLImpl::kTSLength);
    if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
                                   value_changed)) {
      return true;
    }
    if (*value_changed) {
      new_val->append(
          old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
          DBWithTTLImpl::kTSLength);
    }
    return false;
  }

  virtual const char* Name() const override { return "Delete By TTL"; }

 private:
  int32_t ttl_;
  SystemClock* clock_;
};      
  1. 增加compaction_filter_factory的配置項
options->compaction_filter_factory =
        std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
            ttl, clock, options->compaction_filter_factory));      

compaction filter實作

關于compaction 處理邏輯的源代碼分析可以參考​​rocksdb compaction實作原理​​。

Compaction filter以及 factory生效的邏輯都在 最終compaction處理實際的key-value資料的函數中​

​ProcessKeyValueCompaction​

函數剛開始,會擷取使用者傳入的compaction_filter指針, 如果compaction_filter指針為空,使用者會嘗試構造compaction_filter_factory,這也就像是上面說的兩者之間按需選擇一個實作即可。

void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  .....

  // Create compaction filter and fail the compaction if
  // IgnoreSnapshots() = false because it is not supported anymore
  const CompactionFilter* compaction_filter =
      cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (compaction_filter == nullptr) {
    // 先擷取使用者傳入的compaction_filter_factory
    compaction_filter_from_factory =
        sub_compact->compaction->CreateCompactionFilter();
    // 再從factory擷取compaction_filter
    compaction_filter = compaction_filter_from_factory.get();
  }
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
    sub_compact->status = Status::NotSupported(
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
        "anymore.");
    return;
  }
  ......  
}      

關于這裡的​

​CreateCompactionFilter​

​的實作如下,其中有兩個配置非常有用:

std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
  if (!cfd_->ioptions()->compaction_filter_factory) {
    return nullptr;
  }

  CompactionFilter::Context context;
  context.is_full_compaction = is_full_compaction_;
  context.is_manual_compaction = is_manual_compaction_;
  context.column_family_id = cfd_->GetID();
  return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
      context);
}      

也就是Rocksdb會将​

​is_full_compaction​

​​和​

​is_manual_compaction​

​ 透傳回給使用者,使用者可以根據是否是fullcompaction和manual compaction來指定自己的filter/factory的行為。

回到​

​ProcessKeyValueCompaction​

​​之中,後續會通過​

​iter->Next()​

​​ --> ​

​NextFromInput()​

​​逐個處理compaction擷取到sst中的資料,其中關于compaction_filter的處理會進入到​

​CompactionIterator::InvokeFilterIfNeeded​

​函數

bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                                              Slice* skip_until) {
  ...
  if (CompactionFilter::Decision::kUndetermined == filter) {
      // 取到filter的行為,決定是remove,還是keep,開始其他相關操作。
      filter = compaction_filter_->FilterV2(
          compaction_->level(), filter_key, value_type,
          blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
          compaction_filter_skip_until_.rep());
    }
    ......
}      

其中​

​FilterV2​

​​的行為如下,因為這個函數也會被用作​

​MergeOperator​

​​中,是以會有​

​kMergeOperand​

​類似的判斷;針對正常的Put這種方式寫入的key都會調用Filter函數來處理,如果Filter傳回為true,則表示需要被remove的。

virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
                          const Slice& existing_value, std::string* new_value,
                          std::string* /*skip_until*/) const {
  switch (value_type) {
    case ValueType::kValue: {
      bool value_changed = false;
      // 使用者自定義的Filter函數
      bool rv = Filter(level, key, existing_value, new_value, &value_changed);
      if (rv) {
        return Decision::kRemove;
      }
      return value_changed ? Decision::kChangeValue : Decision::kKeep;
    }
    case ValueType::kMergeOperand: {
      bool rv = FilterMergeOperand(level, key, existing_value);
      return rv ? Decision::kRemove : Decision::kKeep;
    }
    case ValueType::kBlobIndex:
      return Decision::kKeep;
  }
  assert(false);
  return Decision::kKeep;
}      

再次回到​

​InvokeFilterIfNeeded​

​函數中:

針對傳回值為​

​Decision::kRemove​

​​的類型,會将目前的key标記為​

​kTypeDeletion​

​​,由​

​NextFromInput​

​後續進行清理

if (filter == CompactionFilter::Decision::kRemove) {
  // convert the current key to a delete; key_ is pointing into
  // current_key_ at this point, so updating current_key_ updates key()
  ikey_.type = kTypeDeletion;
  current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
  // no value associated with delete
  value_.clear();
  iter_stats_.num_record_drop_user++;
}      

到此,基本就清楚了compaction _filter如何在rocksdb内部進行初始化 以及完成指定類型key的清理,仍然是以打一個​

​kTypeDelete​

​的tombstone。

Table Collector

TablePropertiesCollector 的主要作用是在建構SST過程中由使用者來指定收集什麼樣的properties資訊。

也能夠針對目前sst檔案觸發一些行為,比如加速目前sst的compaction

Table Collector 使用

  1. Options 中的用法
opts.table_properties_collector_factories.emplace_back(
  NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));      

使用者可以自定義多個collectors,隻需要将自己定義的collectors工廠添加到table_properties_collector_factories數組中即可。

  1. 繼承 table_properties_collector_factories 之後的自定義

    如下CountingUserTblPropCollector 實作了一個統計key個數的功能,并編碼到了SSTables的properties屬性中。

class CountingUserTblPropCollector : public TablePropertiesCollector {
 public:
  const char* Name() const override { return "CountingUserTblPropCollector"; }

  // 完成一個sstable的建構之後會調用 Finish函數,主要指定使用者在建構完成之後的一些自定義行為。
  Status Finish(UserCollectedProperties* properties) override {
    std::string encoded;
    PutVarint32(&encoded, count_);
    *properties = UserCollectedProperties{
        {"CountingUserTblPropCollector", message_}, {"Count", encoded},
    };
    return Status::OK();
  }

  // 使用者根據傳入的key指定自己的行為,這裡是自增計數;這個函數也是一個主要函數,這裡能夠看到userkey。
  Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
                    EntryType /*type*/, SequenceNumber /*seq*/,
                    uint64_t /*file_size*/) override {
    ++count_;
    return Status::OK();
  }

  UserCollectedProperties GetReadableProperties() const override {
    return UserCollectedProperties{};
  }

 private:
  std::string message_ = "Rocksdb";
  uint32_t count_ = 0;
};      

再建構一個Properties工廠,用來支援傳入到option之中

class CountingUserTblPropCollectorFactory
    : public TablePropertiesCollectorFactory {
 public:
  explicit CountingUserTblPropCollectorFactory(
      uint32_t expected_column_family_id)
      : expected_column_family_id_(expected_column_family_id),
        num_created_(0) {}
  // 必須實作
  TablePropertiesCollector* CreateTablePropertiesCollector(
      TablePropertiesCollectorFactory::Context context) override {
    EXPECT_EQ(expected_column_family_id_, context.column_family_id);
    num_created_++;
    return new CountingUserTblPropCollector();
  }
  // 必須實作
  const char* Name() const override {
    return "CountingUserTblPropCollectorFactory";
  }
  void set_expected_column_family_id(uint32_t v) {
    expected_column_family_id_ = v;
  }
  uint32_t expected_column_family_id_;
  uint32_t num_created_;
};      

更多的tablePropertisCollector的定義可以參考​

​table_properties.h​

​​,更多的​

​properties_collector​

​​相關的案例可以整個rocksdb項目 中搜尋​

​public TablePropertiesCollector​

​​,其中Rocksdb自己實作了針對delete 比例比較重的場景 加速這種sst 的comapction功能,可以參考​​Rocksdb 對Delete問題的優化​​

Table Collector 實作

引擎内部接受使用者傳入的TablePropertiesCollector 的過程如下調用棧:

CompactionJob::ProcessKeyValueCompaction // compaction的實際執行函數 -- 處理key-value
  CompactionJob::OpenCompactionOutputFile // 建立compaction過程中的一些檔案/檔案buffer
    TableBuilder* NewTableBuilder // 建立一種存儲key-value資料類型的table
      BlockBasedTableFactory::NewTableBuilder // 預設建立blockbasedtable
        BlockBasedTableBuilder::BlockBasedTableBuilder // 構造函數
          Rep // 構造rep,代表blockbasedtable 來添加資料      

在構造​

​Rep​

​的過程中,會将使用者傳入的collector_factories 添加到rep維護的properties_colectors成員之中

for (auto& collector_factories : *int_tbl_prop_collector_factories) {
  table_properties_collectors.emplace_back(
    collector_factories->CreateIntTblPropCollector(column_family_id));
}      

現在引擎的blockbasedtable的持有者已經拿到了使用者自定義的collectors,之後要構造sstable的過程來執行​

​collectors​

​的一些行為,則會在如下調用棧中進行:

CompactionJob::ProcessKeyValueCompaction
  CompactionJob::FinishCompactionOutputFile
    BlockBasedTableBuilder::Add
      NotifyCollectTableCollectorsOnAdd      

​NotifyCollectTableCollectorsOnAdd​

​​ 這個函數中會拿着前面構造好的rep的​

​table_properties_collectors​

​成員

bool NotifyCollectTableCollectorsOnAdd(
    const Slice& key, const Slice& value, uint64_t file_size,
    const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
    Logger* info_log) {
  bool all_succeeded = true;
  // 周遊所有的collectors,每個都執行一次InternalAdd
  for (auto& collector : collectors) {
    Status s = collector->InternalAdd(key, value, file_size);
    all_succeeded = all_succeeded && s.ok();
    if (!s.ok()) {
      LogPropertiesCollectionError(info_log, "Add" /* method */,
                                   collector->Name());
    }
  }
  return all_succeeded;
}      

如果collector是使用者創造的類型,則會統一進入​

​UserKeyTablePropertiesCollector​

​​邏輯中,進而調用我們自己實作的​

​AddUserKey​

​函數。

Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key,
                                                    const Slice& value,
                                                    uint64_t file_size) {
  ParsedInternalKey ikey;
  Status s = ParseInternalKey(key, &ikey, false /* log_err_key */);  // TODO
  if (!s.ok()) {
    return s;
  }

  return collector_->AddUserKey(ikey.user_key, value, GetEntryType(ikey.type),
                                ikey.sequence, file_size);
}      

這樣就能夠在建立sstable的過程中完成我們自定義屬性的更新。

如果在​

​CompactionJob::FinishCompactionOutputFile​

​​ 中完成了key的添加,并構造完成了sstable,則會嘗試擷取​

​NeedCompact​

​标記,這個标記可以在我們自定義的collector中完成設定。

if (s.ok()) {
  meta->fd.file_size = current_bytes;
  meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}

 |
 | 調用BlockBasedTableBuilder的NeedCompact
\ /
   
bool BlockBasedTableBuilder::NeedCompact() const {
  // 從使用者自定義行為中檢視NeedCompact标記
  for (const auto& collector : rep_->table_properties_collectors) {
    if (collector->NeedCompact()) {
      return true;
    }
  }
  return false;
}      

到此,我們就知道了TablePropertiesCollector的行為在引擎中的實作。

總結

1. Rocksdb ThreadLocal 機制 原理?為什麼要有這個機制?其在讀場景下有什麼優勢?
2. Rocksdb syncpoint 實作,基本用法
3. Rocksdb 事務瑣實作:鎖管理/加鎖流程/死鎖檢測/空間消耗。
4. Rocksdb 樂觀事務/悲觀事務實作,基本用法,适用場景。
5. Rocksdb 壓縮實作。
6. Rocksdb Pinnable 結合Cleanable 在iter中的作用 及 實作。