Actix : Rust 中 Actor 模型的实现

1ZxkC.png

我们聊完了 Rust 的异步编程框架,我们下来看看更加抽象的 Actor 模型的实现。

本文基于 Actix-Core: 0.10.0

Actor 模型

关于什么是 Actor 模型,这里就不做展开了,可以参考 The actor model in 10 minutes 简而言之,我们使用消息传递信息,Actor 作为处理节点。

HGPu4.png

初窥 Actix

对于 Actix 来说我们去实现一个 Actor 非常的简单:

Example Ping
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
use std::thread;
use actix::prelude::*;

struct Ping(usize);

impl Message for Ping {
type Result = usize;
}

struct MyActor {
count: usize,
}

impl Actor for MyActor {
type Context = Context<Self>;
}

impl Handler<Ping> for MyActor {
type Result = usize;

fn handle(&mut self, msg: Ping, _: &mut Context<Self>) -> Self::Result {
println!("Handler in {:?}", thread::current().id());
self.count += msg.0;
self.count
}
}


fn main() {
let system = System::new("single-arbiter-example");
Arbiter::spawn(async {
// 启动一个 actor
let addr = MyActor { count: 10 }.start();

println!("Arbiter in {:?}", thread::current().id());

// 发送数据
let res = addr.send(Ping(10)).await;

println!("RESULT: {}", res.unwrap() == 20);

// 停止系统退出
System::current().stop();
});

system.run();
}

这里和官方的不太一样进行了改造,这样比较容易方便后面的代码拆解。

我们随意的在 handle 函数中设置一个断点,看一下函数栈,熟悉的味道马上就涌上心头。函数栈底还是上次分析的 Tokio,因此对于 Actix 来说,最终的 Runtime 依然是基于 Tokio 的运行模型的。从 Actor 模型的模型的抽象来说最重要的模型莫过于 Actor 了,我们来看看这个运行逻辑的承载者是如何实现的。

Actor Impl

Actor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pub trait Actor: Sized + Unpin + 'static {
fn start(self) -> Addr<Self>
where
Self: Actor<Context = Context<Self>>,
{
Context::new().run(self) // 构建上下文对象运行
}
}

impl<A> Context<A>
where
A: Actor<Context = Self>,
{
#[inline]
pub fn run(self, act: A) -> Addr<A> {
let fut = self.into_future(act);
let addr = fut.address();
actix_rt::spawn(fut);
addr
}
}

从上文中,我们可以得到,对于将 Actor 转化为成一个 Funture 然后将其置于 Runtime 中进行处理,对于 actix_rt::spawn(fut) 处理大家可能会觉得比较复杂,其实是相对简单的。我们晚一点再过来看,我们继续看这个 into_future(act) 做了什么,其实也没做啥就是直接构建了 ContextFut 对象。

src/context.rs:111
1
2
3
4
5
6
7
8
9
10
11
pub struct ContextFut<A, C>
where
C: AsyncContextParts<A> + Unpin,
A: Actor<Context = C>,
{
ctx: C,
act: A,
mailbox: Mailbox<A>,
wait: SmallVec<[ActorWaitItem<A>; 2]>,
items: SmallVec<[Item<A>; 3]>,
}

其中最重要的某过于 mailboxact 对象了,act -> actor 本身,而 mailbox 使我们后续接收到的数据的一个管道,而在 let addr = fut.address(); 返回的恰好就是 mailbox 的地址,那我们看看 mailbox 又是什么东西。

Mailbox AddressReceiver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
pub struct Mailbox<A>
where
A: Actor,
A::Context: AsyncContext<A>,
{
msgs: AddressReceiver<A>,
}

impl<A: Actor> AddressSender<A> {
pub fn send<M>(&self, msg: M) -> Result<Receiver<M::Result>, SendError<M>> {}

pub fn try_send<M>(&self, msg: M, park: bool) -> Result<(), SendError<M>> {}

pub fn do_send<M>(&self, msg: M) -> Result<(), SendError<M>> {}

fn queue_push_and_signal(&self, msg: Envelope<A>) {}

fn park(&self) {}

fn poll_unparked(
&self,
do_park: bool,
cx: Option<&mut task::Context<'_>>,
) -> Poll<()> {}
}

