真实世界中的 Rust 异步编程

1zH7t.png

在上一篇 解密 Rust 异步编程 中我们已经知道了 Funture 工作的原理,让我们看看真实世界里面是如何工作的。

本文基于 Tokio 0.3.0 版本

Example

Tokio 启动一个最基础的 TCP Echo 服务如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
let mut listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];

// In a loop, read data from the socket and write the data back.
loop {
let n = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");

if n == 0 {
return;
}

socket
.write_all(&buf[0..n])
.await
.expect("failed to write data to socket");
}
});
}
}

Read Funture

我们很容易定位到,我们在处理这个可停止的系统要处理的就是 readwrite 这两个操作,因此这2个操作都是使用 .await 返回了一个 Future 对象,我们看看 read 部分返回的。

Read
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
impl<R> Future for Read<'_, R>
where
R: AsyncRead + Unpin + ?Sized,
{
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
let me = &mut *self;
let mut buf = ReadBuf::new(me.buf);
// ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut buf))?; 方便理解我把宏展开了
match Pin::new(&mut *me.reader).poll_read(cx, &mut buf) {
std::task::Poll::Ready(t) => t,
std::task::Poll::Pending => return std::task::Poll::Pending,
}
Poll::Ready(Ok(buf.filled().len()))
}
}

对于 Read 的逻辑也很简单,我们尝试用 self.reader 中获得数据,如果就绪就返回 Ready 不然就是 Pending。而逻辑主要是在 reader中,我们定位进去。

TcpStream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pub(crate) fn poll_read_priv(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; // ➊ 如果不可读直接返回 Pending,同上
let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
match self.io.get_ref().read(b) {
Ok(n) => {
unsafe {
buf.assume_init(n);
}
buf.add_filled(n);
Poll::Ready(Ok(())) //➋ 获得到数据即是 Ready
}
Err(e) => Poll::Ready(Err(e)), // ➌ 如何是异常就是返回就绪的异常
}
}

这一段就是我们底层的逻辑,对于 self.io.poll_read_ready(cx, mio::Ready::readable()) 来说,是我们所依赖的 Mio 对于 Epoll 封装,这里就不做展开。

我们既然知道了在读取的过程中,我们有一个客户被暂停的对象的话,那我们此时我们就要看下我们的执行器以及wake

Executor (Scheduler Worker)

我们这里也不用和代码死磕了,从官方的设计文档中 runtime 中我们可以得知,对于我们在上文中分析的 Executor 也就是对应到 Runtime 这个抽象,而这个抽象有两个具体的实现: Basic SchedulerThreaded Scheduler,而这两者最大的区别是 Basic Scheduler 是一个 single-threaded 的设计,因此社区也推荐对于大部分的时候,我们都应该使用 Threaded Scheduler,我们先分析下 Threaded Scheduler,然后再看看 Basic Scheduler

Threaded Scheduler

对于 Threaded Scheduler 来说,对于每一个线程都是对等的,每个线程都会生成一个 Worker 对象。

tokio/src/runtime/thread_pool/worker.rs
1
2
3
4
5
pub(super) struct Worker {
shared: Arc<Shared>,
index: usize,
core: AtomicCell<Core>,
}

Worker

对于 worker 运行的逻辑同文件中,如下所示:

tokio/src/runtime/thread_pool/worker.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fn run(&self, mut core: Box<Core>) -> RunResult {
while !core.is_shutdown { //当系统未停机的时候

// 检查当前线程是否有可以可以运行的任务
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
continue;
}

// 本地没有可以运行的任务的话,就去其他的Worker偷一个回来
if let Some(task) = core.steal_work(&self.worker) {
core = self.run_task(task, core)?;
} else {
// 其他的 Worker 也没有,就 Park
core = self.park(core);
}
}

// Signal shutdown
self.worker.shared.shutdown(core, self.worker.clone());
Err(())
}

不过在这个 park 中另有玄机,在代码处,我们可以发现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// tokio/src/io/driver/mod.rs
fn park(&mut self) -> io::Result<()> {
self.turn(None)?;
Ok(())
}

// tokio/src/io/driver/mod.rs
fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
//获得我们的 IO Event
let mut events = self.events.take().expect("i/o driver event store missing");

// 在这里等待 IO 的事件的发生, 这里的 max_wait 对于 Tokio 传入的是 0,无限制等待
match self.inner.io.poll(&mut events, max_wait) {
Ok(_) => {}
Err(e) => return Err(e),
}

