我们聊完了 Rust
的异步编程框架,我们下来看看更加抽象的 Actor
模型的实现。
本文基于 Actix-Core: 0.10.0
Actor 模型
关于什么是 Actor
模型,这里就不做展开了,可以参考 The actor model in 10 minutes 简而言之,我们使用消息传递信息,Actor
作为处理节点。
初窥 Actix
对于 Actix
来说我们去实现一个 Actor
非常的简单:
Example Ping 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 use std::thread;use actix::prelude::*;struct Ping (usize );impl Message for Ping { type Result = usize ; } struct MyActor { count: usize , } impl Actor for MyActor { type Context = Context<Self >; } impl Handler <Ping> for MyActor { type Result = usize ; fn handle (&mut self , msg: Ping, _: &mut Context<Self >) -> Self ::Result { println! ("Handler in {:?}" , thread::current ().id ()); self .count += msg.0 ; self .count } } fn main () { let system = System::new ("single-arbiter-example" ); Arbiter::spawn (async { let addr = MyActor { count: 10 }.start (); println! ("Arbiter in {:?}" , thread::current ().id ()); let res = addr.send (Ping (10 )).await ; println! ("RESULT: {}" , res.unwrap () == 20 ); System::current ().stop (); }); system.run (); }
作者语
这里和官方的不太一样进行了改造,这样比较容易方便后面的代码拆解。
我们随意的在 handle
函数中设置一个断点,看一下函数栈,熟悉的味道马上就涌上心头。函数栈底还是上次分析的 Tokio
,因此对于 Actix
来说,最终的 Runtime
依然是基于 Tokio
的运行模型的。从 Actor
模型的模型的抽象来说最重要的模型莫过于 Actor
了,我们来看看这个运行逻辑的承载者是如何实现的。
Actor Impl
Actor 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 pub trait Actor : Sized + Unpin + 'static { fn start (self ) -> Addr<Self > where Self : Actor<Context = Context<Self >>, { Context::new ().run (self ) } } impl <A> Context<A>where A: Actor<Context = Self >, { #[inline] pub fn run (self , act: A) -> Addr<A> { let fut = self .into_future (act); let addr = fut.address (); actix_rt::spawn (fut); addr } }
从上文中,我们可以得到,对于将 Actor
转化为成一个 Funture
然后将其置于 Runtime
中进行处理,对于 actix_rt::spawn(fut)
处理大家可能会觉得比较复杂,其实是相对简单的。我们晚一点再过来看,我们继续看这个 into_future(act)
做了什么,其实也没做啥就是直接构建了 ContextFut
对象。
src/context.rs:111 1 2 3 4 5 6 7 8 9 10 11 pub struct ContextFut <A, C>where C: AsyncContextParts<A> + Unpin, A: Actor<Context = C>, { ctx: C, act: A, mailbox: Mailbox<A>, wait: SmallVec<[ActorWaitItem<A>; 2 ]>, items: SmallVec<[Item<A>; 3 ]>, }
其中最重要的某过于 mailbox
和 act
对象了,act
-> actor
本身,而 mailbox
使我们后续接收到的数据的一个管道,而在 let addr = fut.address();
返回的恰好就是 mailbox
的地址,那我们看看 mailbox
又是什么东西。
Mailbox AddressReceiver 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 pub struct Mailbox <A>where A: Actor, A::Context: AsyncContext<A>, { msgs: AddressReceiver<A>, } impl <A: Actor> AddressSender<A> { pub fn send <M>(&self , msg: M) -> Result <Receiver<M::Result >, SendError<M>> {} pub fn try_send <M>(&self , msg: M, park: bool ) -> Result <(), SendError<M>> {} pub fn do_send <M>(&self , msg: M) -> Result <(), SendError<M>> {} fn queue_push_and_signal (&self , msg: Envelope<A>) {} fn park (&self ) {} fn poll_unparked ( &self , do_park: bool , cx: Option <&mut task::Context<'_ >>, ) -> Poll<()> {} }
对于 Mailbox
只是 AddressReceiver
一个别名罢了,而 AddressReceiver
是什么,我们从 Impl
的函数签名中大概也就是能够明白了,提供一个可供外部发送数据的队列(src/address/channel.rs:Struct Inner)。此时我们对系统有个一个感性的认知:
我们此时知道了 mailbox
理论上承担了通讯的职能,那我们看看怎么工作的。
Mailbox
Send Message
发送的逻辑如下
AddressReceiver#Send 1 2 3 4 5 6 7 8 9 10 11 12 pub fn send <M>(&self , msg: M) -> Result <Receiver<M::Result >, SendError<M>>where A: Handler<M>, A::Context: ToEnvelope<A, M>, M::Result : Send , M: Message + Send , { let (tx, rx) = sync_channel (); let env = <A::Context as ToEnvelope<A, M>>::pack (msg, Some (tx)); self .queue_push_and_signal (env); Ok (rx) }
发送的逻辑也相对简单,在数据放置于 queue
之后,还做了一个事情 self.signal();
AddressSender#singal 1 2 3 4 5 6 7 8 9 10 11 12 13 14 fn signal (&self ) { let task = { let mut recv_task = self .inner.recv_task.lock (); if recv_task.unparked { return ; } recv_task.unparked = true ; recv_task.task.take () }; if let Some (task) = task { task.wake (); } }
我们再次看到了我们熟悉的 wake
,对于 Funture
的异步编程来说,我们需要做的事情,显然是在阻塞的过程中,将我们的任务重新唤醒。
Recv Message
刚刚我们瞧完了 Send
我们下面肯定看看 Recv
了。这段逻辑就相对复杂一些,显然这里涉及到我们任务的 Park
操作。
mailbox#poll 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 pub fn poll (&mut self , act: &mut A, ctx: &mut A::Context, task: &mut task::Context<'_ >) { loop { match self .msgs.poll_next_unpin (task) { Poll::Ready (Some (mut msg)) => { not_ready = false ; msg.handle (act, ctx); } Poll::Ready (None ) | Poll::Pending => break , } } } impl <A: Actor> Stream for AddressReceiver <A> { type Item = Envelope<A>; fn poll_next (self : Pin<&mut Self >, cx: &mut task::Context<'_ >) -> Poll<Option <Self ::Item>> { let this = self .get_mut (); loop { let msg = match this.next_message () { Poll::Ready (msg) => msg, Poll::Pending => { match this.try_park (cx) { TryPark::Parked => { return Poll::Pending; } TryPark::NotEmpty => { continue ; } } } }; return Poll::Ready (msg); } } }
对于接受消息和发送消息都是内置的队列中获取数据即可,还是比较简单的。
How it works
那他们又是怎么串联在一起的呢?这个答案其实在最初的逻辑图中已经有所表达。我们将所有的内容放置于 Context
中,而创建出来的对象全称是 ContextFut
本身也是一个 Future
对象,而转动齿轮的枢纽就在于此。在此之前我们先考虑一个场景,如果我们需要在 handle_message
中处理另外一个 anysc task
呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 impl Handler <Ping> for MyActor { type Result = usize ; fn handle (&mut self , msg: Ping, context: &mut Context<Self >) -> Self ::Result { context.spawn (async { }.into_actor ()); println! ("Handler in {:?}" , thread::current ().id ()); self .count += msg.0 ; self .count } }
对于上面一种场景,也是比较正常的,对于这样的逻辑,我们想要在 Future
嵌套的话,必须通过 context.spawn()
调用,而在此处进行创建的 async task
都会作为 items
数组中的一员。到这里我们再来看看 ContextFut
ContextFut#poll 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 impl <A, C> Future for ContextFut <A, C>where C: AsyncContextParts<A> + Unpin, A: Actor<Context = C>, { type Output = (); fn poll (self : Pin<&mut Self >, cx: &mut Context<'_ >) -> Poll<Self ::Output> { let this = self .get_mut (); 'outer : loop { this.mailbox.poll (&mut this.act, &mut this.ctx, cx); if !this.wait.is_empty () && !this.stopping () { continue ; } let mut idx = 0 ; while idx < this.items.len () && !this.stopping () { this.ctx.parts ().handles[1 ] = this.items[idx].0 ; match Pin::new (&mut this.items[idx].1 ).poll ( &mut this.act, &mut this.ctx, cx, ) { Poll::Pending => { if !this.wait.is_empty () && !this.stopping () { let next = this.items.len () - 1 ; if idx != next { this.items.swap (idx, next); } continue 'outer ; } else { idx += 1 ; } } Poll::Ready (()) => { this.items.swap_remove (idx); if !this.wait.is_empty () && !this.stopping () { continue 'outer ; } } } } return Poll::Pending; } } }
看到这里我们差不多又有了更多的对于 Actor
感官认知。
Runtime
为了支撑上层能够完成这样的功能,我们需要一个 Runtime
进行支撑,对于我们来说,我们在 Demo
中发现,我们一个 Actor
就会创建一个 Context
,而一个 Context
如果就要使用一个 Thread
进行调度,那成本太过于高了。使用 Tokio
的也有多种 Runtime
可选,因此对于 Actix
实现来说,在此之上实现了一个轻量级的 Runtime
actix-rt/src/runtime.rs 1 2 3 4 5 6 7 8 9 10 11 12 13 14 impl Runtime { pub fn new () -> io::Result <Runtime> { let rt = runtime::Builder::new () .enable_io () .enable_time () .basic_scheduler () .build ()?; Ok (Runtime { rt, local: LocalSet::new (), }) } }
对于 Actix
构建使用的是 basic_scheduler
,默认的 Runtime
只会跑在此时构建运行的线程上,不过对于这个 Runtime
,Actix
还搭配了一个 Arbiter
对象。
Arbiter
Arbiter
做什么用,主要有两点左右: 1. 用来停止 Runtime
2. 用来 Spwan Task
Stop Runtime
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 impl Future for ArbiterController { type Output = (); fn poll (mut self : Pin<&mut Self >, cx: &mut Context<'_ >) -> Poll<Self ::Output> { loop { match Pin::new (&mut self .rx).poll_next (cx) { Poll::Ready (None ) => return Poll::Ready (()), Poll::Ready (Some (item)) => match item { ArbiterCommand::Stop => { if let Some (stop) = self .stop.take () { let _ = stop.send (0 ); }; return Poll::Ready (()); } ArbiterCommand::Execute (fut) => { let len = PENDING.with (move |cell| { let mut p = cell.borrow_mut (); p.push (tokio::task::spawn_local (fut)); p.len () }); } }, Poll::Pending => return Poll::Pending, } } } }
Spwan task
对于 Tokio runtime
的 spwan
并没有向 actix
的用户开放,因此想要去创建新任务,需要调用 Arbiter
的 spawn
1 2 3 4 5 6 7 8 9 10 11 12 13 pub fn spawn <F>(future: F)where F: Future<Output = ()> + 'static { RUNNING.with (move |cell| { if cell.get () { let len = PENDING.with (move |cell| { let mut p = cell.borrow_mut (); p.push (tokio::task::spawn_local (future)); p.len () }); } }); }
小结
本文意在给大家分析最小的 Actix
的运行机制,除本文之外 Actor
模型里面还有非常重要的 Supervisor
来维护 Actor
的生命周期尚未涉及,并且 Actix-core
意在提供的是单机的 Actor
的模型并没有像 Akka
这等全面,因为代码也相对较少较好理解。
Have funny in rust proggamming
参考