对于 Mailbox 只是 AddressReceiver 一个别名罢了,而 AddressReceiver 是什么,我们从 Impl 的函数签名中大概也就是能够明白了,提供一个可供外部发送数据的队列(src/address/channel.rs:Struct Inner)。此时我们对系统有个一个感性的认知:

Hd64E.png

我们此时知道了 mailbox 理论上承担了通讯的职能,那我们看看怎么工作的。

Mailbox

Send Message

发送的逻辑如下

AddressReceiver#Send
1
2
3
4
5
6
7
8
9
10
11
12
pub fn send<M>(&self, msg: M) -> Result<Receiver<M::Result>, SendError<M>>
where
A: Handler<M>,
A::Context: ToEnvelope<A, M>,
M::Result: Send,
M: Message + Send,
{
let (tx, rx) = sync_channel(); // 拿到 channel 对象(Sender 和 Receiver)
let env = <A::Context as ToEnvelope<A, M>>::pack(msg, Some(tx)); // 创建一个 ENV 对象从 Tx 和 Msg 构建
self.queue_push_and_signal(env); // 将对象放置于 mailbox 队列之中
Ok(rx)
}

发送的逻辑也相对简单,在数据放置于 queue 之后,还做了一个事情 self.signal();

AddressSender#singal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn signal(&self) {
let task = {
let mut recv_task = self.inner.recv_task.lock();
if recv_task.unparked {
return;
}
recv_task.unparked = true;
recv_task.task.take()
}; //获取到队列中的任务

if let Some(task) = task {
task.wake(); //将其唤醒
}
}

我们再次看到了我们熟悉的 wake,对于 Funture 的异步编程来说,我们需要做的事情,显然是在阻塞的过程中,将我们的任务重新唤醒。

Recv Message

刚刚我们瞧完了 Send 我们下面肯定看看 Recv 了。这段逻辑就相对复杂一些,显然这里涉及到我们任务的 Park 操作。

mailbox#poll
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context, task: &mut task::Context<'_>) {
loop {
match self.msgs.poll_next_unpin(task) { //去 AddressReceiver 中获取数据
Poll::Ready(Some(mut msg)) => {
not_ready = false;
msg.handle(act, ctx); // 调用我们声明的 actor 的 handle 函数处理此消息
}
Poll::Ready(None) | Poll::Pending => break,
}
}
}

// 核心逻辑还是在 AddressReceiver 中
impl<A: Actor> Stream for AddressReceiver<A> {
type Item = Envelope<A>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
let msg = match this.next_message() { //去inner.message_queue.pop() 一个消息
Poll::Ready(msg) => msg,
Poll::Pending => {
// 如果没有数据,就尝试将此 cx 休眠掉,不过这里只是标记下状态,并非真正的去休眠线程
match this.try_park(cx) {
TryPark::Parked => {
return Poll::Pending;
}
TryPark::NotEmpty => {
continue;
}
}
}
};

return Poll::Ready(msg);
}
}
}

对于接受消息和发送消息都是内置的队列中获取数据即可,还是比较简单的。

How it works

那他们又是怎么串联在一起的呢?这个答案其实在最初的逻辑图中已经有所表达。我们将所有的内容放置于 Context 中,而创建出来的对象全称是 ContextFut 本身也是一个 Future对象,而转动齿轮的枢纽就在于此。在此之前我们先考虑一个场景,如果我们需要在 handle_message 中处理另外一个 anysc task呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
impl Handler<Ping> for MyActor {
type Result = usize;

fn handle(&mut self, msg: Ping, context: &mut Context<Self>) -> Self::Result {
context.spawn(async {
//do other future task
}.into_actor());

println!("Handler in {:?}", thread::current().id());
self.count += msg.0;
self.count
}
}

对于上面一种场景,也是比较正常的,对于这样的逻辑,我们想要在 Future 嵌套的话,必须通过 context.spawn() 调用,而在此处进行创建的 async task 都会作为 items 数组中的一员。到这里我们再来看看 ContextFut

