Future
Rust
在实现异步编程的时候,采用 Poll
模型。最为核心的就是
1 2 3 4
| enum Poll<T> { Ready(T), Pending, }
|
我们定义了一个类型为 Poll
处于两种状态之一 Ready
Pending
- Ready: 我们已经完成了任务可以返回结果 T
- Pending:此时我们等待一些其他依赖,这时候我们需要将自己占用的资源释放出来,基于一个
wake()
函数再次将自己唤醒
最常见的就是 Socket
编程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| pub struct SocketRead<'a> { socket: &'a Socket, }
impl SimpleFuture for SocketRead<'_> { type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if self.socket.has_data_to_read() { Poll::Ready(self.socket.read_buf()) } else { self.socket.set_readable_callback(wake); Poll::Pending } } }
|
对于多个 Pollable
的对象,组合在一起非常的合理,就和 Java
的 ComposeFunture
一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB> where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if let Some(a) = &mut self.a { if let Poll::Ready(()) = a.poll(wake) { self.a.take(); } }
if let Some(b) = &mut self.b { if let Poll::Ready(()) = b.poll(wake) { self.b.take(); } }
if self.a.is_none() && self.b.is_none() { Poll::Ready(()) } else { Poll::Pending } } }
|
Wakeup
从上面的设计中可以发现,如果我们没有 wake
的话,我们只能一直的去轮训的获得是否已经 Ready
,整体的效率会比较低,因此我们可以通过 wake
函数将其唤醒。Rust
中对于 Wake
的定义如下:
1 2 3 4 5 6 7 8
| pub trait ArcWake: Send + Sync { fn wake(self: Arc<Self>) { Self::wake_by_ref(&self) }
fn wake_by_ref(arc_self: &Arc<Self>); }
|
让我们假设一个超时的场景,在运行一段时间之后,我们需要将其唤醒。定义一个 TimerFuture
对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| impl TimerFuture { pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, }));
let thread_shared_state = shared_state.clone(); thread::spawn(move || { thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } });
TimerFuture { shared_state } } }
|
Executor
上面只是提供了机制,我们还需要执行的本体,Rust
抽象了 Executor
出来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| struct Executor { ready_queue: Receiver<Arc<Task>>, }
#[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, }
impl Spawner { fn spawn(&self, future: impl Future<Output=()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); self.task_sender.send(task).expect("too many tasks queued"); } }
impl Executor { fn run(&self) { while let Ok(task) = self.ready_queue.recv() { let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); if let Poll::Pending = future.as_mut().poll(context) { *future_slot = Some(future); } } } } }
|
是不是有点懵,先不慌,我们看看怎么用的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| fn main() { let (executor, spawner) = new_executor_and_spawner();
spawner.spawn(async { println!("howdy!"); TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); });
executor.run(); }
|
不过我们在此之前,我们需要定义好如下代码:
1 2 3 4 5 6 7 8 9 10 11
| struct Task { future: Mutex<Option<BoxFuture<'static, ()>>>, task_sender: SyncSender<Arc<Task>>, }
impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { let cloned = arc_self.clone(); arc_self.task_sender.send(cloned).expect("too many tasks queued"); } }
|
我们会发现实际上我们最终 wake
起来的还是我们自己,我们只是在控制 wake
的方式。
How it Work
核心的逻辑是,我们接收到这个任务,将其放置于我们的 Task Queue
中
1 2 3 4 5
| spawner.spawn(async { println!("howdy!"); TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); });
|
这个任务此时还没有 wake
对象,我们为其创建 Context
对象(抽象规定的,我们需要将 Wake 置于其中),并且创建一个 wake
对象,而实际上 wake
对象仅仅是是这个 Task
本身。
因此当我们进入 TimerFuture
的 Poll
函数的时候,我们就已经在等待 2
秒之后,Wake
触发如下逻辑
1 2 3 4
| fn wake_by_ref(arc_self: &Arc<Self>) { let cloned = arc_self.clone(); arc_self.task_sender.send(cloned).expect("too many tasks queued"); }
|
此时,我们只不过又将我们的任务又 Re submit
到我们的任务 Task Queue
去了,此时我们再将整个 Thread
的工作给重新唤醒进行处理即可。
线程的变化为
参考源码: https://gist.github.com/yanickxia/270784bc004cb4c0a7b28b13ac9f2aba
Wake in real world
在真正的编程中,如果都是基于每一次启动一个线程会显得很不高效,因此一般都是基于信号来处理。
因此对于 socket
来说
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| impl Socket { fn set_readable_callback(&self, waker: Waker) { let local_executor = self.local_executor;
let id = self.id;
local_executor.event_map.insert(id, waker); local_executor.add_io_event_interest( &self.socket_file_descriptor, Event { id, signals: READABLE }, ); } }
|
值得注意的,本篇博客意在指出 Funture
的抽象机制是如何运行的,对于真正的系统比如 tokio
在实现 Executor
会比我们现在所设计的要复杂的多。
Wake是如何真正工作的
比如我们又如下一个场景的代码
example1 2 3 4 5 6 7 8 9
| async fn example(min_len: usize) -> String { ➊ let content = async_read_file("foo.txt").await; ➋ if content.len() < min_len { content + &async_read_file("bar.txt").await ➌ } else { content } ➍ }
|
我们可能在 async_read_file("foo.txt")
和 async_read_file("bar.txt")
这两个函数处都进入 Pending
状态,我们势必要保存 example
这个函数的运行状态,再一次唤醒的时候,我们要从处理过的地方再处理下去。
保存状态
对于 Rust
来说,保存状态是在 编译期
完成的。对于上面的代码来说,编译器会生成如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
struct StartState { min_len: usize, }
struct WaitingOnFooTxtState { min_len: usize, foo_txt_future: impl Future<Output = String>, }
struct WaitingOnBarTxtState { content: String, bar_txt_future: impl Future<Output = String>, }
struct EndState {}
|
很容易理解,在函数入口的地方 ➊,我们只有一个变量也就是 min_len
,在执行 async_read_file("foo.txt")
的时候 ➋ ,我们从下文中可以捕捉到的变量也只有 min_len
还有就是我们调用的 Futrue
对象了,➌ 处的话,从返回值判断可知 min_len
已经无用了,因此在此时需要关注的变量是 context
。
因此我们把这些状态想象成一个有限状态的自动机,可以定下如下状态
ExampleStateMachine1 2 3 4 5 6
| enum ExampleStateMachine { Start(StartState), WaitingOnFooTxt(WaitingOnFooTxtState), WaitingOnBarTxt(WaitingOnBarTxtState), End(EndState), }
|
基于如上状态,我们将 State Machine
变成 Future
ExampleStateMachine1 2 3 4 5 6 7 8 9 10 11 12 13 14
| impl Future for ExampleStateMachine { type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { loop { match self { ExampleStateMachine::Start(state) => {…} ExampleStateMachine::WaitingOnFooTxt(state) => {…} ExampleStateMachine::WaitingOnBarTxt(state) => {…} ExampleStateMachine::End(state) => {…} } } } }
|
状态转换
那我们来填充下 Start
函数
Start1 2 3 4 5 6 7 8 9 10
| ExampleStateMachine::Start(state) => { let foo_txt_future = async_read_file("foo.txt"); let state = WaitingOnFooTxtState { min_len: state.min_len, foo_txt_future, }; *self = ExampleStateMachine::WaitingOnFooTxt(state); }
|
对于 Start
不存在阻塞的问题,直接进入了下一个状态。我们来看看复杂点的 WaitingOnFooTxt
ExampleStateMachine::WaitingOnFooTxt1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| ExampleStateMachine::WaitingOnFooTxt(state) => { match state.foo_txt_future.poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(content) => { if content.len() < state.min_len { let bar_txt_future = async_read_file("bar.txt"); let state = WaitingOnBarTxtState { content, bar_txt_future, }; *self = ExampleStateMachine::WaitingOnBarTxt(state); } else { *self = ExampleStateMachine::End(EndState)); return Poll::Ready(content); } } } }
|
因此我们很容易整理出来这张图
因此对于 Rust
来说,编译器会帮我在每一个 .await
处进行上下文的捕获,生成一个状态 Struct
,因此我们申明的 async fn
在编译完成之后就变成一个 state machine
,因此此阶段并不是通过 OS
保存 Thread
信息来完成了,而所有的信息都在我们的 Future
已经全部包含了,直接拿出来继续执行即可了,不需要传统的上保存下状态。
参考