解密 Rust 异步编程

1E5E4.png

Future

Rust 在实现异步编程的时候,采用 Poll 模型。最为核心的就是

1
2
3
4
enum Poll<T> {
Ready(T),
Pending,
}

我们定义了一个类型为 Poll 处于两种状态之一 Ready Pending

  • Ready: 我们已经完成了任务可以返回结果 T
  • Pending:此时我们等待一些其他依赖,这时候我们需要将自己占用的资源释放出来,基于一个 wake() 函数再次将自己唤醒

最常见的就是 Socket 编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pub struct SocketRead<'a> {
socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>; //返回值是一个 u8 的 bytes

fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// 如果有数据直接返回 Ready 状态下可读的值
Poll::Ready(self.socket.read_buf())
} else {
// 没有数据就直接返回 Pending,等待数据,这里有一个 wake 函数等待被唤醒
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}

对于多个 Pollable 的对象,组合在一起非常的合理,就和 JavaComposeFunture 一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
// 尝试完成 A
if let Some(a) = &mut self.a {
if let Poll::Ready(()) = a.poll(wake) {
self.a.take();
}
}

// 尝试完成 B
if let Some(b) = &mut self.b {
if let Poll::Ready(()) = b.poll(wake) {
self.b.take();
}
}

if self.a.is_none() && self.b.is_none() {
// 都完成了就是 Ready
Poll::Ready(())
} else {
Poll::Pending
}
}
}

Wakeup

从上面的设计中可以发现,如果我们没有 wake 的话,我们只能一直的去轮训的获得是否已经 Ready,整体的效率会比较低,因此我们可以通过 wake 函数将其唤醒。Rust 中对于 Wake 的定义如下:

1
2
3
4
5
6
7
8
pub trait ArcWake: Send + Sync {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}

// 对于我们来说,我们需要的就是实现这个 wake_by_ref
fn wake_by_ref(arc_self: &Arc<Self>);
}

让我们假设一个超时的场景,在运行一段时间之后,我们需要将其唤醒。定义一个 TimerFuture 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));

// 启动一个新的线程处理我们的状态量
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// Thread Sleep 够了就算是超时了,这时候我们需要 wake 我们的工作对象了
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});

TimerFuture { shared_state }
}
}

Executor

上面只是提供了机制,我们还需要执行的本体,Rust 抽象了 Executor 出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}

#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}

impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// 阻塞结束,完成了一部分的请求数据
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 创建一个 Waker,将这个任务本体放入
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// 尝试poll一次,如果是 Pending
if let Poll::Pending = future.as_mut().poll(context) {
*future_slot = Some(future);
}
}
}
}
}

是不是有点懵,先不慌,我们看看怎么用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn main() {
let (executor, spawner) = new_executor_and_spawner();

// 我们向 executor 扔一个任务,其实这里就扔到了 ready_queue 的队列中
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});

// 这里会直接运行
executor.run();
}

不过我们在此之前,我们需要定义好如下代码:

1
2
3
4
5
6
7
8
9
10
11
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
task_sender: SyncSender<Arc<Task>>,
}

impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self.task_sender.send(cloned).expect("too many tasks queued");
}
}

我们会发现实际上我们最终 wake 起来的还是我们自己,我们只是在控制 wake 的方式。

How it Work

1En6G.png

核心的逻辑是,我们接收到这个任务,将其放置于我们的 Task Queue

1
2
3
4
5
spawner.spawn(async { // 这个 socpe 就是 Task
println!("howdy!");
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});

这个任务此时还没有 wake 对象,我们为其创建 Context 对象(抽象规定的,我们需要将 Wake 置于其中),并且创建一个 wake 对象,而实际上 wake 对象仅仅是是这个 Task 本身。

因此当我们进入 TimerFuturePoll 函数的时候,我们就已经在等待 2 秒之后,Wake 触发如下逻辑

1
2
3
4
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self.task_sender.send(cloned).expect("too many tasks queued");
}

此时,我们只不过又将我们的任务又 Re submit 到我们的任务 Task Queue 去了,此时我们再将整个 Thread 的工作给重新唤醒进行处理即可。


线程的变化为

1El06.png

参考源码: https://gist.github.com/yanickxia/270784bc004cb4c0a7b28b13ac9f2aba

Wake in real world

在真正的编程中,如果都是基于每一次启动一个线程会显得很不高效,因此一般都是基于信号来处理。
因此对于 socket 来说

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
impl Socket {
fn set_readable_callback(&self, waker: Waker) {
//这里其实就是 eventloop
let local_executor = self.local_executor;

// 此socket 的id
let id = self.id;

// 将需要监听的事件放入 eventloop 里面,等待 eventloop 回调
local_executor.event_map.insert(id, waker);
local_executor.add_io_event_interest(
&self.socket_file_descriptor,
Event { id, signals: READABLE },
);
}
}