ContextFut#poll
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
impl<A, C> Future for ContextFut<A, C>
where
C: AsyncContextParts<A> + Unpin,
A: Actor<Context = C>,
{
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

'outer: loop {
// 处理接收到的消息,除非进入 stopping 状态 或者 wait 状态不为空
this.mailbox.poll(&mut this.act, &mut this.ctx, cx);
if !this.wait.is_empty() && !this.stopping() {
continue;
}

// 这里处理在 Handle 中的 async 任务
let mut idx = 0;
while idx < this.items.len() && !this.stopping() {
this.ctx.parts().handles[1] = this.items[idx].0;
match Pin::new(&mut this.items[idx].1).poll(
&mut this.act,
&mut this.ctx,
cx,
) {
Poll::Pending => {
// 如果任务依然是 Pending
if !this.wait.is_empty() && !this.stopping() {
// 移动到队列的尾部,保证调度的公平性
let next = this.items.len() - 1;
if idx != next {
this.items.swap(idx, next);
}
continue 'outer;
} else {
idx += 1;
}
}
Poll::Ready(()) => {
this.items.swap_remove(idx);
// 继续工作
if !this.wait.is_empty() && !this.stopping() {
continue 'outer;
}
}
}
}

return Poll::Pending;
}
}
}

看到这里我们差不多又有了更多的对于 Actor 感官认知。

H6Gbf.png

Runtime

为了支撑上层能够完成这样的功能,我们需要一个 Runtime 进行支撑,对于我们来说,我们在 Demo 中发现,我们一个 Actor 就会创建一个 Context,而一个 Context 如果就要使用一个 Thread 进行调度,那成本太过于高了。使用 Tokio 的也有多种 Runtime 可选,因此对于 Actix 实现来说,在此之上实现了一个轻量级的 Runtime

actix-rt/src/runtime.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
impl Runtime {
pub fn new() -> io::Result<Runtime> {
let rt = runtime::Builder::new()
.enable_io()
.enable_time()
.basic_scheduler()
.build()?;

Ok(Runtime {
rt,
local: LocalSet::new(),
})
}
}

对于 Actix 构建使用的是 basic_scheduler,默认的 Runtime 只会跑在此时构建运行的线程上,不过对于这个 RuntimeActix 还搭配了一个 Arbiter 对象。

Arbiter

Arbiter 做什么用,主要有两点左右: 1. 用来停止 Runtime 2. 用来 Spwan Task

Stop Runtime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
impl Future for ArbiterController {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match Pin::new(&mut self.rx).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => { // 如果是 Stop 就向 停止队列发送数据
if let Some(stop) = self.stop.take() {
let _ = stop.send(0);
};
return Poll::Ready(());
}
ArbiterCommand::Execute(fut) => { // 如果是任务就去调度任务
let len = PENDING.with(move |cell| { //去Pending队列获取
let mut p = cell.borrow_mut();
p.push(tokio::task::spawn_local(fut));
p.len()
});

}
},
Poll::Pending => return Poll::Pending,
}
}
}
}

Spwan task

对于 Tokio runtimespwan 并没有向 actix 的用户开放,因此想要去创建新任务,需要调用 Arbiterspawn

1
2
3
4
5
6
7
8
9
10
11
12
13
pub fn spawn<F>(future: F)
where F: Future<Output = ()> + 'static {
RUNNING.with(move |cell| {
if cell.get() {
// 创建任务置于 queue 之中
let len = PENDING.with(move |cell| {
let mut p = cell.borrow_mut();
p.push(tokio::task::spawn_local(future));
p.len()
});
}
});
}

小结

本文意在给大家分析最小的 Actix 的运行机制,除本文之外 Actor 模型里面还有非常重要的 Supervisor 来维护 Actor 的生命周期尚未涉及,并且 Actix-core 意在提供的是单机的 Actor 的模型并没有像 Akka 这等全面,因为代码也相对较少较好理解。

Have funny in rust proggamming

参考