LSM Tree 与 LevelDB

Sgjak.png

我们在 Rust Project 2: Log-structured file I/O 解读 中已经实践过一个 log-structed map,我们今天看看真实世界的 log-structured merge-tree,这是一个优化 操作的数据结构。

LSM Tree

在原始的 The Log-Structured Merge-Tree (LSM Tree) 的设计中,我们在内存中维护一个 C0 树,我们不断的向这个树中 Append 数据,当内存中的 C0 超过一定大小,我们将部分数据落入磁盘的 C1 树,分级之后还有 C2 树 C3 树 CK 树,每一颗树都比前一颗树更大。不过在 LevelDB 中的设计有所形变。

lsm tree

SSTables

LevelDBLSM Tree 的数据结构中,我们映射到持久化的磁盘对象时候用的对象叫 SSTable [Sorted Strings Table]SSTABLE 中由多个 Segement 构成,这和我们在 LS file system 中一样,减少小碎片文件的写入。

S4nBu.png

不过值得注意的,在 Segment 中的 KV 对象都是排序好的。

写入数据

所有的数据在一开始,我们都先通过 WAL LOG 落盘,方便断电恢复。

和大部分的数据库一样,我们还是要在内存中维护一个 索引 关系,这里我们一般使用 红黑树

写操作被存储在这个红黑树中,直到树达到预定义的大小。一旦红黑树有了足够的条目,它将作为磁盘上排序后的一个段刷新到磁盘上。

S4dmX.png

在内存中的阶段一般也称之为 memtable,因为此时还没写入磁盘,我们依然需要提供可检索的能力。

Write In LevelDB

leveldb 的实现定义中,写入的操作如下定义

db_impl.cc
1
2
3
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
  • WriteOptions 控制着我们是否需要 sync,也就是刷到磁盘上
Write Log

真实的写操作

db_impl.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;

MutexLock l(&mutex_);
writers_.push_back(&w);
// 排队写入,直到我们在 front
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
// 如果已经完成,就直接返回,这里注意就是因为上面的Wait,有可能在队列里面已经被其他人消费了。
if (w.done) {
return w.status;
}

// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence(); // 拿到最新的序列号
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); // 这里会把 writers 队列中的其他适合的写操作一起执行
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); // 将序号 +1
last_sequence += WriteBatchInternal::Count(write_batch); // 因为这里有聚合操作,所以序号 + N

{
mutex_.Unlock(); // 把锁释放了,因为我们已经从 writers 中拿到了我们想要的数据,而且 SEQ 也已经定下来的了
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); // 写入 WAL Log
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_); // 这里写入,mem_ 是 MemTable,也就是在内存中的那些数据
}
mutex_.Lock();
if (sync_error) {
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}

// .... 唤醒机制
return status;
}

如果希望通过 Debug 来阅读代码,可以在 db_test.cc 中设定断点。

从上面的代码我们可以看到如何处理 WAL log 的逻辑,下一步就是如何写入我们的 memtable 中了。

write_batch.cc
1
2
3
4
5
6
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter); //创建一个 Inserter,然后将其插入
}
Write MemTable

跳过中间那些 foreach 执行的部分,我们看看真正的 PUT 逻辑,在 MemTableInserter::Put

MemTableInserter::Put
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}

// MemTable Add 的逻辑很简单,开辟空间,扔进去
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
// 开辟一个 buf 空间给这个 kv
char* buf = arena_.Allocate(encoded_len);
char* p = EncodeVarint32(buf, internal_key_size);
// copy 进去
std::memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
// 将索引写入 Table 中 [其实是SkipList]
table_.Insert(buf);
}

对于 SkipList 就不做多解释了。

详细参考

Write Disk

而对于 MemTable 满了如何写入磁盘,这里的操作在我们略过的 MakeRoomForWrite

DBImpl::MakeRoomForWrite
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// ... 中间跳过了很多 case
} else {
// Attempt to switch to a new memtable and trigger compaction of old
// 尝试创建一个新的 memtable,并且将 旧数据进行压实
assert(versions_->PrevLogNumber() == 0);

// 创建了一个新的日志文件
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);

// 创建一个新的 MemTable
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false;

// 找寻恰当的时候进行压实
MaybeScheduleCompaction();
}
}
return s;
}
CompactMemTable
DBImpl::CompactMemTable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != nullptr);

// Save the contents of the memtable as a new Table
VersionEdit edit;
Version* base = versions_->current();
base->Ref();
Status s = WriteLevel0Table(imm_, &edit, base); // 内存的数据往 level0 写,最近的数据嘛
base->Unref();

if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
s = Status::IOError("Deleting DB during memtable compaction");
}

// ....
}

删除值

和之前处理逻辑一样,我们增加一个特殊的值,appendlog 中,标记此值已死亡。

读取数据

