Rust Dynamic Dispatching

torLS.png

举个例子

假设我们有如下定义的一组对象。

1
2
3
4
5
6
7
8
9
trait Backend {}

struct TypeA {}

struct TypeB {}

impl Backend for TypeA {}

impl Backend for TypeB {}

我们需要构建一个 Services 列表,现在是很常规的操作。

1
2
3
struct Service <T: Backend> {
backend: T
}

我们具型化的时候会发现一个问题

1
2
3
let mut backends = Vec::new();
backends.push(TypeA);
backends.push(TypeB); // <---- 类型错误

我们如果想要实现这样的机制需要这样操作

1
2
3
4
5
6
7
struct Service{
backends: Vec<Box<dyn Backend>>
}
...
let mut backends = Vec::new();
backends.push( Box::new(PositiveBackend{}) as Box<dyn Backend>);
backends.push( Box::new(NegativeBackend{}) as Box<dyn Backend>);

上诉的操作就称之为 “Dynamic Dispatching” 动态分发,我们在调用 Backend 之前并不知道他的类型。这也是实现多态的必要条件,不过这样的操作在运行时会有性能损失,为什么会有性能损耗,我们还是来探索一下。

基线测试

我们先看看静态分发的性能损耗。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use std::time::SystemTime;

struct PositiveBackend;

impl PositiveBackend {
fn compute(&self, number: u64) -> u64 {
number + 1
}
}

fn main() {
let backend = Box::new(PositiveBackend);
let mut res = 0 as u64;
let start_time = SystemTime::now();
let total = 20_000_000 as u64;

// our main loop
for i in 0..total {
res += backend.compute(i);
}

println!("Result: {}", res);
println!("Elapsed_ms: {}", start_time.elapsed().unwrap().as_millis());
}

在笔者的机器上的执行结果几乎不耗时(记得打开release)

debug
1
2
3
4
5
6
➜  learning git:(master) ✗ cargo run --release
Finished release [optimized] target(s) in 0.02s
Running `target/release/learning`

Result: 200000010000000
Elapsed_ms: 0

静态分发

为什么静态分发这么快

我们打开 Hopper Disassembler 对二进制文件进行分析。

1
2
3
4
5
6
7
8
9
10
call       __ZN3std4time10SystemTime3now17h76cefb4014446fb3E ; System.now() 获得当前时间
mov qword [rbp+var_60], rax
mov qword [rbp+var_58], rdx
movabs rax, 0xb5e6218d1680 ; 0xb5e6218d1680 -> 200000010000000 直接是计算好的结果
mov qword [rbp+var_50], rax
lea rax, qword [rbp+var_50]
mov qword [rbp+var_18], rax
lea rax, qword [__ZN4core3fmt3num3imp52_$LT$impl$u20$core..fmt..Display$u20$for$u20$u64$GT$3fmt17hb00db9d5e1e54df6E]
mov qword [rbp+var_10], rax
lea rax, qword [_anon.c8e953eae41e214c40f6f056fcde3153.0+80]

对于编译器来说,在静态器已经知道的数据类型,和计算都可直接通过静态分析得到结果,而不用在运行时真正的再算一遍。

Debug 为什么又这么慢

我们如果运行在 Debug 模式下,结果又是另外一番。

1
2
3
4
5
6
➜  learning git:(master) ✗ cargo run               
Compiling learning v0.1.0 (/Users/yann/Codes/rust/learning)
Finished dev [unoptimized + debuginfo] target(s) in 0.57s
Running `target/debug/learning`
Result: 200000010000000
Elapsed_ms: 741

还是打开我们的反编译器看看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
loc_100003f2c: ; PositiveBackend:Compute 子函数
0000000100003f49 call __ZN8learning15PositiveBackend7compute17hbdcd6a613a6654c0E ; PositiveBackend 计算函数的调用
0000000100003f4e mov qword [rbp+var_1A8], rax
0000000100003f55 jmp loc_100003f57

loc_100003e20: ; For 循环子函数
0000000100003e68 call __ZN63_$LT$I$u20$as$u20$core..iter..traits..collect..IntoIterator$GT$9into_iter17h5ff7777380a60e23E ; _$LT$I$u20$as$u20$core..iter..traits..collect..IntoIterator$GT$::into_iter::h5ff7777380a60e23
0000000100003e6d mov qword [rbp+var_170], rdx
0000000100003e74 mov qword [rbp+var_178], rax
0000000100003e7b jmp loc_100003e7d


loc_100003eb5: 比较 total 是否满足
0000000100003eb5 mov rax, qword [rbp+var_188] ; CODE XREF=__ZN8learning4main17h9a9a004d32cb7481E+211
0000000100003ebc mov qword [rbp+var_118], rax
0000000100003ec3 mov rcx, qword [rbp+var_180]
0000000100003eca mov qword [rbp+var_110], rcx
0000000100003ed1 mov rdx, qword [rbp+var_118]
0000000100003ed8 test rdx, rdx
0000000100003edb je loc_100003ee1

对于 Debug 模式下,我们需要进循环体,因此我们将函数拆分了多段,并且通过 call/je 将程序组装起来,显然就是比直接结果 mov 进来要来的慢的多。

