LevelDB 自学指南

LevelDB是一款写性能十分优秀的可持久化的KV存储引擎,其实现原理是依据LSM-Tree(Log Structed-Merge Tree),由Google开源,今天聊聊这个。

准备环境

首先我们先下载代码,编译试试。

1
2
git clone --recurse-submodules https://github.com/google/leveldb.git
cmake -DCMAKE_BUILD_TYPE=Release .. && cmake --build .

然后我们增加我们自己的测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// example.simple.cc

#include <iostream>
#include <sstream>
#include <string>

#include "leveldb/db.h"

using namespace std;

int main(int argc, char** argv) {
// Set up database connection information and open database
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;

leveldb::Status status = leveldb::DB::Open(options, "./testdb", &db);

if (false == status.ok()) {
cerr << "Unable to open/create test database './testdb'" << endl;
cerr << status.ToString() << endl;
return -1;
}

// Add 256 values to the database
leveldb::WriteOptions writeOptions;
for (unsigned int i = 0; i < 256; ++i) {
ostringstream keyStream;
keyStream << "Key" << i;

ostringstream valueStream;
valueStream << "Test data value: " << i;

db->Put(writeOptions, keyStream.str(), valueStream.str());
}

// Iterate over each item in the database and print them
leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());

for (it->SeekToFirst(); it->Valid(); it->Next()) {
cout << it->key().ToString() << " : " << it->value().ToString() << endl;
}

if (false == it->status().ok()) {
cerr << "An error was found during the scan" << endl;
cerr << it->status().ToString() << endl;
}

delete it;

// Close the database
delete db;
}

然后在我们的 Makefile 中增加

1
2
  leveldb_test("db/c_test.c")
++leveldb_test("example/simple.cc")

然后就可以执行我们的测试逻辑了。

基础知识

对于 LevelDB 的基础知识就不做展开了,鉴于有很多全面的资料,非常推荐阅读 <LevelDB源码剖析>^1

LevelDB的存储引擎主要分为三个部件:

  • SSTable,就是Sorted String Table,是一个持久化的、有序的 SortedMap,存储在磁盘上;
  • WAL,Write Ahead Log,数据库里面经常用的技术,要写数据时,不直接写数据文件,而是先写一条日志,这样可以把对磁盘的随机写转换成顺序写
  • MemTable,保存了最近写入的键值对,数据写入 WAL 后,会同时写入 MemTable,这样便于查询。

SSTable是数据最终落盘的地方,而 WAL 保存了最近写入的数据,持久化到磁盘上,MemTable 则是 WAL 里数据的内存表示,因为日志的格式不便于查询,在内存中才便于快速查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
.
├── 000018.ldb <----- SSTable
├── 000020.ldb <----- SSTable
├── 000023.ldb <----- SSTable
├── 000026.ldb <----- SSTable
├── 000028.ldb <----- SSTable
├── 000031.log <----- WAL log
├── 000032.ldb <----- SSTable
├── 000033.ldb
├── CURRENT <----- 当前版本
├── LOCK
├── LOG <----- 运行日志
├── LOG.old
└── MANIFEST-000030

SST

SSTable 最早来自于 Google 的 Bigtable 论文

An SSTable provides a persistent, ordered immutable map from keys to values, where both keys and values are arbitrary byte strings. Operations are provided to look up the value associated with a specified key, and to iterate over all key/value pairs in a specified key range. Internally, each SSTable contains a sequence of blocks (typically each block is 64KB in size, but this is configurable). A block index (stored at the end of the SSTable) is used to locate blocks; the index is loaded into memory when the SSTable is opened. A lookup can be performed with a single disk seek: we first find the appropriate block by performing a binary search in the in-memory index, and then reading the appropriate block from disk. Optionally, an SSTable can be completely mapped into memory, which allows us to perform lookups and scans without touching disk.

Leveldb 中的具体实现参考 leveldb File format

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[filter block 1] <--- 仅仅 FilterPolicy 开启的时候
...
[filter block K]
[meta block 1]
...
[meta block K]
[metaindex block]
[index block]
[Footer] (fixed size; starts at file_size - sizeof(Footer))
<end_of_file>
  • Data Block存储数据,按照键的顺序进行排序;
  • Data Block后是Filter Block,也设计成为多个,存储了布隆过滤器的二进制数据;
  • Meta Index Block存储了指向Filter Block的指针,根据这个指针可以找到某个Filter Block开始的位置
  • Index Block存储了指向每一个Data Block的指针的数组;
  • 最后有一个大小固定的Footer,保存两个BlockHandler,分别指向Meta Index BlockIndex Block

