文章目錄
- 前言
- Compaction Filter
- compaction_filter 使用
- compaction_filter_factory 使用
- compaction filter實作
- Table Collector
- Table Collector 使用
- Table Collector 實作
- 總結
前言
Rocksdb 因為out of-place update 的方式,導緻很多背景運作的合并效率較低,尤其是使用者在delete-aware場景想要快速回收key版本,但因為背景操作,不是很友善。同時使用者想要删除/保留一批自定義方式的key,這個時候需要由使用者來控制compaction過程的一些行為。
是以Rocksdb針對這樣的場景對使用者暴露了一些接口,支援使用者自定義過濾指定類型的key,保留/删除(當然主要是删除),這也是compaciton_filter的接口産生源頭。
Table_collector 也是一種類似的接口,Rocksdb為使用者暴露了能夠通路到底層sst的key-value接口,進而支援一些針對相關key的一些定制化行為(比如統計某種使用者定義的key的數量,根據某種key數量的比重加速compaction等等)。
本文将簡單介紹一下這兩種使用者接口的用法以及其如何在rocksdb 底層源代碼中的實作原理。
相關的rocksdb代碼版本是6.19
Compaction Filter
compaction_filter 使用
- 自定義compaction filter,聲明如下:
class RemoveEmptyValueCompactionFilter : public CompactionFilter {
public:
const char* Name() const override;
bool Filter(int level,
const Slice& key,
const Slice& existing_value,
std::string* new_value,
bool* value_changed) const override;
};
- 定義如下:
const char* RemoveEmptyValueCompactionFilter::Name() const {
return "RemoveEmptyValueCompactionFilter";
}
bool RemoveEmptyValueCompactionFilter::Filter(int /*level*/,
const Slice& /*key*/,
const Slice& existing_value,
std::string* /*new_value*/,
bool* /*value_changed*/) const {
// remove kv pairs that have empty values
return existing_value.empty();
}
主體是Filter函數,這裡底層傳入的參數都能夠被使用者看到包括:
- Level 目前key的輸入level
- key user_key内容
- existing_value user_key的value
- new_value 如果要修改這個existing_value,則這個new_value将被傳出
- value_changed 确定要修改existing_value,需要将這個配置置為true,也會被傳出
- 增加compaction_filter的配置項
options.compaction_filter = new rocksdb::RemoveEmptyValueCompactionFilter();
-
User Filter 監控
通過增加
來檢視被compaction_filter過濾掉的key的數目rocksdb.compaction.key.drop.user
compaction_filter_factory 使用
上一節介紹的是compaction_filter的基本使用,使用者主體隻需要直接實作一個Filter函數。而通過compaction_filter_factory 實作自己的filter工廠,能夠使用者自定義何時建立自己的filter;有多個Filter是能夠通過compaction_filter_factory 來疊加使用。一般
compaction_filter
和
compaction_filter_factory
隻需要使用一個就可以了。
如下filter_factory的用例是rocksdb自己的ttl compaction功能,目的是建立一個filter,使用者隻需要傳入一個過期時間,由filter自行在compaction過程中針對過期的key進行清理(當然,前提是key的寫入也需要按照ttldb的方式寫入),同時factory也定義了保留其他的使用者filter。
- 定義compaction_filter_factory
class TtlCompactionFilterFactory : public CompactionFilterFactory {
public:
TtlCompactionFilterFactory(
int32_t ttl, SystemClock* clock,
std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
: ttl_(ttl),
clock_(clock),
user_comp_filter_factory_(comp_filter_factory) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( // 必須要實作
const CompactionFilter::Context& context) override {
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
nullptr;
// 保留其他的使用者filter
if (user_comp_filter_factory_) {
user_comp_filter_from_factory =
user_comp_filter_factory_->CreateCompactionFilter(context);
}
// 建立一個ttl filter,并将其他已經配置的filter傳入
return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
}
void SetTtl(int32_t ttl) {
ttl_ = ttl;
}
virtual const char* Name() const override { // 必須要實作的
return "TtlCompactionFilterFactory";
}
private:
int32_t ttl_;
SystemClock* clock_;
std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};
ps:為什麼Name()和CreateCompactionFilter必須要實作,可以看看
CompactionFilterFactory
,這兩個函數其實是純虛函數,必須要基類實作。
class CompactionFilterFactory {
public:
virtual ~CompactionFilterFactory() {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) = 0;
// Returns a name that identifies this compaction filter factory.
virtual const char* Name() const = 0;
};
- 基于factory 定義自己的filter
// LayeredCompactionFilterBase 是用來實作filter疊加使用的
class TtlCompactionFilter : public LayeredCompactionFilterBase {
public:
TtlCompactionFilter(int32_t ttl, SystemClock* clock,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter>
_user_comp_filter_from_factory = nullptr)
: LayeredCompactionFilterBase(_user_comp_filter,
std::move(_user_comp_filter_from_factory)),
ttl_(ttl),
clock_(clock) {}
// ttl filter,用來判斷key是否過期,進而決定是否清理
virtual bool Filter(int level, const Slice& key, const Slice& old_val,
std::string* new_val, bool* value_changed) const
override {
if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
return true;
}
if (user_comp_filter() == nullptr) {
return false;
}
assert(old_val.size() >= DBWithTTLImpl::kTSLength);
Slice old_val_without_ts(old_val.data(),
old_val.size() - DBWithTTLImpl::kTSLength);
if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
value_changed)) {
return true;
}
if (*value_changed) {
new_val->append(
old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
DBWithTTLImpl::kTSLength);
}
return false;
}
virtual const char* Name() const override { return "Delete By TTL"; }
private:
int32_t ttl_;
SystemClock* clock_;
};
- 增加compaction_filter_factory的配置項
options->compaction_filter_factory =
std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
ttl, clock, options->compaction_filter_factory));
compaction filter實作
關于compaction 處理邏輯的源代碼分析可以參考rocksdb compaction實作原理。
Compaction filter以及 factory生效的邏輯都在 最終compaction處理實際的key-value資料的函數中
ProcessKeyValueCompaction
函數剛開始,會擷取使用者傳入的compaction_filter指針, 如果compaction_filter指針為空,使用者會嘗試構造compaction_filter_factory,這也就像是上面說的兩者之間按需選擇一個實作即可。
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
.....
// Create compaction filter and fail the compaction if
// IgnoreSnapshots() = false because it is not supported anymore
const CompactionFilter* compaction_filter =
cfd->ioptions()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (compaction_filter == nullptr) {
// 先擷取使用者傳入的compaction_filter_factory
compaction_filter_from_factory =
sub_compact->compaction->CreateCompactionFilter();
// 再從factory擷取compaction_filter
compaction_filter = compaction_filter_from_factory.get();
}
if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
sub_compact->status = Status::NotSupported(
"CompactionFilter::IgnoreSnapshots() = false is not supported "
"anymore.");
return;
}
......
}
關于這裡的
CreateCompactionFilter
的實作如下,其中有兩個配置非常有用:
std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
if (!cfd_->ioptions()->compaction_filter_factory) {
return nullptr;
}
CompactionFilter::Context context;
context.is_full_compaction = is_full_compaction_;
context.is_manual_compaction = is_manual_compaction_;
context.column_family_id = cfd_->GetID();
return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
context);
}
也就是Rocksdb會将
is_full_compaction
和
is_manual_compaction
透傳回給使用者,使用者可以根據是否是fullcompaction和manual compaction來指定自己的filter/factory的行為。
回到
ProcessKeyValueCompaction
之中,後續會通過
iter->Next()
-->
NextFromInput()
逐個處理compaction擷取到sst中的資料,其中關于compaction_filter的處理會進入到
CompactionIterator::InvokeFilterIfNeeded
函數
bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) {
...
if (CompactionFilter::Decision::kUndetermined == filter) {
// 取到filter的行為,決定是remove,還是keep,開始其他相關操作。
filter = compaction_filter_->FilterV2(
compaction_->level(), filter_key, value_type,
blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
compaction_filter_skip_until_.rep());
}
......
}
其中
FilterV2
的行為如下,因為這個函數也會被用作
MergeOperator
中,是以會有
kMergeOperand
類似的判斷;針對正常的Put這種方式寫入的key都會調用Filter函數來處理,如果Filter傳回為true,則表示需要被remove的。
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {
switch (value_type) {
case ValueType::kValue: {
bool value_changed = false;
// 使用者自定義的Filter函數
bool rv = Filter(level, key, existing_value, new_value, &value_changed);
if (rv) {
return Decision::kRemove;
}
return value_changed ? Decision::kChangeValue : Decision::kKeep;
}
case ValueType::kMergeOperand: {
bool rv = FilterMergeOperand(level, key, existing_value);
return rv ? Decision::kRemove : Decision::kKeep;
}
case ValueType::kBlobIndex:
return Decision::kKeep;
}
assert(false);
return Decision::kKeep;
}
再次回到
InvokeFilterIfNeeded
函數中:
針對傳回值為
Decision::kRemove
的類型,會将目前的key标記為
kTypeDeletion
,由
NextFromInput
後續進行清理
if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
}
到此,基本就清楚了compaction _filter如何在rocksdb内部進行初始化 以及完成指定類型key的清理,仍然是以打一個
kTypeDelete
的tombstone。
Table Collector
TablePropertiesCollector 的主要作用是在建構SST過程中由使用者來指定收集什麼樣的properties資訊。
也能夠針對目前sst檔案觸發一些行為,比如加速目前sst的compaction
Table Collector 使用
- Options 中的用法
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
使用者可以自定義多個collectors,隻需要将自己定義的collectors工廠添加到table_properties_collector_factories數組中即可。
-
繼承 table_properties_collector_factories 之後的自定義
如下CountingUserTblPropCollector 實作了一個統計key個數的功能,并編碼到了SSTables的properties屬性中。
class CountingUserTblPropCollector : public TablePropertiesCollector {
public:
const char* Name() const override { return "CountingUserTblPropCollector"; }
// 完成一個sstable的建構之後會調用 Finish函數,主要指定使用者在建構完成之後的一些自定義行為。
Status Finish(UserCollectedProperties* properties) override {
std::string encoded;
PutVarint32(&encoded, count_);
*properties = UserCollectedProperties{
{"CountingUserTblPropCollector", message_}, {"Count", encoded},
};
return Status::OK();
}
// 使用者根據傳入的key指定自己的行為,這裡是自增計數;這個函數也是一個主要函數,這裡能夠看到userkey。
Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
EntryType /*type*/, SequenceNumber /*seq*/,
uint64_t /*file_size*/) override {
++count_;
return Status::OK();
}
UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties{};
}
private:
std::string message_ = "Rocksdb";
uint32_t count_ = 0;
};
再建構一個Properties工廠,用來支援傳入到option之中
class CountingUserTblPropCollectorFactory
: public TablePropertiesCollectorFactory {
public:
explicit CountingUserTblPropCollectorFactory(
uint32_t expected_column_family_id)
: expected_column_family_id_(expected_column_family_id),
num_created_(0) {}
// 必須實作
TablePropertiesCollector* CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context context) override {
EXPECT_EQ(expected_column_family_id_, context.column_family_id);
num_created_++;
return new CountingUserTblPropCollector();
}
// 必須實作
const char* Name() const override {
return "CountingUserTblPropCollectorFactory";
}
void set_expected_column_family_id(uint32_t v) {
expected_column_family_id_ = v;
}
uint32_t expected_column_family_id_;
uint32_t num_created_;
};
更多的tablePropertisCollector的定義可以參考
table_properties.h
,更多的
properties_collector
相關的案例可以整個rocksdb項目 中搜尋
public TablePropertiesCollector
,其中Rocksdb自己實作了針對delete 比例比較重的場景 加速這種sst 的comapction功能,可以參考Rocksdb 對Delete問題的優化
Table Collector 實作
引擎内部接受使用者傳入的TablePropertiesCollector 的過程如下調用棧:
CompactionJob::ProcessKeyValueCompaction // compaction的實際執行函數 -- 處理key-value
CompactionJob::OpenCompactionOutputFile // 建立compaction過程中的一些檔案/檔案buffer
TableBuilder* NewTableBuilder // 建立一種存儲key-value資料類型的table
BlockBasedTableFactory::NewTableBuilder // 預設建立blockbasedtable
BlockBasedTableBuilder::BlockBasedTableBuilder // 構造函數
Rep // 構造rep,代表blockbasedtable 來添加資料
在構造
Rep
的過程中,會将使用者傳入的collector_factories 添加到rep維護的properties_colectors成員之中
for (auto& collector_factories : *int_tbl_prop_collector_factories) {
table_properties_collectors.emplace_back(
collector_factories->CreateIntTblPropCollector(column_family_id));
}
現在引擎的blockbasedtable的持有者已經拿到了使用者自定義的collectors,之後要構造sstable的過程來執行
collectors
的一些行為,則會在如下調用棧中進行:
CompactionJob::ProcessKeyValueCompaction
CompactionJob::FinishCompactionOutputFile
BlockBasedTableBuilder::Add
NotifyCollectTableCollectorsOnAdd
NotifyCollectTableCollectorsOnAdd
這個函數中會拿着前面構造好的rep的
table_properties_collectors
成員
bool NotifyCollectTableCollectorsOnAdd(
const Slice& key, const Slice& value, uint64_t file_size,
const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
Logger* info_log) {
bool all_succeeded = true;
// 周遊所有的collectors,每個都執行一次InternalAdd
for (auto& collector : collectors) {
Status s = collector->InternalAdd(key, value, file_size);
all_succeeded = all_succeeded && s.ok();
if (!s.ok()) {
LogPropertiesCollectionError(info_log, "Add" /* method */,
collector->Name());
}
}
return all_succeeded;
}
如果collector是使用者創造的類型,則會統一進入
UserKeyTablePropertiesCollector
邏輯中,進而調用我們自己實作的
AddUserKey
函數。
Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key,
const Slice& value,
uint64_t file_size) {
ParsedInternalKey ikey;
Status s = ParseInternalKey(key, &ikey, false /* log_err_key */); // TODO
if (!s.ok()) {
return s;
}
return collector_->AddUserKey(ikey.user_key, value, GetEntryType(ikey.type),
ikey.sequence, file_size);
}
這樣就能夠在建立sstable的過程中完成我們自定義屬性的更新。
如果在
CompactionJob::FinishCompactionOutputFile
中完成了key的添加,并構造完成了sstable,則會嘗試擷取
NeedCompact
标記,這個标記可以在我們自定義的collector中完成設定。
if (s.ok()) {
meta->fd.file_size = current_bytes;
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
|
| 調用BlockBasedTableBuilder的NeedCompact
\ /
bool BlockBasedTableBuilder::NeedCompact() const {
// 從使用者自定義行為中檢視NeedCompact标記
for (const auto& collector : rep_->table_properties_collectors) {
if (collector->NeedCompact()) {
return true;
}
}
return false;
}
到此,我們就知道了TablePropertiesCollector的行為在引擎中的實作。
總結
1. Rocksdb ThreadLocal 機制 原理?為什麼要有這個機制?其在讀場景下有什麼優勢?
2. Rocksdb syncpoint 實作,基本用法
3. Rocksdb 事務瑣實作:鎖管理/加鎖流程/死鎖檢測/空間消耗。
4. Rocksdb 樂觀事務/悲觀事務實作,基本用法,适用場景。
5. Rocksdb 壓縮實作。
6. Rocksdb Pinnable 結合Cleanable 在iter中的作用 及 實作。