0%

Rust Project 2: Log-structured file I/O 解读

txTC8.png

PingCAP Talent Plan 是很不错的入门教程,今天我们来分析下 Project 2 的项目代码。

本章的目标

  • 处理错误和异常
  • 使用 serde 进行序列化
  • 使用标准的 API 进行数据的读写
  • 从磁盘上读取 KV
  • 在内存中维护 Indexs
  • 压缩数据

代码地址

定义 KvStore

def kv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
pub struct KvStore {
// 储存数据的目录,文件的序号从 0 开始, 1,2,3,4 依次递增
path: PathBuf,
// Key 是文件序号,Value 是读取的文件
readers: HashMap<u64, BufReaderWithPos<File>>,
// 当前文件的序号
current_gen: u64,
// 当前文件的写入的 POS
writer: BufWriterWithPos<File>,
// Key 是储存的Key,Value 是 CommandPos
index: BTreeMap<String, CommandPos>,
// 累计的 Log 条数
uncompacted: u64,
}

struct CommandPos {
gen: u64,
pos: u64,
len: u64,
}

数据储存的形式是

1
2
3
4
├── data
│   ├── 1.log
│   ├── 2.log
│   └── 3.log

对于 readers 中最终的储存数据为

1
2
3
1 -> File(1.log)
2 -> File(2.log)
3 -> File(3.log)

而每一个 log 文件内部的格式如下:

log.file

因此每一个 CommandPos 内标记的是 1. 隶属于某个 Log 文件, 2. Log 文件中的起始 Pos, 3. Command 的长度。

初始化 KvStore

初始化的工作,我们需要做的事情,显然就是将当前目录下所有的 *.log 文件都载入内存中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
pub fn open(path: impl Into<PathBuf>) -> Result<KvStore> {
let mut readers = HashMap::new();
let mut index = BTreeMap::new();

// 将目录下的文件按照从小到大的排序载入,vec![1,2,3,6,9]
let gen_list = sorted_gen_list(&path)?;
let mut uncompacted = 0;

// 按照顺序挨个读取 log 文件
for &gen in &gen_list {
let mut reader = BufReaderWithPos::new(File::open(log_path(&path, gen))?)?;
uncompacted += load(gen, &mut reader, &mut index)?;
readers.insert(gen, reader);
}

// 以最大的 log 序号 + 1
let current_gen = gen_list.last().unwrap_or(&0) + 1;
// 以 current_gen 创建一个新文件
let writer = new_log_file(&path, current_gen, &mut readers)?;
}

而更为核心的内容在 load 中,load 的功能就是将目录中的数据载入系统中来。

load
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fn load(
gen: u64,
reader: &mut BufReaderWithPos<File>,
index: &mut BTreeMap<String, CommandPos>,
) -> Result<u64> {
// 置位 0,从零开始
let mut pos = reader.seek(SeekFrom::Start(0))?;
let mut stream = Deserializer::from_reader(reader).into_iter::<Command>();
let mut uncompacted = 0; // Log 条目的计数器
// 读取一条从 log 中,判断不同的类型进行处理
while let Some(cmd) = stream.next() {
let new_pos = stream.byte_offset() as u64;
match cmd? {
Command::Set { key, .. } => {
// 如果是 set, 我们在 index 中插入 Key, 和定位的 POS
if let Some(old_cmd) = index.insert(key, (gen, pos..new_pos).into()) {
uncompacted += old_cmd.len;
}
}
Command::Remove { key } => {
// 删除一个 key
if let Some(old_cmd) = index.remove(&key) {
uncompacted += old_cmd.len;
}
uncompacted += new_pos - pos;
}
}
pos = new_pos;
}
Ok(uncompacted)
}

获得数据

获得数据是很快的,因为我们已经在内存中储存了 Index 的缓存。

txZAr.png

get key
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
pub fn get(&mut self, key: String) -> Result<Option<String>> {
if let Some(cmd_pos) = self.index.get(&key) {
// 获得了 command_pos 之后就去具体的文件中读取
let reader = self
.readers
.get_mut(&cmd_pos.gen)
.expect("Cannot find log reader");
reader.seek(SeekFrom::Start(cmd_pos.pos))?;
let cmd_reader = reader.take(cmd_pos.len);
if let Command::Set { value, .. } = serde_json::from_reader(cmd_reader)? {
Ok(Some(value))
} else {
Err(KvsError::UnexpectedCommandType)
}
} else {
// index 无key,直接返回了
Ok(None)
}
}

增加数据

set key value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
pub fn set(&mut self, key: String, value: String) -> Result<()> {
// 创建一个 Command 对象
let cmd = Command::set(key, value);
// 当前 Log 文件的写入偏移位置
let pos = self.writer.pos;
// 写入文件
serde_json::to_writer(&mut self.writer, &cmd)?;
// 刷新到磁盘
self.writer.flush()?;
if let Command::Set { key, .. } = cmd {
// 更新到 Index 中
if let Some(old_cmd) = self
.index
.insert(key, (self.current_gen, pos..self.writer.pos).into())
{
self.uncompacted += old_cmd.len;
}
}

// 如果未压缩的超过 COMPACTION_THRESHOLD 就触发压缩
if self.uncompacted > COMPACTION_THRESHOLD {
self.compact()?;
}
Ok(())
}

数据压缩

对于我们不再使用的数据,我们可以进行压缩

tOquG.png

显然,我们需要做的事情也很明确,从我们的 Index 扫描保存在用的 Command,而其他的都可以删除了。

compact
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
pub fn compact(&mut self) -> Result<()> {
// 压缩后的 log 的序号是当前序号 + 1
let compaction_gen = self.current_gen + 1;

// 下一次写入的是 log 序号是当前序号 + 2
self.current_gen += 2;
self.writer = self.new_log_file(self.current_gen)?;

// 创建处理压缩结果的文件
let mut compaction_writer = self.new_log_file(compaction_gen)?;
let mut new_pos = 0;

// 从 index 取出所有的Value
for cmd_pos in &mut self.index.values_mut() {
let reader = self.readers.get_mut(&cmd_pos.gen).expect("Cannot find log reader");
if reader.pos != cmd_pos.pos {
reader.seek(SeekFrom::Start(cmd_pos.pos))?;
}

let mut entry_reader = reader.take(cmd_pos.len);
// 当前 Value 写入压缩 Log 文件
let len = io::copy(&mut entry_reader, &mut compaction_writer)?;
*cmd_pos = (compaction_gen, new_pos..new_pos + len).into();
new_pos += len;
}

// 写入磁盘
compaction_writer.flush()?;

// 删掉比 压缩Log 序号更小的 log,因为已经被压缩完了。
let stale_gens: Vec<_> = self.readers.keys()
.filter(|&&gen| gen < compaction_gen).cloned().collect();
for stale_gen in stale_gens {
self.readers.remove(&stale_gen);
fs::remove_file(log_path(&self.path, stale_gen))?;
}
self.uncompacted = 0;

Ok(())
}

删除数据

删除数据非常的简单,和 Set 相同,可以看作 Set Null,因此直接 Append Log 即可。

remove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
pub fn remove(&mut self, key: String) -> Result<()> {
if self.index.contains_key(&key) {
let cmd = Command::remove(key);
serde_json::to_writer(&mut self.writer, &cmd)?;
self.writer.flush()?;
if let Command::Remove { key } = cmd {
let old_cmd = self.index.remove(&key).expect("key not found");
self.uncompacted += old_cmd.len;
}
Ok(())
} else {
Err(KvsError::KeyNotFound)
}
}
来杯奶茶, 嗝~~~