// 处理我们获取到的事件
for event in events.iter() {
let token = event.token();
if token == TOKEN_WAKEUP {
self.inner
.wakeup
.set_readiness(mio::Ready::empty())
.unwrap(); //唤醒 Task
} else {
self.dispatch(token, event.readiness()); //这里其实也是在用 wake 唤醒,不过这里需要使用 Token 进行转换一下
}
}

self.events = Some(events);

Ok(())
}

不过值得注意的这里的 Poller 是作为 Worker 的私有属性的存在的,这样的话我们可以明白了 非阻塞 原理是怎么工作的。

1jmGR.png

  1. 获取本地的工作队列,如果有就进入 3,如果没有就跳转到 2
  2. 获取其他Worker的工作队列中的任务,如果有就进入 3,没有就进入 4
  3. 执行 Poll 逻辑
  4. 进入 Park 逻辑,等待就绪实践的完成

这里仍有一个小问题,工作队列的填充工作势必是由 Wake 进行填充的,这段逻辑我们再接下来分析一下

Wake

而对于 Tokio 而言,Wake 对象的创建在我们创建 Task 进行 Spwan 的时候已经创建完成

tokio/src/runtime/task/core.rs
1
2
3
4
5
6
7
8
9
10
11
pub(super) fn poll(&self, header: &Header) -> Poll<T::Output> {
let res = {
self.stage.with_mut(|ptr| {
// 在我们调用的外侧封装一层 Poll,我们的额 Waker 就是 Header 这个对象
let waker_ref = waker_ref::<T, S>(header);
let mut cx = Context::from_waker(&*waker_ref);

future.poll(&mut cx)
})
};
}

wake 的逻辑如下

wake_by_ref
1
2
3
4
5
pub(super) fn wake_by_ref(&self) {
if self.header().state.transition_to_notified() {
self.core().schedule(Notified(self.to_task()));
}
}

从代码中也很容易看出来,我们将 Header 对象转化成了 Task,然后重新回到了调度器中进行调度,而这里又引入了一个新的理念了 Scheduler,这个我们晚一点再来解释,先看看 HeaderHeader 其实就是这个 Task 的状态数据。

Task Init
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
impl<T: Future, S: Schedule> Cell<T, S> {
pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
Box::new(Cell {
header: Header {
state,
owned: UnsafeCell::new(linked_list::Pointers::new()),
queue_next: UnsafeCell::new(None),
stack_next: UnsafeCell::new(None),
vtable: raw::vtable::<T, S>(),
},
core: Core {
scheduler: UnsafeCell::new(None),
stage: UnsafeCell::new(Stage::Running(future)),
},
trailer: Trailer {
waker: UnsafeCell::new(None),
},
})
}
}

Scheduler

在真正实现 NoBlocking 的时候,我们需要将 Task 放置到相关的 Task queue中,而这部分的逻辑就交付给 Scheduler 这个抽象对象进行处理。

tokio/src/runtime/thread_pool/worker.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fn schedule(&self, task: Notified) {
self.shared.schedule(task, false);
}

pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
CURRENT.with(|maybe_cx| {
if let Some(cx) = maybe_cx {
if self.ptr_eq(&cx.worker.shared) {
// 属于当前线程的任务就在本地进行调度
if let Some(core) = cx.core.borrow_mut().as_mut() {
self.schedule_local(core, task, is_yield);
return;
}
}
}

// 其他情况就置于队列,然后唤醒其他的 Worker
self.inject.push(task);
self.notify_parked();
});
}

Harness

特地的分出了一个章节和大家再聊聊一个和 Wake 对等的抽象 Harness,对于我们系统来说,Wake是进行线程唤醒的,涉及到 Task 的状态修改,而这个对象就是 Harness (而实际上 Harness 也是我们具象化的 Wake 对象)。

Harness Poll
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
pub(super) fn poll(self) {
//如果是第一次运行就需要在下面进行 Core 绑定
let is_not_bound = !self.core().is_bound();

// Transition the task to the running state.
//
// 恢复 Task 到运行状态
let snapshot = match self.header().state.transition_to_running(is_not_bound) {
Ok(snapshot) => snapshot,
Err(_) => {
self.drop_reference();
return;
}
};

if is_not_bound {
self.core().bind_scheduler(self.to_task());
}

// 这里就是执行我们真正的 Task 的逻辑
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {

if snapshot.is_cancelled() { // 已经取消就返回
Poll::Ready(Err(JoinError::cancelled2()))
} else {
let res = guard.core.poll(self.header()); // 这里又回到我们上面的 创建 Wake 的地方,又是一轮循环
res.map(Ok)
}
}));

