
PingCAP Talent Plan
是很不错的入门教程,今天我们来分析下 Project 2
的项目代码。
本章的目标
- 处理错误和异常
- 使用
serde
进行序列化
- 使用标准的 API 进行数据的读写
- 从磁盘上读取
KV
- 在内存中维护
Indexs
- 压缩数据
代码地址
定义 KvStore
def kv1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| pub struct KvStore { path: PathBuf, readers: HashMap<u64, BufReaderWithPos<File>>, current_gen: u64, writer: BufWriterWithPos<File>, index: BTreeMap<String, CommandPos>, uncompacted: u64, }
struct CommandPos { gen: u64, pos: u64, len: u64, }
|
数据储存的形式是
1 2 3 4
| ├── data │ ├── 1.log │ ├── 2.log │ └── 3.log
|
对于 readers
中最终的储存数据为
1 2 3
| 1 -> File(1.log) 2 -> File(2.log) 3 -> File(3.log)
|
而每一个 log
文件内部的格式如下:

因此每一个 CommandPos
内标记的是 1. 隶属于某个 Log
文件, 2. Log
文件中的起始 Pos, 3. Command
的长度。
初始化 KvStore
初始化的工作,我们需要做的事情,显然就是将当前目录下所有的 *.log
文件都载入内存中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| pub fn open(path: impl Into<PathBuf>) -> Result<KvStore> { let mut readers = HashMap::new(); let mut index = BTreeMap::new();
let gen_list = sorted_gen_list(&path)?; let mut uncompacted = 0;
for &gen in &gen_list { let mut reader = BufReaderWithPos::new(File::open(log_path(&path, gen))?)?; uncompacted += load(gen, &mut reader, &mut index)?; readers.insert(gen, reader); }
let current_gen = gen_list.last().unwrap_or(&0) + 1; let writer = new_log_file(&path, current_gen, &mut readers)?; }
|
而更为核心的内容在 load
中,load
的功能就是将目录中的数据载入系统中来。
load1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| fn load( gen: u64, reader: &mut BufReaderWithPos<File>, index: &mut BTreeMap<String, CommandPos>, ) -> Result<u64> { let mut pos = reader.seek(SeekFrom::Start(0))?; let mut stream = Deserializer::from_reader(reader).into_iter::<Command>(); let mut uncompacted = 0; while let Some(cmd) = stream.next() { let new_pos = stream.byte_offset() as u64; match cmd? { Command::Set { key, .. } => { if let Some(old_cmd) = index.insert(key, (gen, pos..new_pos).into()) { uncompacted += old_cmd.len; } } Command::Remove { key } => { if let Some(old_cmd) = index.remove(&key) { uncompacted += old_cmd.len; } uncompacted += new_pos - pos; } } pos = new_pos; } Ok(uncompacted) }
|
获得数据
获得数据是很快的,因为我们已经在内存中储存了 Index
的缓存。

get key1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| pub fn get(&mut self, key: String) -> Result<Option<String>> { if let Some(cmd_pos) = self.index.get(&key) { let reader = self .readers .get_mut(&cmd_pos.gen) .expect("Cannot find log reader"); reader.seek(SeekFrom::Start(cmd_pos.pos))?; let cmd_reader = reader.take(cmd_pos.len); if let Command::Set { value, .. } = serde_json::from_reader(cmd_reader)? { Ok(Some(value)) } else { Err(KvsError::UnexpectedCommandType) } } else { Ok(None) } }
|
增加数据
set key value1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| pub fn set(&mut self, key: String, value: String) -> Result<()> { let cmd = Command::set(key, value); let pos = self.writer.pos; serde_json::to_writer(&mut self.writer, &cmd)?; self.writer.flush()?; if let Command::Set { key, .. } = cmd { if let Some(old_cmd) = self .index .insert(key, (self.current_gen, pos..self.writer.pos).into()) { self.uncompacted += old_cmd.len; } }
if self.uncompacted > COMPACTION_THRESHOLD { self.compact()?; } Ok(()) }
|
数据压缩
对于我们不再使用的数据,我们可以进行压缩

显然,我们需要做的事情也很明确,从我们的 Index
扫描保存在用的 Command
,而其他的都可以删除了。
compact1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| pub fn compact(&mut self) -> Result<()> { let compaction_gen = self.current_gen + 1;
self.current_gen += 2; self.writer = self.new_log_file(self.current_gen)?;
let mut compaction_writer = self.new_log_file(compaction_gen)?; let mut new_pos = 0;
for cmd_pos in &mut self.index.values_mut() { let reader = self.readers.get_mut(&cmd_pos.gen).expect("Cannot find log reader"); if reader.pos != cmd_pos.pos { reader.seek(SeekFrom::Start(cmd_pos.pos))?; }
let mut entry_reader = reader.take(cmd_pos.len); let len = io::copy(&mut entry_reader, &mut compaction_writer)?; *cmd_pos = (compaction_gen, new_pos..new_pos + len).into(); new_pos += len; }
compaction_writer.flush()?;
let stale_gens: Vec<_> = self.readers.keys() .filter(|&&gen| gen < compaction_gen).cloned().collect(); for stale_gen in stale_gens { self.readers.remove(&stale_gen); fs::remove_file(log_path(&self.path, stale_gen))?; } self.uncompacted = 0;
Ok(()) }
|
删除数据
删除数据非常的简单,和 Set
相同,可以看作 Set Null
,因此直接 Append Log
即可。
remove1 2 3 4 5 6 7 8 9 10 11 12 13 14
| pub fn remove(&mut self, key: String) -> Result<()> { if self.index.contains_key(&key) { let cmd = Command::remove(key); serde_json::to_writer(&mut self.writer, &cmd)?; self.writer.flush()?; if let Command::Remove { key } = cmd { let old_cmd = self.index.remove(&key).expect("key not found"); self.uncompacted += old_cmd.len; } Ok(()) } else { Err(KvsError::KeyNotFound) } }
|