Footer 一共是 48 字节

EncodeTogithub
1
2
3
4
5
6
7
8
9
10
void Footer::EncodeTo(std::string* dst) const {
const size_t original_size = dst->size();
metaindex_handle_.EncodeTo(dst);
index_handle_.EncodeTo(dst);
dst->resize(2 * BlockHandle::kMaxEncodedLength); // Padding
PutFixed32(dst, static_cast<uint32_t>(kTableMagicNumber & 0xffffffffu));
PutFixed32(dst, static_cast<uint32_t>(kTableMagicNumber >> 32));
assert(dst->size() == original_size + kEncodedLength);
(void)original_size; // Disable unused variable warning.
}

换做定义的话就是

1
2
3
4
5
6
struct Footer {
BlockHandler meta_index_block; // 定位Meta Index Block
BlockHandler index_block; // 定位Index Block
byte[n] padding; // 补齐到48字节
int64 magic; // 魔数 echo http://code.google.com/p/leveldb/ | sha1sum
}

Block

其他的区域都是 Block 类型的数据, 换做定义的是 (并没有实际的定义)

1
2
3
4
5
struct Block {
byte[] data;
byte compress_type;
int32 crc; <---- 可选的
}

主要参考 ReadBlock 推导出来。

Data Block

DataBlock 用了一种压缩的手段来储存数据,我们知道 Data 中其实是 KV 的结构,这里用了 Key 压缩的手段。

用结构定义的话,是这样的。

1
2
3
4
5
6
7
struct Kv {
varint32 shared_key_length; // 和前一个键相同的前缀长度
varint32 non_shared_key_length; // 不相同的键长度
varint32 value_length; // 值的长度
byte[] non_shared_key_content; // 不相同的键的内容
byte[] value; // 值的内容
}

通过这种方式将多个Kv连续存放在Data Block里,可以进行键的前缀压缩。然而这样会有一个问题,不管得到哪一个键的值,都需要从Block的第一个键开始依次构造,搜索一个键的时候,也需要遍历整个Block,如果一个Block里有大量的键的话,效率会比较低。
针对这个问题,LevelDB设置了restart point,每16个Kv里第一个Kv是一个restart point,这个Kv的shared_key_length始终为0,也就是这个Kv不采用前缀编码,non_shared_key_content里的内容就是整个键的内容。这样就不需要从每一个Data Block的开头开始构造键了,只需要从每一个restart point开始构造。另外在每个Data Block的末尾存储了一个restart point数组,指向了每一个restart point所在Kv的在块中的偏移,这样便可以支持二分搜索,搜索出键属于哪一个restart point的组里,然后去搜索这个组里面的16个Kv就可以找到这个键。restart point数组就像是一个Data Block的稀疏索引,可以加快键的查找。

1
2
3
4
5
struct DataBlock {
Kv[] kv; // Kv数组
int32[] restart_point_offsets; // restart point偏移数组,指向每一个restart point
int32 restart_point_count; // restart point数量
}

读取的逻辑如下

DecodeEntrygithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 从 P 的位置开始读取
static inline const char* DecodeEntry(const char* p, const char* limit,
uint32_t* shared, uint32_t* non_shared,
uint32_t* value_length) {
if (limit - p < 3) return nullptr;
// shared_key 长度
*shared = reinterpret_cast<const uint8_t*>(p)[0];
// non_shared_key 长度
*non_shared = reinterpret_cast<const uint8_t*>(p)[1];
// value 长度
*value_length = reinterpret_cast<const uint8_t*>(p)[2];
if ((*shared | *non_shared | *value_length) < 128) {
// Fast path: all three values are encoded in one byte each
p += 3;
} else {
if ((p = GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
if ((p = GetVarint32Ptr(p, limit, non_shared)) == nullptr) return nullptr;
if ((p = GetVarint32Ptr(p, limit, value_length)) == nullptr) return nullptr;
}

if (static_cast<uint32_t>(limit - p) < (*non_shared + *value_length)) {
return nullptr;
}
return p;
}

Index Block

Index Block就非常简单了,它其实就是存储了一个Kv数组,每一个Kv对应一个Data Block,其中键大于等于对应的Data Block中最后一个键,值为一个BlockHandler,可以定位到一个Data BlockIndex Block就是Data Block的索引,搜索时可以对Index Block二分搜索,找到键对应的Data Block

作者注:这里不去讨论 Bloom 过滤器,主要侧重在 LSM 和 LevelDB 的本地逻辑。


有了上面这两部分的基础知识,我们就可以知道是如何从磁盘中读取一个值了。

  1. 读取 Footer,根据里面的读取 Meta Index BlockIndex Block ,将 Index Block 的内容缓存到内存中
  2. 根据键对 Index Blockrestart point 进行二分搜索,找到这个键对应的 Data BlockBlockHandler
  3. 读取对应的 Data Block
  4. Data Block 里的 restart point 进行二分搜索,找到搜索键对应的 restart point;

那我们来看看读取的全流程,我们从 Table::InternalGet 开始

InternalGetgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Status s;
// 找到 index block
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
// 定位到对应的 Data block 索引
iiter->Seek(k);
if (iiter->Valid()) {
Slice handle_value = iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;
// Bloom 过滤 SKIP
if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {
// 读取 Data Block
Iterator* block_iter = BlockReader(this, options, iiter->value());
// 二分查找 Data Block
block_iter->Seek(k);
if (block_iter->Valid()) {
(*handle_result)(arg, block_iter->key(), block_iter->value());
}
s = block_iter->status();
delete block_iter;
}
}
if (s.ok()) {
s = iiter->status();
}
delete iiter;
return s;
}

IteratorSeek 是一致的,因为数据结构我们分析是一样的,不过 IndexBlock 中的一个 Restart 对应了一个 KV,而 DataBlock 中会对应多个,最后一段的 Linear Search 对于两者也是一样的。

Seekgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void Seek(const Slice& target) override {
// left -> 0 , right -> restart pointer
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
int current_key_compare = 0;

// 二分查找
while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
uint32_t shared, non_shared, value_length;
const char* key_ptr =
DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
&non_shared, &value_length);
if (key_ptr == nullptr || (shared != 0)) {
CorruptionError();
return;
}
Slice mid_key(key_ptr, non_shared);
if (Compare(mid_key, target) < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}

// Linear search (within restart block) for first key >= target
// 最终的线性查找,在某个 restart block 区间内
while (true) {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) {
return;
}
}
}

New DB

opengithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr;

DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;

// 首先第一步就是先 Recover
Status s = impl->Recover(&edit, &save_manifest);

// 下略
}

Read & Write

我们先看读写,再看其他的边缘 Case

Write

我们知道 Leveldb 收到消息第一步就是写 WAL

Putgithub
1
2
3
4
5
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}

WriteBatch 看名字就知道是一个批量操作,用于将多个写入一起写入。 不过这个 WriteBatch 还是挺抽象的。

1
2
3
4
5
6
7
8
9
10
11
class LEVELDB_EXPORT WriteBatch {
public:
WriteBatch();
~WriteBatch();
private:
friend class WriteBatchInternal;

std::string rep_; // See comment in write_batch.cc for the format of rep_

// Intentionally copyable
};

对于 leveldb 里面只有一个 rep_ 来储存真正的内存,其他的内容都在按照一些特殊的方式进行处理的。

这里参考下 [LevelDB] 数据库3:循序渐进 —— 操作接口 中的拿过来,大概是这样的结构

所以看下面的操作的时候,比如生成序列号。

1
2
3
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}

Write 就是真实写入的函数了,将一个 WriteBatch 的内容写入到数据库,写入分为两步:

  • 写入数据到日志;
  • 写入数据到MemTable。
Writegithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
// 默认非 DONE
w.done = false;

MutexLock l(&mutex_);
writers_.push_back(&w);

// 这里有可能别的线程帮助写入了,就是 DONE了,所以下面有个 Return,并且自己是 Head就真的开始写入,不然就是等
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}

// May temporarily unlock and wait.
// MakeRoomForWrite 的主要逻辑是将 MemTable 变成 ImmuateMemTable, 然后创建一个新的 MemTable
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
// 这里的 last_writer 现在就是 Head
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
// BuildBatchGroup 会根据队列里面的值尽量的多一次性取出来去 Write
// 在这里会更新 last_writer,也就是合并写入的最后一个
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);

// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
// 注意这一步解锁,很关键,因为接下来的写入可能是一个费时的过程,解锁后,其它线程可以Get,其它线程也可以继续将writer
// 插入到writers_里面,但是插入后,因为不是头元素,会等待,所以不会冲突
{
mutex_.Unlock();
// 写入log
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;

// 根据选项sync,不开就可以会丢数据
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);

//更新 SEQ,到这里就写完了。
}


// 通知下个等待的 Write 过来写。
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;
}

对于写来说,流程还是比较简单的。

  1. 创建一个批量的 Writebatch
  2. 确保有足够的内存空间
  3. 写 Log & 写 内存

Read

通过各种网络小知识,我们知道 Leveldb 是一层层的读的,因此我们来看看 Get 是怎么工作的。

Getgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
// 读取快照版本,这个后面再解释
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}

// 获取当前的 mem/imm 挨个去读
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
// mem ref +1 代表这个正在被使用,下同
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();

bool have_stat_update = false;
Version::GetStats stats;

// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
// 读 Memtable -> IMM -> DISK
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}

if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}

主逻辑非常的清晰,比较复杂的是读取硬盘上的 SST

SST 在代码中的表示是

FileMetagithub
1
2
3
4
5
6
7
8
9
10
11
12
13
struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}

int refs;
int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
};

// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];

因此我们只要根据 smallestlarget 就可以定位到对应的 SST 然后读进内存就好了,找到那个 Key 就好了。分层读取的逻辑在 ForEachOverlapping 就不展开了。

版本管理

在上面的 Read 中,我们看到了 Version 但是我们并没有深究。

那么为什么要版本管理呢? 在数据不断写入后,MemTable写满了,这时候就会转换为Level 0的一个SSTable,或者Level n的一个文件和Level n + 1的多个文件进行Compaction,会转换成Level n + 1的多个文件。这会使SSTable文件数量改变,文件内容改变,也就是版本信息改变了,所以需要管理版本。

  • Version标识了一个版本,主要信息是这个版本包含的 SSTable 信息

    1
    std::vector<FileMetaData*> files_[config::kNumLevels];
  • VersionSet是一个版本集,里面保存了Version的一个双链表,其中有一个Version是当前版本,因为只有一个实例,还保存了其它的一些全局的元数据, Dummy Version是链表头

1
VersionSet* vset_;  // VersionSet to which this Version belongs
  • VersionEdit保存了要对Version做的修改,在一个Version上应用一个VersionEdit,可以生成一个新的Version;
  • Builder是一个帮助类,帮助Version上应用VersionEdit,生成新版本。

运行流程

  • VersionSet 里保存着当前版本,以及被引用的历史版本;
  • 当有 Compaction 发生时,会将更改的内容写入到一个 VersionEdit 中;
  • 利用 BuilderVersionEdit 应用到当前版本上面生成一个新的版本;
  • 将新版本链接到 VersionSet 的双链表上面;
  • 将新的版本设置为当前版本;
  • 将旧的当前版本 Unref ,就是引用计数减 1。

VersionSet

那么我们看起来首先,对于某个版本的查询,核心的就是要一直读取对应时间的 IMM MMT SSTABLE ,这个做法就是就是在 VersionSet

VersionSetgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class VersionSet {
Env* const env_; // 封装部分操作系统调用,包括文件、线程操作等
const std::string dbname_; // 数据库名称,Open时传入
const Options* const options_; // 数据库选项,Open时传入
TableCache* const table_cache_; // 打开的SSTable的缓存,Open时创建
const InternalKeyComparator icmp_; // 根据User Key生成的Internal Key的Comparator
uint64_t next_file_number_; // ldb、log和MANIFEST生成新文件时都有一个序号单调递增
uint64_t manifest_file_number_; // 当前的MANIFEST的编号 [重点]
uint64_t last_sequence_; // 上一个使用的SequenceNumber
uint64_t log_number_; // 当前的日志的编号

WritableFile* descriptor_file_; // MANIFEST打开的文件描述符
log::Writer* descriptor_log_; // MANIFEST实际存储的格式是WAL日志的格式,所以这里用来写入数据
Version dummy_versions_; // Version链表的头结点
Version* current_; // 当前的Version

// 这是用来记录Compact的进度,Compact总是从某一Level的最小的键开始到某个键结束,
// 下次再从下一个键开始,所以这个就是下一次这个Level从哪个键开始Compact
std::string compact_pointer_[config::kNumLevels];
}

