PingCAP Talent Plan is a solid introductory course. In this post we walk through the code of Project 2.
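Goals of this chapter
Handle errors and exceptions
Use serde for serialization
Use the standard APIs to read and write data
Read key-value pairs from disk
Maintain an index in memory
Compact the log
Code repository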
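Defining KvStore

```rust
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::path::PathBuf;

pub struct KvStore {
    // Directory that holds the log files.
    path: PathBuf,
    // Maps a generation number to the reader of that log file.
    readers: HashMap<u64, BufReaderWithPos<File>>,
    // Generation number of the log currently being written.
    current_gen: u64,
    // Writer of the current log.
    writer: BufWriterWithPos<File>,
    // In-memory index: key -> location of its latest command.
    index: BTreeMap<String, CommandPos>,
    // Number of bytes occupied by stale commands.
    uncompacted: u64,
}

struct CommandPos {
    gen: u64,
    pos: u64,
    len: u64,
}
```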
On disk, the data is laid out as follows:

```
├── data
│   ├── 1.log
│   ├── 2.log
│   └── 3.log
```

The readers map then ends up holding:

```
1 -> File(1.log)
2 -> File(2.log)
3 -> File(3.log)
```
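Each log file is an append-only sequence of serialized Command records, written one after another.
Each CommandPos therefore records three things: (1) which log file the command belongs to, (2) the starting position of the command within that log file, and (3) the length of the command.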
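The snippets in this post build a `CommandPos` with expressions such as `(gen, pos..new_pos).into()`, but the conversion itself is not shown. The following is only a plausible sketch of it, assuming a `From<(u64, Range<u64>)>` implementation. The field is called `generation` here instead of `gen` purely because `gen` is a reserved keyword in the Rust 2024 edition.

```rust
use std::ops::Range;

/// Position and length of one serialized command inside a log file.
#[derive(Debug, Clone, Copy, PartialEq)]
struct CommandPos {
    generation: u64, // which log file holds the command (`gen` in the post)
    pos: u64,        // byte offset at which the command starts
    len: u64,        // length of the command in bytes
}

// Assumed conversion: a (generation, byte range) pair becomes a CommandPos.
impl From<(u64, Range<u64>)> for CommandPos {
    fn from((generation, range): (u64, Range<u64>)) -> Self {
        CommandPos {
            generation,
            pos: range.start,
            len: range.end - range.start,
        }
    }
}
```

With this in place, a command that occupies bytes 10 to 25 of `2.log` would be described by `CommandPos::from((2, 10..25))`, that is, a start offset of 10 and a length of 15.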
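Initializing KvStore
Initialization has one obvious job: load every *.log file in the given directory into memory, oldest generation first, and then open a fresh log file for new writes.

```rust
pub fn open(path: impl Into<PathBuf>) -> Result<KvStore> {
    let path = path.into();
    // Assumed here: make sure the directory exists before listing it.
    fs::create_dir_all(&path)?;

    let mut readers = HashMap::new();
    let mut index = BTreeMap::new();

    let gen_list = sorted_gen_list(&path)?;
    let mut uncompacted = 0;

    // Replay every existing log, in ascending generation order.
    for &gen in &gen_list {
        let mut reader = BufReaderWithPos::new(File::open(log_path(&path, gen))?)?;
        uncompacted += load(gen, &mut reader, &mut index)?;
        readers.insert(gen, reader);
    }

    // New writes go to a log file one generation past the newest one.
    let current_gen = gen_list.last().unwrap_or(&0) + 1;
    let writer = new_log_file(&path, current_gen, &mut readers)?;

    Ok(KvStore {
        path,
        readers,
        current_gen,
        writer,
        index,
        uncompacted,
    })
}
```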
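`open` depends on two helpers, `sorted_gen_list` and `log_path`, that the post does not show. Below is a minimal sketch of what they might look like using only the standard library. The helper `gen_from_path` and the use of `io::Result` are assumptions of this sketch, not taken from the project.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Builds the path of the log file for one generation, e.g. `data/3.log`.
fn log_path(dir: &Path, generation: u64) -> PathBuf {
    dir.join(format!("{}.log", generation))
}

/// Extracts the generation number from a path such as `data/3.log`.
/// Returns `None` for anything that is not `<number>.log`.
/// (Hypothetical helper, introduced for this sketch.)
fn gen_from_path(path: &Path) -> Option<u64> {
    if path.extension()?.to_str()? != "log" {
        return None;
    }
    path.file_stem()?.to_str()?.parse().ok()
}

/// Lists the generation numbers found in `dir`, in ascending order.
fn sorted_gen_list(dir: &Path) -> io::Result<Vec<u64>> {
    let mut gens = Vec::new();
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_file() {
            if let Some(generation) = gen_from_path(&path) {
                gens.push(generation);
            }
        }
    }
    gens.sort_unstable();
    Ok(gens)
}
```

Sorting matters because the logs have to be replayed oldest first: a later generation may overwrite or remove a key written by an earlier one.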
The core of the work happens in load. It replays one log file from the beginning, rebuilds the in-memory index from the commands it finds, and returns the number of bytes that a later compaction could reclaim.

```rust
fn load(
    gen: u64,
    reader: &mut BufReaderWithPos<File>,
    index: &mut BTreeMap<String, CommandPos>,
) -> Result<u64> {
    // Make sure we read from the beginning of the file.
    let mut pos = reader.seek(SeekFrom::Start(0))?;
    let mut stream = Deserializer::from_reader(reader).into_iter::<Command>();
    // Number of bytes that can be saved by a compaction.
    let mut uncompacted = 0;

    while let Some(cmd) = stream.next() {
        // Offset just past the command that was read.
        let new_pos = stream.byte_offset() as u64;
        match cmd? {
            Command::Set { key, .. } => {
                // An overwritten entry becomes stale.
                if let Some(old_cmd) = index.insert(key, (gen, pos..new_pos).into()) {
                    uncompacted += old_cmd.len;
                }
            }
            Command::Remove { key } => {
                if let Some(old_cmd) = index.remove(&key) {
                    uncompacted += old_cmd.len;
                }
                // The remove command itself is also stale once applied.
                uncompacted += new_pos - pos;
            }
        }
        pos = new_pos;
    }
    Ok(uncompacted)
}
```
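To make the stale-byte accounting in `load` concrete, here is a small stand-alone model of it. It leaves out files and serialization: each record carries only a key and a made-up serialized length. The `Record` type and the `replay` function are hypothetical names for this sketch.

```rust
use std::collections::BTreeMap;

/// A log record reduced to what the accounting needs: its key and its
/// serialized length in bytes (hypothetical sizes, no real serialization).
enum Record {
    Set { key: &'static str, len: u64 },
    Remove { key: &'static str, len: u64 },
}

/// Replays records in log order. Returns the index (key -> (pos, len)) and
/// the number of stale bytes that a compaction could reclaim.
fn replay(records: &[Record]) -> (BTreeMap<&'static str, (u64, u64)>, u64) {
    let mut index = BTreeMap::new();
    let mut pos = 0;
    let mut uncompacted = 0;
    for record in records {
        match *record {
            Record::Set { key, len } => {
                // An overwritten entry is dead weight in the log.
                if let Some((_, old_len)) = index.insert(key, (pos, len)) {
                    uncompacted += old_len;
                }
                pos += len;
            }
            Record::Remove { key, len } => {
                // Both the removed entry and the remove record itself are stale.
                if let Some((_, old_len)) = index.remove(key) {
                    uncompacted += old_len;
                }
                uncompacted += len;
                pos += len;
            }
        }
    }
    (index, uncompacted)
}
```

For a log of `Set a` (10 bytes), `Set b` (12 bytes), `Set a` (11 bytes), `Remove b` (8 bytes), only the second `Set a` stays live. The first `Set a`, the `Set b`, and the `Remove b` record add up to 10 + 12 + 8 = 30 stale bytes out of 41.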
Reading data
Reads are fast because the index is already cached in memory: a lookup yields the CommandPos, and a single seek into the right log file reads the command back.

```rust
pub fn get(&mut self, key: String) -> Result<Option<String>> {
    if let Some(cmd_pos) = self.index.get(&key) {
        let reader = self
            .readers
            .get_mut(&cmd_pos.gen)
            .expect("Cannot find log reader");
        // Jump to the command and read exactly `len` bytes.
        reader.seek(SeekFrom::Start(cmd_pos.pos))?;
        let cmd_reader = reader.take(cmd_pos.len);
        if let Command::Set { value, .. } = serde_json::from_reader(cmd_reader)? {
            Ok(Some(value))
        } else {
            // The index should only ever point at Set commands.
            Err(KvsError::UnexpectedCommandType)
        }
    } else {
        Ok(None)
    }
}
```
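`get` relies on `BufReaderWithPos`, which the post uses but does not define. The sketch below shows one way such a wrapper could be written: a `BufReader` plus a `pos` field that is kept up to date on every read and seek. The `read_record` helper is a hypothetical addition that mirrors the seek-then-take pattern in `get` without the deserialization step.

```rust
use std::io::{self, BufReader, Read, Seek, SeekFrom};

/// A buffered reader that remembers its current byte offset.
struct BufReaderWithPos<R: Read + Seek> {
    reader: BufReader<R>,
    pos: u64,
}

impl<R: Read + Seek> BufReaderWithPos<R> {
    fn new(mut inner: R) -> io::Result<Self> {
        // Start tracking from wherever the underlying reader currently is.
        let pos = inner.stream_position()?;
        Ok(BufReaderWithPos {
            reader: BufReader::new(inner),
            pos,
        })
    }
}

impl<R: Read + Seek> Read for BufReaderWithPos<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let len = self.reader.read(buf)?;
        self.pos += len as u64;
        Ok(len)
    }
}

impl<R: Read + Seek> Seek for BufReaderWithPos<R> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        self.pos = self.reader.seek(pos)?;
        Ok(self.pos)
    }
}

/// Reads exactly `len` bytes starting at offset `pos`.
/// (Hypothetical helper, introduced for this sketch.)
fn read_record<R: Read + Seek>(
    reader: &mut BufReaderWithPos<R>,
    pos: u64,
    len: u64,
) -> io::Result<Vec<u8>> {
    reader.seek(SeekFrom::Start(pos))?;
    let mut record = Vec::new();
    reader.take(len).read_to_end(&mut record)?;
    Ok(record)
}
```

Tracking the offset in the wrapper is what allows the compaction code to compare `reader.pos` with `cmd_pos.pos` and skip a seek when the reader is already in the right place.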
Writing data

```rust
pub fn set(&mut self, key: String, value: String) -> Result<()> {
    let cmd = Command::set(key, value);
    // Remember where the command starts before appending it.
    let pos = self.writer.pos;
    serde_json::to_writer(&mut self.writer, &cmd)?;
    self.writer.flush()?;

    if let Command::Set { key, .. } = cmd {
        // Point the index at the new command; the old one becomes stale.
        if let Some(old_cmd) = self
            .index
            .insert(key, (self.current_gen, pos..self.writer.pos).into())
        {
            self.uncompacted += old_cmd.len;
        }
    }

    // Compact once enough stale bytes have piled up.
    if self.uncompacted > COMPACTION_THRESHOLD {
        self.compact()?;
    }
    Ok(())
}
```
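`set` reads `self.writer.pos` before and after serializing the command to learn which byte range the command occupies. The post does not show `BufWriterWithPos`, so the following is a sketch of how such a writer could track its position. The `append_record` helper and the choice to seek to the end in `new` are assumptions of this sketch.

```rust
use std::io::{self, BufWriter, Seek, SeekFrom, Write};
use std::ops::Range;

/// A buffered writer that remembers how many bytes have been written,
/// i.e. the offset at which the next record will start.
struct BufWriterWithPos<W: Write + Seek> {
    writer: BufWriter<W>,
    pos: u64,
}

impl<W: Write + Seek> BufWriterWithPos<W> {
    fn new(mut inner: W) -> io::Result<Self> {
        // Assumption: position at the end so that new records are appended.
        let pos = inner.seek(SeekFrom::End(0))?;
        Ok(BufWriterWithPos {
            writer: BufWriter::new(inner),
            pos,
        })
    }
}

impl<W: Write + Seek> Write for BufWriterWithPos<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let len = self.writer.write(buf)?;
        self.pos += len as u64;
        Ok(len)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.writer.flush()
    }
}

/// Appends one already-serialized record and returns the byte range it occupies.
/// (Hypothetical helper, introduced for this sketch.)
fn append_record<W: Write + Seek>(
    writer: &mut BufWriterWithPos<W>,
    record: &[u8],
) -> io::Result<Range<u64>> {
    let start = writer.pos;
    writer.write_all(record)?;
    writer.flush()?;
    Ok(start..writer.pos)
}
```

The returned range is exactly what the index needs: its start is the `pos` of the new entry and its width is the `len`.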
Compaction
Data that is no longer referenced can be compacted away. The approach is straightforward: walk the index, copy every command that is still in use into a new log file, and delete everything else.

```rust
pub fn compact(&mut self) -> Result<()> {
    // Bump current_gen by 2: current_gen + 1 is reserved for the compaction file.
    let compaction_gen = self.current_gen + 1;
    self.current_gen += 2;
    self.writer = self.new_log_file(self.current_gen)?;

    let mut compaction_writer = self.new_log_file(compaction_gen)?;

    // Write position in the compaction file.
    let mut new_pos = 0;
    for cmd_pos in self.index.values_mut() {
        let reader = self
            .readers
            .get_mut(&cmd_pos.gen)
            .expect("Cannot find log reader");
        // Skip the seek when the reader is already at the right offset.
        if reader.pos != cmd_pos.pos {
            reader.seek(SeekFrom::Start(cmd_pos.pos))?;
        }

        // Copy the live command and repoint the index at the copy.
        let mut entry_reader = reader.take(cmd_pos.len);
        let len = io::copy(&mut entry_reader, &mut compaction_writer)?;
        *cmd_pos = (compaction_gen, new_pos..new_pos + len).into();
        new_pos += len;
    }
    compaction_writer.flush()?;

    // Remove the log files that are now stale.
    let stale_gens: Vec<_> = self
        .readers
        .keys()
        .filter(|&&gen| gen < compaction_gen)
        .cloned()
        .collect();
    for stale_gen in stale_gens {
        self.readers.remove(&stale_gen);
        fs::remove_file(log_path(&self.path, stale_gen))?;
    }
    self.uncompacted = 0;

    Ok(())
}
```
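The heart of compaction, copying live records and repointing the index, can be tried out in memory. The sketch below is a simplified model rather than the project's code: it works on a single byte buffer instead of generation files, and the index maps a key directly to a hypothetical `(pos, len)` pair.

```rust
use std::collections::BTreeMap;

/// Copies every record that the index still points at into a fresh buffer
/// and rewrites the index entries to the new offsets.
/// Returns the compacted log.
fn compact(log: &[u8], index: &mut BTreeMap<String, (usize, usize)>) -> Vec<u8> {
    let mut compacted = Vec::new();
    for (pos, len) in index.values_mut() {
        let new_pos = compacted.len();
        compacted.extend_from_slice(&log[*pos..*pos + *len]);
        // The length is unchanged; only the offset moves.
        *pos = new_pos;
    }
    compacted
}
```

Given the log `set a=1;set b=2;set a=3;` and an index that points `a` at offset 16 and `b` at offset 8, the compacted log is `set a=3;set b=2;`, with `a` now at offset 0 and `b` still at offset 8. The overwritten `set a=1;` record is gone.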
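Removing data
Removing a key is simple. It works like set and can be thought of as setting the key to null, so it only needs to append a record to the log. The key is then dropped from the index, and removing a key that does not exist is reported as an error.

```rust
pub fn remove(&mut self, key: String) -> Result<()> {
    if self.index.contains_key(&key) {
        // Append a Remove command to the log.
        let cmd = Command::remove(key);
        serde_json::to_writer(&mut self.writer, &cmd)?;
        self.writer.flush()?;
        if let Command::Remove { key } = cmd {
            // The previous Set command for this key is now stale.
            let old_cmd = self.index.remove(&key).expect("key not found");
            self.uncompacted += old_cmd.len;
        }
        Ok(())
    } else {
        Err(KvsError::KeyNotFound)
    }
}
```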
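Error handling is one of the chapter goals, and the snippets return `KvsError::KeyNotFound` and `KvsError::UnexpectedCommandType`, yet the error type itself is not shown. Here is a standard-library-only sketch of how it could be defined. The `Io` variant, the message strings, and the `read_len` helper are assumptions of this sketch, and a real version would also need a variant for serde_json errors.

```rust
use std::fmt;
use std::io;

/// Error type for the store. `KeyNotFound` and `UnexpectedCommandType` appear
/// in the snippets above; the `Io` variant is an assumption of this sketch.
#[derive(Debug)]
enum KvsError {
    Io(io::Error),
    KeyNotFound,
    UnexpectedCommandType,
}

impl fmt::Display for KvsError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            KvsError::Io(err) => write!(f, "I/O error: {}", err),
            KvsError::KeyNotFound => write!(f, "Key not found"),
            KvsError::UnexpectedCommandType => write!(f, "Unexpected command type"),
        }
    }
}

impl std::error::Error for KvsError {}

// Lets `?` turn an io::Error into a KvsError automatically.
impl From<io::Error> for KvsError {
    fn from(err: io::Error) -> Self {
        KvsError::Io(err)
    }
}

/// Result alias so signatures can be written as `Result<T>`.
type Result<T> = std::result::Result<T, KvsError>;

/// Hypothetical helper showing `?` converting an io::Error into a KvsError.
fn read_len(path: &std::path::Path) -> Result<u64> {
    let meta = std::fs::metadata(path)?;
    Ok(meta.len())
}
```

The `From` implementation is what makes the many `?` operators in the store's methods compile: each I/O failure is converted into the store's own error type at the point where it occurs.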