动态分发

我们将代码修改为如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
use std::env;
use std::time::SystemTime;

trait Backend{
fn compute(&self,number: i64) -> i64;
}

struct PositiveBackend;
struct NegativeBackend;

impl Backend for PositiveBackend{
#[inline(never)]
fn compute(&self,number: i64) -> i64{
number+1
}
}

impl Backend for NegativeBackend{
#[inline(never)]
fn compute(&self,number: i64) -> i64{
number-1
}
}

fn main() {
let backend = match env::args().skip(1).next() {
Some(x) => Box::new(PositiveBackend ) as Box<dyn Backend>,
_ => Box::new(NegativeBackend) as Box<dyn Backend>
};

let mut res= 0 as i64;

let start_time = SystemTime::now();

let total = 20_000_000 as i64;

for i in 0 .. total {
res += backend.compute(i);
}

println!("Result: {}",res);
println!("Elapsed_ms: {}", start_time.elapsed().unwrap().as_millis());

}

执行结果如下

1
2
3
4
    Finished release [optimized] target(s) in 0.03s
Running `target/release/learning 1`
Result: -20000002
Elapsed_ms: 46

这个时间和我们的 Debug 模式都差不多了。

动态分发的实现

对于大多数的编程语言 Rust C++ 为了实现动态分发(多态的需求),都采用了一种叫 vtable 的技术。

toD44.png

对于 Box<dyn Trait> 我们至少需要 2Vtable 这里我们就不看汇编了,我们看看 汇编 生成的伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
loc_100004aab:
r12 = *(r15 + 0x18);
r13 = 0x0;
rbx = 0x0;
do {
rax = (r12)(0x1, rbx);
rbx = rbx + 0x1;
r13 = rax + r13 * 0x2;
var_78 = r13;
} while (rbx != 0x1312d00);
std::io::stdio::_print::hdea2154009b9e479(0x1000300b0);
std::time::SystemTime::elapsed::hd316930bad90c880(0x1000300b0, var_98);
return rax;

现在这次编译出来的结果并没有直接优化成一个值,而是真的去 loop0x1312d00 去计算,那是为什么呢?

答案和代码一体两证的可以发现。 (r12)(0x1, rbx) 是一个函数,而这 r12 就是我们动态代理的核心。

打开反汇编,很容易找到如下代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
0000000100004adc         mov        qword [rbp+var_98], rax
0000000100004ae3 mov qword [rbp+var_90], rdx
0000000100004aea mov r12, qword [r15+0x18]
0000000100004aee xor r13d, r13d
0000000100004af1 xor ebx, ebx
0000000100004af3 nop word [cs:rax+rax]
0000000100004afd nop dword [rax]

loc_100004b00: 循环计算子函数
0000000100004b00 mov edi, 0x1 ; 立即数 1
0000000100004b05 mov rsi, rbx
0000000100004b08 call r12 ; Backend:Compute 函数
0000000100004b0b inc rbx
0000000100004b0e add r13, rax
0000000100004b11 mov qword [rbp+var_78], r13
0000000100004b15 cmp rbx, 0x1312d00 ; 20000000 次循环
0000000100004b1c jne loc_100004b00

我们可以发现的核心的恰好就是 r12 这个寄存器中的地址,而如何判断具体应该调用哪一个呢?r12r15 + 0x18 的固定片偏移而来。

为什么是 0x18?因为 Vtable 的第一部分是一个指向析构函数的 Pointer8bytes,然后是两个 64bit 的数字是 vtable 的大小和对齐。然后才是我们的函数指向,因为我们只有一个函数,也就是 index:0,那刚好就是 8 + 8 * 2 = 24 = 0x18

r15 是什么?我们从上面可以得知。

1
2
3
4
5
loc_1000049ec:
00000001000049f7 lea r15, qword [_anon.c8e953eae41e214c40f6f056fcde3153.0+80]

loc_100004a14:
0000000100004a1f lea r15, qword [_anon.c8e953eae41e214c40f6f056fcde3153.0+112]

我们有2个子函数进行处理,究竟是 80 的偏移量还是 112 呢?

1
2
3
4
5
6
7
8
9
00000001000049a2         call       __ZN73_$LT$std..env..Args$u20$as$u20$core..iter..traits..iterator..Iterator$GT$4next17h92f309c39800957bE ; 获得参数
00000001000049a7 mov rdi, qword [rbp+var_70]
00000001000049ab test rdi, rdi ; 比较,也就是 match env::args().skip(1).next()
00000001000049ae je loc_1000049ec ; goto +80 偏移量

loc_1000049c0:
00000001000049c0 mov rsi, qword [rbp+var_68]
00000001000049c4 inc rbx
00000001000049c7 je loc_100004a14 ; goto +112 偏移量

t49aL.png
从调用流程上也很容易分析出来。

动态分发的消耗

根据 测试的看来

1
2
3
4
Iterating over 20M elements:
----------------------------
static dispatch: 64 ms
dynamic dispatch: 216 ms

动态分发还是会有不小的衰耗。