Rust Project 3: Synchronous client-server networking 解读

NciWO.png

上次讲完了我们单机模式下的 KV Store,这次接着解读,我们将访问模式从 CMD 变成 Networking 的模式。

本文主要解读代码的构成部分,对于类库的使用就一笔带过。

服务端

1
2
3
4
5
6
7
8
9
10
fn main() {
// 设置日志级别
env_logger::builder().filter_level(LevelFilter::Info).init();
// 读取配置
let mut opt = Opt::from_args();
// current_engine() 是说 current_kv_engine 也就是当前目录
let res = current_engine().and_then(move |curr_engine| {
run(opt)
});
}

启动

显然所有的逻辑都在我们的 run 之中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fn run(opt: Opt) -> Result<()> {
let engine = opt.engine.unwrap_or(DEFAULT_ENGINE);
info!("kvs-server {}", env!("CARGO_PKG_VERSION"));
info!("Storage engine: {}", engine);
info!("Listening on {}", opt.addr);

// write engine to engine file
fs::write(current_dir()?.join("engine"), format!("{}", engine))?;

match engine {
Engine::kvs => run_with_engine(KvStore::open(current_dir()?)?, opt.addr),
Engine::sled => run_with_engine(
SledKvsEngine::new(sled::Db::start_default(current_dir()?)?),
opt.addr,
),
}
}

从这里我们的代码开始分叉,我们需要处理两种情况,sledkvs,不过其实 sled 仅仅是一个测试的 Mock 实现,从 sled.rs 的实现可以看出来,仅仅是一个 IN-MEM 的实现罢了,我们还是核心要去看 Engine::kvsCASE(不过其实也没啥区别,就是换了 kv 引擎)

来到了我们 Run 服务器的地方了

runsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pub fn run<A: ToSocketAddrs>(mut self, addr: A) -> Result<()> {

// 仅仅打开了一个监听的 Socket
let listener = TcpListener::bind(addr)?;
for stream in listener.incoming() {
match stream {
Ok(stream) => {
if let Err(e) = self.serve(stream) {
error!("Error on serving client: {}", e);
}
}
}
}
Ok(())
}

逻辑处理

核心的逻辑依然需要往下走一点,我们就来到了有趣的部分了,几乎就是如何手写一个纯粹的 Web 服务。

servesource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
fn serve(&mut self, tcp: TcpStream) -> Result<()> {
let peer_addr = tcp.peer_addr()?;

// 使用 BufReader 来读取数据,标准的装饰器模式……
let reader = BufReader::new(&tcp);
let mut writer = BufWriter::new(&tcp);
// 读取的数据解析成 Request 类型
let req_reader = Deserializer::from_reader(reader).into_iter::<Request>();

// Rust 宏,这个宏很简单,一个表达式参数,将 resp 对象写入 writer
macro_rules! send_resp {
($resp:expr) => {{
let resp = $resp;
serde_json::to_writer(&mut writer, &resp)?;
writer.flush()?;
debug!("Response sent to {}: {:?}", peer_addr, resp);
};};
}

// 注意这里其实是 Blocking IO,我们每一次都需要等待完整的 Request 达到进入循环体内
for req in req_reader {
let req = req?;
debug!("Receive request from {}: {:?}", peer_addr, req);
// 处理不同的数据类型即可
// 处理逻辑实则就是我们 kv store 的三个操作
match req {
Request::Get { key } => send_resp!(match self.engine.get(key) {
Ok(value) => GetResponse::Ok(value),
Err(e) => GetResponse::Err(format!("{}", e)),
}),
Request::Set { key, value } => send_resp!(match self.engine.set(key, value) {
Ok(_) => SetResponse::Ok(()),
Err(e) => SetResponse::Err(format!("{}", e)),
}),
Request::Remove { key } => send_resp!(match self.engine.remove(key) {
Ok(_) => RemoveResponse::Ok(()),
Err(e) => RemoveResponse::Err(format!("{}", e)),
}),
};
}
Ok(())
}

客户端

客户端的代码和服务端类似,不过显然我仅仅发送网络请求即可。

client runsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fn run(opt: Opt) -> Result<()> {
match opt.command {
Command::Get { key, addr } => {
let mut client = KvsClient::connect(addr)?;
if let Some(value) = client.get(key)? {
println!("{}", value);
} else {
println!("Key not found");
}
}
Command::Set { key, value, addr } => {
let mut client = KvsClient::connect(addr)?;
client.set(key, value)?;
}
Command::Remove { key, addr } => {
let mut client = KvsClient::connect(addr)?;
client.remove(key)?;
}
}
Ok(())
}

发送逻辑显然也很简单

clientsource code
1
2
3
4
5
6
7
8
9
10
11
pub fn get(&mut self, key: String) -> Result<Option<String>> {
// 将 Get 请求,序列化之后,写入 Writer
serde_json::to_writer(&mut self.writer, &Request::Get { key })?;
self.writer.flush()?;
// 将返回的结果反序列出来
let resp = GetResponse::deserialize(&mut self.reader)?;
match resp {
GetResponse::Ok(value) => Ok(value),
GetResponse::Err(msg) => Err(KvsError::StringError(msg)),
}
}