在上一篇 解密 Rust 异步编程 中我们已经知道了 Funture
工作的原理,让我们看看真实世界里面是如何工作的。
本文基于 Tokio 0.3.0
版本
Example Tokio
启动一个最基础的 TCP Echo
服务如下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 #[tokio::main] async fn main () -> Result <(), Box <dyn Error>> { let addr = env::args () .nth (1 ) .unwrap_or_else (|| "127.0.0.1:8080" .to_string ()); let mut listener = TcpListener::bind (&addr).await ?; println! ("Listening on: {}" , addr); loop { let (mut socket, _) = listener.accept ().await ?; tokio::spawn (async move { let mut buf = [0 ; 1024 ]; loop { let n = socket .read (&mut buf) .await .expect ("failed to read data from socket" ); if n == 0 { return ; } socket .write_all (&buf[0 ..n]) .await .expect ("failed to write data to socket" ); } }); } }
Read Funture 我们很容易定位到,我们在处理这个可停止的系统要处理的就是 read
和 write
这两个操作,因此这2个操作都是使用 .await
返回了一个 Future
对象,我们看看 read
部分返回的。
Read 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 impl <R> Future for Read <'_ , R>where R: AsyncRead + Unpin + ?Sized , { type Output = io::Result <usize >; fn poll (mut self : Pin<&mut Self >, cx: &mut Context<'_ >) -> Poll<io::Result <usize >> { let me = &mut *self ; let mut buf = ReadBuf::new (me.buf); match Pin::new (&mut *me.reader).poll_read (cx, &mut buf) { std::task::Poll::Ready (t) => t, std::task::Poll::Pending => return std::task::Poll::Pending, } Poll::Ready (Ok (buf.filled ().len ())) } }
对于 Read
的逻辑也很简单,我们尝试用 self.reader
中获得数据,如果就绪就返回 Ready
不然就是 Pending
。而逻辑主要是在 reader
中,我们定位进去。
TcpStream 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 pub (crate ) fn poll_read_priv ( &self , cx: &mut Context<'_ >, buf: &mut ReadBuf<'_ >, ) -> Poll<io::Result <()>> { ready!(self .io.poll_read_ready (cx, mio::Ready::readable ()))?; let b = unsafe { &mut *(buf.unfilled_mut () as *mut [std::mem::MaybeUninit<u8 >] as *mut [u8 ]) }; match self .io.get_ref ().read (b) { Ok (n) => { unsafe { buf.assume_init (n); } buf.add_filled (n); Poll::Ready (Ok (())) } Err (e) => Poll::Ready (Err (e)), } }
这一段就是我们底层的逻辑,对于 self.io.poll_read_ready(cx, mio::Ready::readable())
来说,是我们所依赖的 Mio
对于 Epoll
封装,这里就不做展开。
我们既然知道了在读取的过程中,我们有一个客户被暂停的对象的话,那我们此时我们就要看下我们的执行器以及wake
Executor (Scheduler Worker) 我们这里也不用和代码死磕了,从官方的设计文档中 runtime 中我们可以得知,对于我们在上文中分析的 Executor
也就是对应到 Runtime
这个抽象,而这个抽象有两个具体的实现: Basic Scheduler
和 Threaded Scheduler
,而这两者最大的区别是 Basic Scheduler
是一个 single-threaded
的设计,因此社区也推荐对于大部分的时候,我们都应该使用 Threaded Scheduler
,我们先分析下 Threaded Scheduler
,然后再看看 Basic Scheduler
Threaded Scheduler 对于 Threaded Scheduler
来说,对于每一个线程都是对等的,每个线程都会生成一个 Worker
对象。
tokio/src/runtime/thread_pool/worker.rs 1 2 3 4 5 pub (super ) struct Worker { shared: Arc<Shared>, index: usize , core: AtomicCell<Core>, }
Worker 对于 worker
运行的逻辑同文件中,如下所示:
tokio/src/runtime/thread_pool/worker.rs 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 fn run (&self , mut core: Box <Core>) -> RunResult { while !core.is_shutdown { if let Some (task) = core.next_task (&self .worker) { core = self .run_task (task, core)?; continue ; } if let Some (task) = core.steal_work (&self .worker) { core = self .run_task (task, core)?; } else { core = self .park (core); } } self .worker.shared.shutdown (core, self .worker.clone ()); Err (()) }
不过在这个 park
中另有玄机,在代码处,我们可以发现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 fn park (&mut self ) -> io::Result <()> { self .turn (None )?; Ok (()) } fn turn (&mut self , max_wait: Option <Duration>) -> io::Result <()> { let mut events = self .events.take ().expect ("i/o driver event store missing" ); match self .inner.io.poll (&mut events, max_wait) { Ok (_) => {} Err (e) => return Err (e), } for event in events.iter () { let token = event.token (); if token == TOKEN_WAKEUP { self .inner .wakeup .set_readiness (mio::Ready::empty ()) .unwrap (); } else { self .dispatch (token, event.readiness ()); } } self .events = Some (events); Ok (()) }
不过值得注意的这里的 Poller
是作为 Worker
的私有属性的存在的,这样的话我们可以明白了 非阻塞
原理是怎么工作的。
获取本地的工作队列,如果有就进入 3,如果没有就跳转到 2
获取其他Worker的工作队列中的任务,如果有就进入 3,没有就进入 4
执行 Poll 逻辑
进入 Park 逻辑,等待就绪实践的完成
这里仍有一个小问题,工作队列的填充工作势必是由 Wake
进行填充的,这段逻辑我们再接下来分析一下
Wake 而对于 Tokio
而言,Wake
对象的创建在我们创建 Task
进行 Spwan
的时候已经创建完成
tokio/src/runtime/task/core.rs 1 2 3 4 5 6 7 8 9 10 11 pub (super ) fn poll (&self , header: &Header) -> Poll<T::Output> { let res = { self .stage.with_mut (|ptr| { let waker_ref = waker_ref::<T, S>(header); let mut cx = Context::from_waker (&*waker_ref); future.poll (&mut cx) }) }; }
wake
的逻辑如下
wake_by_ref 1 2 3 4 5 pub (super ) fn wake_by_ref (&self ) { if self .header ().state.transition_to_notified () { self .core ().schedule (Notified (self .to_task ())); } }
从代码中也很容易看出来,我们将 Header
对象转化成了 Task
,然后重新回到了调度器中进行调度,而这里又引入了一个新的理念了 Scheduler
,这个我们晚一点再来解释,先看看 Header
,Header
其实就是这个 Task
的状态数据。
Task Init 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 impl <T: Future, S: Schedule> Cell<T, S> { pub (super ) fn new (future: T, state: State) -> Box <Cell<T, S>> { Box ::new (Cell { header: Header { state, owned: UnsafeCell::new (linked_list::Pointers::new ()), queue_next: UnsafeCell::new (None ), stack_next: UnsafeCell::new (None ), vtable: raw::vtable::<T, S>(), }, core: Core { scheduler: UnsafeCell::new (None ), stage: UnsafeCell::new (Stage::Running (future)), }, trailer: Trailer { waker: UnsafeCell::new (None ), }, }) } }
Scheduler 在真正实现 NoBlocking
的时候,我们需要将 Task
放置到相关的 Task queue
中,而这部分的逻辑就交付给 Scheduler
这个抽象对象进行处理。
tokio/src/runtime/thread_pool/worker.rs 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 fn schedule (&self , task: Notified) { self .shared.schedule (task, false ); } pub (super ) fn schedule (&self , task: Notified, is_yield: bool ) { CURRENT.with (|maybe_cx| { if let Some (cx) = maybe_cx { if self .ptr_eq (&cx.worker.shared) { if let Some (core) = cx.core.borrow_mut ().as_mut () { self .schedule_local (core, task, is_yield); return ; } } } self .inject.push (task); self .notify_parked (); }); }
Harness 特地的分出了一个章节和大家再聊聊一个和 Wake
对等的抽象 Harness
,对于我们系统来说,Wake是进行线程唤醒的,涉及到 Task
的状态修改,而这个对象就是 Harness
(而实际上 Harness 也是我们具象化的 Wake 对象)。
Harness Poll 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 pub (super ) fn poll (self ) { let is_not_bound = !self .core ().is_bound (); let snapshot = match self .header ().state.transition_to_running (is_not_bound) { Ok (snapshot) => snapshot, Err (_) => { self .drop_reference (); return ; } }; if is_not_bound { self .core ().bind_scheduler (self .to_task ()); } let res = panic::catch_unwind (panic::AssertUnwindSafe (|| { if snapshot.is_cancelled () { Poll::Ready (Err (JoinError::cancelled2 ())) } else { let res = guard.core.poll (self .header ()); res.map (Ok ) } })); match res { Ok (Poll::Ready (out)) => { self .complete (out, snapshot.is_join_interested ()); } Ok (Poll::Pending) => { match self .header ().state.transition_to_idle () { Ok (snapshot) => { } Err (_) => self .cancel_task (), } } Err (err) => { self .complete (Err (JoinError::panic2 (err)), snapshot.is_join_interested ()); } } }
Worker Launch 我们的 Worker
是如何启动的?还得我们最开始的 Example
中的 tokio::spawn()
吗?
tokio/src/runtime/builder.rs 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 fn build_threaded_runtime (&mut self ) -> io::Result <Runtime> { let core_threads = self .core_threads.unwrap_or_else (|| cmp::min (self .max_threads, num_cpus ())); let (io_driver, io_handle) = io::create_driver (self .enable_io)?; let (driver, time_handle) = time::create_driver (self .enable_time, io_driver, clock.clone ()); let (scheduler, launch) = ThreadPool::new (core_threads, Parker::new (driver)); let spawner = Spawner::ThreadPool (scheduler.spawner ().clone ()); let blocking_pool = blocking::create_blocking_pool (self , self .max_threads); let blocking_spawner = blocking_pool.spawner ().clone (); let handle = Handle { spawner, io_handle, time_handle, clock, blocking_spawner, }; handle.enter (|| launch.launch ()); Ok (Runtime { kind: Kind::ThreadPool (scheduler), handle, blocking_pool, }) }
因此在 tokio/src/runtime/blocking/pool.rs:spawn
中,我们创建出来所有的 Worker
,而 Worker
的逻辑是我上面所使用的的 Example
部分。因此最后我们会生成多个 Worker
*COOP 从官方的 Docs
中得知,为了防止某个 Funture
一直返回 Ready
状态(这样就变成了一个 Loop 了),因此增加了一个 COOP
机制,也比较容易理解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 pub (crate ) fn poll_proceed (cx: &mut Context<'_ >) -> Poll<RestoreOnPending> { CURRENT.with (|cell| { let mut budget = cell.get (); if budget.decrement () { let restore = RestoreOnPending (Cell::new (cell.get ())); cell.set (budget); Poll::Ready (restore) } else { cx.waker ().wake_by_ref (); Poll::Pending } }) } fn decrement (&mut self ) -> bool { if let Some (num) = &mut self .0 { if *num > 0 { *num -= 1 ; true } else { false } } else { true } } const fn initial () -> Budget { Budget (Some (128 )) }
也就是如果一个 Funture
一直返回 Ready
,当返回 128
次 Ready
之后就强制返回 Pending
。
流程小结 当我们启动就绪什么都没有发生的时候,此时我们系统所有的 Worker
都进入了 Waiting
的状态等待任务被创建再执行。 我们执行 nc localhost 8080
尝试建立连接之后系统就开始真正的工作起来。
创建任务
执行完成之后,主线程因为等待 accept
事件就 parking
过去了。
运行任务
[选读] Switch Thread 在正在的执行过程中,我们会观察到一个有趣的现状,比如我们修改下 Example
1 2 3 4 5 6 7 8 9 10 11 tokio::spawn (async move { let mut buf = [0 ; 1024 ]; loop { let n = socket .read (&mut buf) .await .expect ("failed to read data from socket" ); println! ("current thread: {:?}" , thread::current ()); });
运行的时候我们会观察到一个有趣的情况:
read thread 1 2 3 4 5 6 7 8 9 10 Listening on: 127.0.0.1:8080 current thread: Thread { id : ThreadId(4), name: Some("tokio-runtime-worker" ) } current thread: Thread { id : ThreadId(12), name: Some("tokio-runtime-worker" ) } current thread: Thread { id : ThreadId(4), name: Some("tokio-runtime-worker" ) } current thread: Thread { id : ThreadId(12), name: Some("tokio-runtime-worker" ) } current thread: Thread { id : ThreadId(4), name: Some("tokio-runtime-worker" ) } current thread: Thread { id : ThreadId(12), name: Some("tokio-runtime-worker" ) } current thread: Thread { id : ThreadId(4), name: Some("tokio-runtime-worker" ) } current thread: Thread { id : ThreadId(12), name: Some("tokio-runtime-worker" ) } current thread: Thread { id : ThreadId(4), name: Some("tokio-runtime-worker" ) }
并不是很符合我们的直觉,这里的设计是在
tokio/src/runtime/thread_pool/worker.rs 1 2 3 4 5 6 7 8 9 10 11 fn run_task (&self , task: Notified, mut core: Box <Core>) -> RunResult { core.transition_from_searching (&self .worker); } fn transition_worker_from_searching (&self ) { if self .idle.transition_worker_from_searching () { self .notify_parked (); } }
看到这里我们做一个全景性质的小结
Basic Scheduler 有了上面的 Threaded Scheduler
,对于 Basic Scheduler
的理解就简单很多了。
Spawner 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #[derive(Clone)] pub (crate ) struct Spawner { shared: Arc<Shared>, } impl Spawner { pub (crate ) fn spawn <F>(&self , future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static , F::Output: Send + 'static , { let (task, handle) = task::joinable (future); self .shared.schedule (task); handle } }
对于 Spawner
来说蛮简单的就是直接将任务 PUSH
进 Task Queue
即可。
EventLoop 对于 Basic Scheduler
逻辑理解起来更为的简单一些。
tokio/src/runtime/basic_scheduler.rs:run 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 let _enter = runtime::enter (false );let waker = waker_ref (&scheduler.spawner.shared);let mut cx = std::task::Context::from_waker (&waker);'outer : loop { if let Ready (v) = crate::coop::budget (|| future.as_mut ().poll (&mut cx)) { return v; } for _ in 0 ..MAX_TASKS_PER_TICK { let tick = scheduler.tick; scheduler.tick = scheduler.tick.wrapping_add (1 ); let next = if tick % REMOTE_FIRST_INTERVAL == 0 { scheduler .spawner .pop () .or_else (|| context.tasks.borrow_mut ().queue.pop_front ()) } else { context .tasks .borrow_mut () .queue .pop_front () .or_else (|| scheduler.spawner.pop ()) }; match next { Some (task) => crate::coop::budget (|| task.run ()), None => { scheduler.park.park ().ok ().expect ("failed to park" ); continue 'outer ; } } } scheduler .park .park_timeout (Duration::from_millis (0 )) .ok () .expect ("failed to park" ); }
对于 Basic Scheduler
来说唯一有些复杂的就是,在 Spwan
和 BasicScheduler
中都会有任务,因为会涉及到在一定轮数下就会切换到 Spwan
中,不过从实际的运行过程中,会发现我们几乎都是在 cx.tasks.borrow_mut().queue.push_back(task)
进行工作,因为每一次运行都涉及到 bind
,不过值得注意的是 Spawner
是会在多个线程之间共享的。
Wake 对于 wake
就很简单了,将 unpark
对象进行 unpark
即可。
1 2 3 4 5 6 7 8 9 10 impl Wake for Shared { fn wake (self : Arc<Self >) { Wake::wake_by_ref (&self ) } fn wake_by_ref (arc_self: &Arc<Self >) { arc_self.unpark.unpark (); } }
流程小结 正常使用 Spawner
会从 Context
也就是当前线程中创建出来一个 Spawner
进行任务提交的时候,并不会将 Task
放入 Spanwer
内置的队列中。
线程模型 在 Epoll 之下的线程模型 中,我们也提及过,对于利用多核的能力,我们可以对 accept()
或者 read()
进行负载均衡,对于 Tokio
来说,我们实际上并没有像 Netty
那样限制了 BossGroup
和 WorkGroup
,Tokio
作为一个底层框架并没有限制这样的工作模式,比如我们就可以按照自己的想法进行定制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 #[tokio::main] async fn main () -> Result <(), Box <dyn Error>> { let mut boss = runtime::Builder::new () .threaded_scheduler () .enable_all () .core_threads (2 ) .build ()?; let mut workers = runtime::Builder::new () .threaded_scheduler () .enable_all () .core_threads (4 ) .build ()?; let addr = env::args () .nth (1 ) .unwrap_or_else (|| "127.0.0.1:8080" .to_string ()); let mut listener = TcpListener::bind (&addr).await ?; println! ("Listening on: {}" , addr); boss.spawn (async move { loop { let (mut socket, _) = listener.accept () .await .expect ("failed to accept from socket" ); println! ("accept thread: {:?}" , thread::current ()); workers.spawn (async move { let mut buf = [0 ; 1024 ]; loop { let n = socket.read (&mut buf).await .expect ("failed to read data from socket" ); break ; } socket.shutdown (Shutdown::Write); }); } }); Ok (thread::park ()) }
运行结果如下:
1 2 3 4 accept thread: Thread { id : ThreadId(14), name: Some("tokio-runtime-worker" ) } accept thread: Thread { id : ThreadId(14), name: Some("tokio-runtime-worker" ) } accept thread: Thread { id : ThreadId(15), name: Some("tokio-runtime-worker" ) } accept thread: Thread { id : ThreadId(15), name: Some("tokio-runtime-worker" ) }
按照上面的代码我们就可以模拟出 Netty
的线程模型。
参考