match res {
Ok(Poll::Ready(out)) => {
self.complete(out, snapshot.is_join_interested());
}
Ok(Poll::Pending) => { // 如果真实的逻辑是 返回 Pending
match self.header().state.transition_to_idle() { // 将Task的状态保存
Ok(snapshot) => {
}
Err(_) => self.cancel_task(),
}
}
Err(err) => {
self.complete(Err(JoinError::panic2(err)), snapshot.is_join_interested());
}
}
}

Worker Launch

我们的 Worker 是如何启动的?还得我们最开始的 Example 中的 tokio::spawn() 吗?

tokio/src/runtime/builder.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
//根据 CPU CORE 启动 WORKER 的线程数
let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));

let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());

//创建一组Worker 和 一个启动器
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());


// 创建 Blocking Pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Create the runtime handle
let handle = Handle {
spawner,
io_handle,
time_handle,
clock,
blocking_spawner,
};

// 这里开始启动 Worker
handle.enter(|| launch.launch());

Ok(Runtime {
kind: Kind::ThreadPool(scheduler),
handle,
blocking_pool,
})
}

因此在 tokio/src/runtime/blocking/pool.rs:spawn 中,我们创建出来所有的 Worker,而 Worker 的逻辑是我上面所使用的的 Example 部分。因此最后我们会生成多个 Worker

1KeIQ.png

*COOP

从官方的 Docs 中得知,为了防止某个 Funture 一直返回 Ready 状态(这样就变成了一个 Loop 了),因此增加了一个 COOP机制,也比较容易理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
CURRENT.with(|cell| {
let mut budget = cell.get();

if budget.decrement() { // 当 Rem 可减的时候正常返回
let restore = RestoreOnPending(Cell::new(cell.get()));
cell.set(budget);
Poll::Ready(restore)
} else {
cx.waker().wake_by_ref(); // 当 Rem 不可减的时候就强制返回 Pending
Poll::Pending
}
})
}

fn decrement(&mut self) -> bool {
if let Some(num) = &mut self.0 {
if *num > 0 {
*num -= 1;
true
} else {
false
}
} else {
true
}
}

const fn initial() -> Budget {
Budget(Some(128))
}

也就是如果一个 Funture 一直返回 Ready,当返回 128Ready 之后就强制返回 Pending

流程小结

当我们启动就绪什么都没有发生的时候,此时我们系统所有的 Worker 都进入了 Waiting 的状态等待任务被创建再执行。
我们执行 nc localhost 8080 尝试建立连接之后系统就开始真正的工作起来。

创建任务

执行完成之后,主线程因为等待 accept 事件就 parking 过去了。

运行任务
[选读] Switch Thread

在正在的执行过程中,我们会观察到一个有趣的现状,比如我们修改下 Example

1
2
3
4
5
6
7
8
9
10
11
tokio::spawn(async move {
let mut buf = [0; 1024];

loop {
let n = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");

println!("current thread: {:?}", thread::current());
});

运行的时候我们会观察到一个有趣的情况:

read thread
1
2
3
4
5
6
7
8
9
10
Listening on: 127.0.0.1:8080
current thread: Thread { id: ThreadId(4), name: Some("tokio-runtime-worker") }
current thread: Thread { id: ThreadId(12), name: Some("tokio-runtime-worker") }
current thread: Thread { id: ThreadId(4), name: Some("tokio-runtime-worker") }
current thread: Thread { id: ThreadId(12), name: Some("tokio-runtime-worker") }
current thread: Thread { id: ThreadId(4), name: Some("tokio-runtime-worker") }
current thread: Thread { id: ThreadId(12), name: Some("tokio-runtime-worker") }
current thread: Thread { id: ThreadId(4), name: Some("tokio-runtime-worker") }
current thread: Thread { id: ThreadId(12), name: Some("tokio-runtime-worker") }
current thread: Thread { id: ThreadId(4), name: Some("tokio-runtime-worker") }

并不是很符合我们的直觉,这里的设计是在

tokio/src/runtime/thread_pool/worker.rs
1
2
3
4
5
6
7
8
9
10
11
fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
// 让这个Worker的状态置于可以被窃取的状态
core.transition_from_searching(&self.worker);
}

fn transition_worker_from_searching(&self) {
if self.idle.transition_worker_from_searching() {
// 唤醒一个 Worker 让其窃取
self.notify_parked();
}
}