主要是通过 MANIFEST 来维护,也就是为什么我们的 MANIFEST 会有那么多的后缀了。

VersionEdit

VersionEdit 中记录了一次变更的内容。

VersionEditgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class VersionEdit {
typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;

std::string comparator_; // 比较器的名称,持久化后,下次打开时需要对比一致
uint64_t log_number_; // 日志文件的编号
uint64_t next_file_number_; // ldb、log和MANIFEST下一个文件的编号
SequenceNumber last_sequence_; // 上一个使用的SequenceNumber
bool has_comparator_; // 记录上面4个字段是否存在,存在才会持久化的MANIFEST中
bool has_log_number_;
bool has_next_file_number_;
bool has_last_sequence_

// 和VersionSet里面的compact_pointers_相同
std::vector<std::pair<int, InternalKey>> compact_pointers_;
// 有哪些文件被删除,就是Version里哪些SSTable被删除
DeletedFileSet deleted_files_;
// 有哪些文件被增加,pair的第一个参数是Level,第二个参数是文件的元信息
std::vector<std::pair<int, FileMetaData>> new_files_;
};

VersionEdit + Version = NextVersion

VersionEdit 包含集中类型数据

1
2
3
4
5
6
7
8
9
10
11
enum Tag {
kComparator = 1, // 记录Comparator的名字
kLogNumber = 2, // 记录当前时刻的log_number
kNextFileNumber = 3, // 记录当前时刻的next_file_number_
kLastSequence = 4, // 记录当前时刻的last_sequence
kCompactPointer = 5, // 记录compact_pointer
kDeletedFile = 6, // 记录删除的文件信息
kNewFile = 7 // 记录新增的文件信息
// 8 was used for large value refs
kPrevLogNumber = 9
};

而这些内容都会写到 MANIFEST 之中,在最初的启动逻辑中(以已有的 MANIFEST,恢复需要创建一个新的),我们从下面的代码就会看到 edit 会负责把记录写到 MANIFEST

WriteSnapshotgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Status VersionSet::WriteSnapshot(log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery?

// Save metadata
VersionEdit edit;
edit.SetComparatorName(icmp_.user_comparator()->Name());

// Save compaction pointers
for (int level = 0; level < config::kNumLevels; level++) {
if (!compact_pointer_[level].empty()) {
InternalKey key;
key.DecodeFrom(compact_pointer_[level]);
edit.SetCompactPointer(level, key);
}
}

// Save files
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
}
}

std::string record;
edit.EncodeTo(&record);
return log->AddRecord(record);
}

Builder

为了完成更新任务,还需要一个辅助类 Builder

对于 Builder 最重要的工作就是 下面的 Apply

Applygithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void Apply(const VersionEdit* edit) {
// 更新 压缩指针
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
const int level = edit->compact_pointers_[i].first;
vset_->compact_pointer_[level] =
edit->compact_pointers_[i].second.Encode().ToString();
}

// 删除不存在的文件
for (const auto& deleted_file_set_kvp : edit->deleted_files_) {
const int level = deleted_file_set_kvp.first;
const uint64_t number = deleted_file_set_kvp.second;
levels_[level].deleted_files.insert(number);
}

// 增加新增的文件
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;
f->allowed_seeks = static_cast<int>((f->file_size / 16384U));
if (f->allowed_seeks < 100) f->allowed_seeks = 100;

levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}
SaveTogithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void Builder::SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
// 拿出原本Version里的文件,以及Builder里累积的,添加的文件
const std::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added_files = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added_files->size());
// 按顺序进行合并
for (const auto& added_file : *added_files) {
// 找到base里面比added_file小的文件,添加到新的Version里
// 采用MaybeAddFile,让被删除的文件无法添加
for (std::vector<FileMetaData*>::const_iterator bpos =
std::upper_bound(base_iter, base_end, added_file, cmp);
base_iter != bpos; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}

MaybeAddFile(v, level, added_file);
}

// 添加剩下的文件
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
}
}

这里显然有一个问题,随着系统的长时间运行,我们的 MANIFEST 显然会越来越大,随着时间的流逝,早期的版本是没有意义的,我们没必要还原所有的版本的情况,我们只需要还原还活着的版本的信息。MANIFEST只有一个机会变小,抛弃早期过时的VersionEdit,给当前的VersionSet来个快照,然后从新的起点开始累加VerisonEdit。这个机会就是重新开启DB。在后续的 Recover 里面再聊聊