值得注意的,本篇博客意在指出 Funture 的抽象机制是如何运行的,对于真正的系统比如 tokio 在实现 Executor 会比我们现在所设计的要复杂的多。

Wake是如何真正工作的

比如我们又如下一个场景的代码

example
1
2
3
4
5
6
7
8
9
async fn example(min_len: usize) -> String { ➊ 
let content = async_read_file("foo.txt").await; ➋
if content.len() < min_len {
content + &async_read_file("bar.txt").await
} else {
content
}

}

我们可能在 async_read_file("foo.txt")async_read_file("bar.txt") 这两个函数处都进入 Pending 状态,我们势必要保存 example 这个函数的运行状态,再一次唤醒的时候,我们要从处理过的地方再处理下去。

保存状态

对于 Rust 来说,保存状态是在 编译期 完成的。对于上面的代码来说,编译器会生成如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 编译器生成代码,举个例子

struct StartState { //➊ 初始化状态,有一个min_len变量状态
min_len: usize,
}

struct WaitingOnFooTxtState { //➋ 等待执行 Foo.txt 读取时的上下文状态
min_len: usize,
foo_txt_future: impl Future<Output = String>,
}

struct WaitingOnBarTxtState { //➌ 等待执行 Bar.txt 读取时的上下文状态
content: String,
bar_txt_future: impl Future<Output = String>,
}

struct EndState {} //➍ 结束状态

很容易理解,在函数入口的地方 ➊,我们只有一个变量也就是 min_len,在执行 async_read_file("foo.txt") 的时候 ➋ ,我们从下文中可以捕捉到的变量也只有 min_len 还有就是我们调用的 Futrue 对象了,➌ 处的话,从返回值判断可知 min_len 已经无用了,因此在此时需要关注的变量是 context

因此我们把这些状态想象成一个有限状态的自动机,可以定下如下状态

ExampleStateMachine
1
2
3
4
5
6
enum ExampleStateMachine {
Start(StartState),
WaitingOnFooTxt(WaitingOnFooTxtState),
WaitingOnBarTxt(WaitingOnBarTxtState),
End(EndState),
}

基于如上状态,我们将 State Machine 变成 Future

ExampleStateMachine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
impl Future for ExampleStateMachine {
type Output = String; // 返回值类型

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self {
ExampleStateMachine::Start(state) => {…}
ExampleStateMachine::WaitingOnFooTxt(state) => {…}
ExampleStateMachine::WaitingOnBarTxt(state) => {…}
ExampleStateMachine::End(state) => {…}
}
}
}
}

状态转换

那我们来填充下 Start 函数

Start
1
2
3
4
5
6
7
8
9
10
ExampleStateMachine::Start(state) => {
let foo_txt_future = async_read_file("foo.txt");
// `.await` 操作
let state = WaitingOnFooTxtState {
min_len: state.min_len,
foo_txt_future,
};
// 修改状态到 WaitingOnFooTxt
*self = ExampleStateMachine::WaitingOnFooTxt(state);
}

对于 Start 不存在阻塞的问题,直接进入了下一个状态。我们来看看复杂点的 WaitingOnFooTxt

ExampleStateMachine::WaitingOnFooTxt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ExampleStateMachine::WaitingOnFooTxt(state) => {
match state.foo_txt_future.poll(cx) { // 执行 async_read_file("foo.txt") 逻辑
Poll::Pending => return Poll::Pending, // Pending 直接返回
Poll::Ready(content) => {
if content.len() < state.min_len { // if 分支命中
let bar_txt_future = async_read_file("bar.txt");
// `.await` 操作
let state = WaitingOnBarTxtState {
content,
bar_txt_future,
};
*self = ExampleStateMachine::WaitingOnBarTxt(state); // 进入 WaitingOnBarTxt 状态
} else { // if 未命中直接进入 END 状态
*self = ExampleStateMachine::End(EndState));
return Poll::Ready(content);
}
}
}
}

因此我们很容易整理出来这张图
HU9f4.png

因此对于 Rust 来说,编译器会帮我在每一个 .await 处进行上下文的捕获,生成一个状态 Struct,因此我们申明的 async fn 在编译完成之后就变成一个 state machine,因此此阶段并不是通过 OS 保存 Thread 信息来完成了,而所有的信息都在我们的 Future 已经全部包含了,直接拿出来继续执行即可了,不需要传统的上保存下状态。

参考