为了方便在 SSTable 中查询数据。Index Table 被放置于每一个 SSTable 的尾端,一般是 sparse index [稀疏索引],我们载入这个块的时候,讲 Index Table 载入内存,提供查询的能力。

S4e9D.png

当我们查找某个 Key,比如 Dollar 这个值势必在 dogdowngrade 之间,因此我们只需要扫描 1720819504 但是如果数据没有的情况下,我们可能会扫描全表,因此我们加入 bloom filter 来帮助我们快速确认是否某个值根本不存在。

精准读

考虑下这样数据结构下的读操作

如果我们想要读取 23 的值得话,按照图中标示的顺序,会先读取内存,在从Level0依次往高层读取,直到找到key=23的数据。

这大概也是 LevelDB 的由来吧


不过显然每次去磁盘读取并不会快。因此至少有一些部分可以优化

  • 将所有的 SSTableIndex 保存在内存中
  • 尽可能的多缓存我们的 SSTable 在内存中

Read In LevelDB

DBImpl::Get
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;

// 先限制下最新的数据到哪里
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}

MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();

bool have_stat_update = false;
Version::GetStats stats;

// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
// 这里我们先去 mem 中获取也就是内存中的
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
// 然后尝试从 imm,也就是即将持久化的内存对象中找
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {

// 最后才从其他地方找
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}

我们在 memimm 中没办法找到之后采去 level 文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Status Version::Get(const ReadOptions& options, const LookupKey& k,
std::string* value, GetStats* stats) {
stats->seek_file = nullptr;
stats->seek_file_level = -1;

struct State {
static bool Match(void* arg, int level, FileMetaData* f) {
State* state = reinterpret_cast<State*>(arg);

state->last_file_read = f;
state->last_file_read_level = level;

// 从内存缓存的 `table cache`中查找,不过这里我们只缓存了 `key`,`value` 在里面还需要取一下
// PS: Table Cache 是基于 LRU 实现的
state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
if (!state->s.ok()) {
state->found = true;
return false;
}
}
};
ForEachOverlapping(state.saver.user_key, state.ikey, &state, &State::Match);
return state.found ? state.s : Status::NotFound(Slice());
}

Status TableCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Cache::Handle* handle = nullptr;
// 找到对应的 Table
Status s = FindTable(file_number, file_size, &handle);
if (s.ok()) {
// 找到了 Table,我们下面就去取这个 Value 在 InternalGet,这里不展开了。
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
s = t->InternalGet(options, k, arg, handle_result);
cache_->Release(handle);
}
return s;
}

但是我们没有找到,那种按照层级查找的代码啊?其实我们漏了一个很重要的函数也就是 ForEachOverlapping

Version::ForEachOverlapping
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
bool (*func)(void*, int, FileMetaData*)) {
const Comparator* ucmp = vset_->icmp_.user_comparator();

// Search level-0 in order from newest to oldest.
// 从 Level-0 开始的逆序查找,因为大部分时候,Level-0 能找到,局部性原则嘛
std::vector<FileMetaData*> tmp;
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {
FileMetaData* f = files_[0][i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (!tmp.empty()) {
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++) {
if (!(*func)(arg, 0, tmp[i])) {
return;
}
}
}

// Search other levels. 查找其他的层级
for (int level = 1; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;

// 二分查找 INdex
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files) {
FileMetaData* f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
// All of "f" is past any data for user_key
} else {
if (!(*func)(arg, level, f)) {
return;
}
}
}
}
}

压缩

很多值会变的重复起来,和我们之前遇见的情况一样。

S46Qp.png

我们进行压缩并且删除老数据。

流程

  • 写入的时候,我们在内存中使用 RD Tree / SkipList 在必要的时候触发持久化
  • 持久化的时候,按照排序的方式进行写入
  • 读取数据的时候,先通过 bloom 过滤,然后再遍历 offset,把所有的 Key 放在内存里也可以
  • 读取 offset 范围内的对象,然后找到再返回

Read Write Space 也有类似于 CAP 的定理


讲了一大串和 时序数据库 有啥关系呢?时序数据库是一个大规模写的系统,查询也有,不过倒是容忍多高,因此大多数的 TSDB [Time Series Database] 都是使用 LSM 来进行储存的。解决完单机的储存,对于分布式系统,靠一台机器肯定是不行的,因此我们就要想到如何把数据分散到不同的机器上。

分布式存储

分布式存储首先要考虑的是如何将数据分布到多台机器上面,也就是 分片(sharding)问题

  • 哈希分片:这种方法实现简单,均衡性较好,但是集群不易扩展。[因为增加机器就涉及到Hash值改变]
  • 一致性哈希:这种方案均衡性好,集群扩展容易,迁移的数据会少一些
  • 范围划分:通常配合全局有序,复杂度在于合并和分裂。代表有Hbase。
  • 结合时序数据库的特点,根据 metric + tags 分片。

参考