流程

版本变迁的逻辑主要是在 VersionSet::LogAndApply

LogAndApplygithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// 设定当前的log number
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}

if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}

// 设定当前的next_file_number和last_sequence,这些都会被持久化到MANIFEST
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);

// 创建一个新版本,新版本是current_和edit的结合
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);

// 写入比较吃 磁盘的 MANIFEST 日志
{
mu->Unlock();

// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
// 在这里写入文件
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}

// 这里更新了 Current 文件,指向对应的 MANIFEST
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}

mu->Lock();
}

// 安装新版本,会把v放到VersionSet的链表中,然后将当前Version指向v
if (s.ok()) {
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = nullptr;
descriptor_file_ = nullptr;
env_->RemoveFile(new_manifest_file);
}
}

return s;
}

其实从流程看还是比较的清晰的,在当前的 VersionSet 上,通过 VersionEditBuilder 创建一个新的 Version 并且更新 MANIFEST 就是更新的事情,那问题又随之到来,那 VersionEdit 是怎么创建出来的呢?这里就涉及到我们下面的那个知识点了。

Compaction

  1. MemTable 写满后,需要将 MemTable 的数据写入磁盘,生成一个 Level 0SSTable
  2. Level 0SSTable 的键范围可能有重叠
  3. CompactionLevel 0SSTable 推向 Level 1,使得 Level 0SSTable 数量保持在一个较低的水平;
  4. Level 1 的文件数量可能太多,导致一次 Compaction 消耗太多磁盘 IO ,所以需要将 Level 1 的文件继续 Compaction 到更高 Level 去。

  • Level0 -> Level1: 因为 Key 可能重叠,采用多路归并,最终 Level 1 的文件依然是有序并且无重叠的
  • Level n -> Level n + 1: 选择一个文件进行Compaction时,不可能有其它同层的文件有重叠,所以只需要一个文件即可,然后选择Level n + 1和Level n有重叠的文件,后面的步骤都是一样的。

Compaction 触发条件

Size Compaction

按照 Size 计算每一 Level 实际大小相对于最大大小的比率,优先 Compaction 比率最大的Level。

Finalizegithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1; // 最大的比率的Level
double best_score = -1; // 最大的比率

for (int level = 0; level < config::kNumLevels - 1; level++) {
double score;
if (level == 0) {
// Level 0特殊处理,使用文件的个数,而不是大小来确定比率,因为对于大的writer-buffer
// Level 0的文件会更大,这时候如果限定总大小,Compaction会偏多
// 对于小的Level 0文件,数量会太多,影响读取的速度
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
// 计算文件总大小相对于最大大小的比率
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}

if (score > best_score) {
best_level = level;
best_score = score;
}
}

// 将计算得到的最大的score和level赋值,后台线程看到赋值后会开始Compaction
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}

Seek Compaction

TODO

Compaction实现

Compactionlink
1
2
3
4
5
6
7
8
9
class Compaction {
...
int level_; // Compaction文件所在的Level
uint64_t max_output_file_size_; // 生成的文件的最大值
Version* input_version_; // Compaction发生时的Version
VersionEdit edit_; // Compaction结果保存的VersionEdit

std::vector<FileMetaData*> inputs_[2];
}

显然 inputs 最多涉及到 levellevel+1,具体的逻辑在 DoCompactionWork

DoCompactionWorkgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions

// 取当前最小的使用中的SequenceNumber
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
}

// 参与Compaction的SSTable组成一个迭代器
// MakeInputIterator 代码比较简单就不看了, 这里值得主要的就是 如果 Level0 就得把所有的 Level0 文件都带上···
Iterator* input = versions_->MakeInputIterator(compact->compaction);

// Release mutex while we're actually doing the compaction work
mutex_.Unlock();

input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::string current_user_key; // 记录当前的User Key
bool has_current_user_key = false; // 记录是否碰到过一个同样的User Key
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
// Prioritize immutable compaction work
if (has_imm_.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != nullptr) {
// 如果还有 imm 的存在,先处理 imm 到 sstable
CompactMemTable();
background_work_finished_signal_.SignalAll();
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}

Slice key = input->key();
// 当前输出文件的构建是否应该在指定key之前停下,若到达指定key时当前输出文件与level+2层的文件重合的内容数量超出阈值,则新起一个输出文件。
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != nullptr) {
// 创建一个新的SSTable,写入文件
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}