看到这里我们做一个全景性质的小结
1MTqk.png

Basic Scheduler

有了上面的 Threaded Scheduler,对于 Basic Scheduler 的理解就简单很多了。

Spawner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#[derive(Clone)]
pub(crate) struct Spawner {
shared: Arc<Shared>,
}

impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
self.shared.schedule(task); // ➊ cx.tasks.borrow_mut().queue.push_back(task);
handle
}
}

对于 Spawner 来说蛮简单的就是直接将任务 PUSHTask Queue 即可。

EventLoop

对于 Basic Scheduler 逻辑理解起来更为的简单一些。

tokio/src/runtime/basic_scheduler.rs:run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
let _enter = runtime::enter(false);
let waker = waker_ref(&scheduler.spawner.shared);
let mut cx = std::task::Context::from_waker(&waker);
'outer:
loop {
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}

// ➊ MAX_TASKS_PER_TICK 单个唤醒周期内处理多少任务
for _ in 0..MAX_TASKS_PER_TICK {
let tick = scheduler.tick;
scheduler.tick = scheduler.tick.wrapping_add(1); // 将 tick + 1

// ➋ REMOTE_FIRST_INTERVAL = 31,经过 31 轮之后检查 Spawner 队列,是 else 部分
let next = if tick % REMOTE_FIRST_INTERVAL == 0 {
scheduler
.spawner
.pop()
.or_else(|| context.tasks.borrow_mut().queue.pop_front()) // 没有就去 全局队列 取
} else {
context
.tasks
.borrow_mut()
.queue
.pop_front()
.or_else(|| scheduler.spawner.pop())
};

// ➌ 获取到任务,就处理, 没有就直接 Park ,等待下次唤起
match next {
Some(task) => crate::coop::budget(|| task.run()),
None => {
// Park until the thread is signaled
scheduler.park.park().ok().expect("failed to park");

// Try polling the `block_on` future next
continue 'outer;
}
}
}

//没有任务直接直接 Park 了
scheduler
.park
.park_timeout(Duration::from_millis(0))
.ok()
.expect("failed to park");
}

对于 Basic Scheduler 来说唯一有些复杂的就是,在 SpwanBasicScheduler 中都会有任务,因为会涉及到在一定轮数下就会切换到 Spwan中,不过从实际的运行过程中,会发现我们几乎都是在 cx.tasks.borrow_mut().queue.push_back(task) 进行工作,因为每一次运行都涉及到 bind,不过值得注意的是 Spawner 是会在多个线程之间共享的。

Wake

对于 wake 就很简单了,将 unpark 对象进行 unpark 即可。

1
2
3
4
5
6
7
8
9
10
impl Wake for Shared {
fn wake(self: Arc<Self>) {
Wake::wake_by_ref(&self)
}

/// Wake by reference
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.unpark.unpark();
}
}

流程小结

正常使用 Spawner 会从 Context 也就是当前线程中创建出来一个 Spawner 进行任务提交的时候,并不会将 Task 放入 Spanwer 内置的队列中。

walqRH.png

线程模型

Epoll 之下的线程模型 中,我们也提及过,对于利用多核的能力,我们可以对 accept() 或者 read() 进行负载均衡,对于 Tokio 来说,我们实际上并没有像 Netty 那样限制了 BossGroupWorkGroupTokio 作为一个底层框架并没有限制这样的工作模式,比如我们就可以按照自己的想法进行定制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut boss = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.core_threads(2)
.build()?;

let mut workers = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.core_threads(4)
.build()?;


let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());

let mut listener = TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);

boss.spawn(async move {
loop {
let (mut socket, _) = listener.accept()
.await.expect("failed to accept from socket");
println!("accept thread: {:?}", thread::current());
workers.spawn(async move {
let mut buf = [0; 1024];

loop {
let n = socket.read(&mut buf).await.expect("failed to read data from socket");
break;
}
socket.shutdown(Shutdown::Write);
});
}
});

Ok(thread::park())
}

运行结果如下:

1
2
3
4
accept thread: Thread { id: ThreadId(14), name: Some("tokio-runtime-worker") }
accept thread: Thread { id: ThreadId(14), name: Some("tokio-runtime-worker") }
accept thread: Thread { id: ThreadId(15), name: Some("tokio-runtime-worker") }
accept thread: Thread { id: ThreadId(15), name: Some("tokio-runtime-worker") }

按照上面的代码我们就可以模拟出 Netty 的线程模型。

参考