
上次讲完了我们单机模式下的 KV Store
,这次接着解读,我们将访问模式从 CMD
变成 Networking
的模式。
本文主要解读代码的构成部分,对于类库的使用就一笔带过。
服务端
1 2 3 4 5 6 7 8 9 10
| fn main() { env_logger::builder().filter_level(LevelFilter::Info).init(); let mut opt = Opt::from_args(); let res = current_engine().and_then(move |curr_engine| { run(opt) }); }
|
启动
显然所有的逻辑都在我们的 run
之中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| fn run(opt: Opt) -> Result<()> { let engine = opt.engine.unwrap_or(DEFAULT_ENGINE); info!("kvs-server {}", env!("CARGO_PKG_VERSION")); info!("Storage engine: {}", engine); info!("Listening on {}", opt.addr);
fs::write(current_dir()?.join("engine"), format!("{}", engine))?;
match engine { Engine::kvs => run_with_engine(KvStore::open(current_dir()?)?, opt.addr), Engine::sled => run_with_engine( SledKvsEngine::new(sled::Db::start_default(current_dir()?)?), opt.addr, ), } }
|
从这里我们的代码开始分叉,我们需要处理两种情况,sled
和 kvs
,不过其实 sled
仅仅是一个测试的 Mock
实现,从 sled.rs 的实现可以看出来,仅仅是一个 IN-MEM
的实现罢了,我们还是核心要去看 Engine::kvs
的 CASE
(不过其实也没啥区别,就是换了 kv 引擎)
来到了我们 Run
服务器的地方了
runsource code1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| pub fn run<A: ToSocketAddrs>(mut self, addr: A) -> Result<()> {
let listener = TcpListener::bind(addr)?; for stream in listener.incoming() { match stream { Ok(stream) => { if let Err(e) = self.serve(stream) { error!("Error on serving client: {}", e); } } } } Ok(()) }
|
逻辑处理
核心的逻辑依然需要往下走一点,我们就来到了有趣的部分了,几乎就是如何手写一个纯粹的 Web
服务。
servesource code1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| fn serve(&mut self, tcp: TcpStream) -> Result<()> { let peer_addr = tcp.peer_addr()?;
let reader = BufReader::new(&tcp); let mut writer = BufWriter::new(&tcp); let req_reader = Deserializer::from_reader(reader).into_iter::<Request>();
macro_rules! send_resp { ($resp:expr) => {{ let resp = $resp; serde_json::to_writer(&mut writer, &resp)?; writer.flush()?; debug!("Response sent to {}: {:?}", peer_addr, resp); };}; }
for req in req_reader { let req = req?; debug!("Receive request from {}: {:?}", peer_addr, req); match req { Request::Get { key } => send_resp!(match self.engine.get(key) { Ok(value) => GetResponse::Ok(value), Err(e) => GetResponse::Err(format!("{}", e)), }), Request::Set { key, value } => send_resp!(match self.engine.set(key, value) { Ok(_) => SetResponse::Ok(()), Err(e) => SetResponse::Err(format!("{}", e)), }), Request::Remove { key } => send_resp!(match self.engine.remove(key) { Ok(_) => RemoveResponse::Ok(()), Err(e) => RemoveResponse::Err(format!("{}", e)), }), }; } Ok(()) }
|
客户端
客户端的代码和服务端类似,不过显然我仅仅发送网络请求即可。
client runsource code1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| fn run(opt: Opt) -> Result<()> { match opt.command { Command::Get { key, addr } => { let mut client = KvsClient::connect(addr)?; if let Some(value) = client.get(key)? { println!("{}", value); } else { println!("Key not found"); } } Command::Set { key, value, addr } => { let mut client = KvsClient::connect(addr)?; client.set(key, value)?; } Command::Remove { key, addr } => { let mut client = KvsClient::connect(addr)?; client.remove(key)?; } } Ok(()) }
|
发送逻辑显然也很简单
clientsource code1 2 3 4 5 6 7 8 9 10 11
| pub fn get(&mut self, key: String) -> Result<Option<String>> { serde_json::to_writer(&mut self.writer, &Request::Get { key })?; self.writer.flush()?; let resp = GetResponse::deserialize(&mut self.reader)?; match resp { GetResponse::Ok(value) => Ok(value), GetResponse::Err(msg) => Err(KvsError::StringError(msg)), } }
|