// 这里要判断这个 Key 是不是要处理,用 drop 来标记
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
0) {
// 第一次遇见 user key。标记了下当前处理的一些状态
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}

// 如果上一个Key的SequenceNumber <= 最小的存活的Snapshot,这个Key就不会被任何线程看到了,可以被丢弃
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// 如果碰到了一个删除操作,并且SequenceNumber <= 最小的Snapshot,
// 通过IsBaseLevelForKey判断更高Level不会有这个User Key存在,那么这个Key就被丢弃
drop = true;
}

last_sequence_for_key = ikey.sequence;
}

if (!drop) {
// Open output file if necessary
if (compact->builder == nullptr) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());

// 达到文件大小,就写入文件,生成新文件
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}

input->Next();
}


if (status.ok()) {
// 创建压缩的结果对象,下解释
status = InstallCompactionResults(compact);
}
return status;
}

// 将变更的记录都放到 edit 里面,然后触发 LogAndApply
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
// Add compaction outputs
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}

Recover

  • 如果数据库目录不存在,创建目录;
  • 加文件锁,锁住整个数据库;
  • 读取 MANIFEST 文件,恢复系统关闭时的元数据,也就是版本信息,或者新建MAINFEST文件;
  • 如果上一次关闭时,MemTable 里有数据,或者 Immutable MemTable 写入到 SSTable 未完成,那么需要做数据恢复,从 WAL 恢复数据;
  • 创建数据库相关的内存数据结构,如 VersionVersionSet 等。
Recovergithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
mutex_.AssertHeld();

// 创建数据库目录
env_->CreateDir(dbname_);
assert(db_lock_ == nullptr);
// 文件锁,禁止多进程的操作
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) {
return s;
}

// 干净的就创建一个单独的 DB,不然就 恢复
if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
Log(options_.info_log, "Creating DB %s since it was missing.",
dbname_.c_str());
s = NewDB();
if (!s.ok()) {
return s;
}
} else {
return Status::InvalidArgument(
dbname_, "does not exist (create_if_missing is false)");
}
} else {
if (options_.error_if_exists) {
return Status::InvalidArgument(dbname_,
"exists (error_if_exists is true)");
}
}
// 从 Manifest 恢复最后的版本
s = versions_->Recover(save_manifest);
if (!s.ok()) {
return s;
}
SequenceNumber max_sequence(0);

// 之前的MANIFEST恢复,会得到版本信息,里面包含了之前的log number
// 搜索文件系统里的log,如果这些日志的编号 >= 这个log number,那么这些
// 日志都是关闭时丢失的数据,需要恢复,这里将日志按顺序存储在logs里面
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) {
char buf[50];
std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}

// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}

// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}

if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}

return Status::OK();
}

对于系统来说比较重要的恢复显然是 Recover 的逻辑

Recovergithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
Status VersionSet::Recover(bool* save_manifest) {
struct LogReporter : public log::Reader::Reporter {
Status* status;
void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};

// 读取CURRENT文件的内容,获取当前使用的MANIFEST文件
// 读取MANIFEST文件,将里面的VersionEdit读取应用到一个builder里
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
if (!s.ok()) {
return s;
}
if (current.empty() || current[current.size() - 1] != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
current.resize(current.size() - 1);

std::string dscname = dbname_ + "/" + current;
SequentialFile* file;
s = env_->NewSequentialFile(dscname, &file);
if (!s.ok()) {
if (s.IsNotFound()) {
return Status::Corruption("CURRENT points to a non-existent file",
s.ToString());
}
return s;
}

bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_);
int read_records = 0;

{
// Record 挨个的读取
}
delete file;
file = nullptr;

if (s.ok()) {
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}

if (!have_prev_log_number) {
prev_log_number = 0;
}

MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}

if (s.ok()) {
Version* v = new Version(this);
builder.SaveTo(v);
// Install recovered version
Finalize(v);
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;

// See if we can reuse the existing MANIFEST file.
if (ReuseManifest(dscname, current)) {
// No need to save new manifest
} else {
*save_manifest = true;
}
} else {
std::string error = s.ToString();
Log(options_->info_log, "Error recovering version set with %d records: %s",
read_records, error.c_str());
}

return s;
}

参考 & 推荐阅读