How Merbridge Works

服务网格加速器 Merbridge 正式进入 CNCF 沙箱 中提及使用 Cillum 来提速 Istio,我们来瞧瞧这个原理是如何的。

本文基于 0.8.1 版本

Istio iptable 流量路径

在传统的 Istio 流量路径中,outbound 流量由 APP 出来被 iptables 劫持,然后通过内核转发给 Envoy,Envoy 再出去。Inbound 类似,因此实际上数据包在内核中倒腾了好几次

优化手段

既然是 ebpf 的手段,那肯定是 [译] 利用 ebpf sockmap/redirection 提升 socket 性能(2020)
目标是绕过内核的 TCP/IP 协议栈

How Merbridge Works

iptables 本身使用 DNAT 功能做流量转发
这里主要有两个要点:

  1. 修改连接发起时的目的地址,让流量能够发送到新的接口;
  2. 让 Envoy 能识别原始的目的地址,以能够识别流量。

想要用 eBPF 模拟 iptables 的能力,就需要使用 eBPF 实现类似 iptables DNAT 的能力。

  1. 使用 eBPF 的 connect 程序修改 user_ip 和 user_port 实现。
  2. 需要用到 ORIGINAL_DST 的概念,这在 Linux 内核中是 netfilter 模块专属的。

其原理为:应用程序在收到连接之后调用 get_sockopts 函数,获取 ORIGINAL_DST。如果经过了 iptables 的 DNAT,那么 iptables 就会给当前的 socket 设置 ORIGINAL_DST 这个值,并把原有的 IP + 端口写入这个值,应用程序就可以根据连接拿到原有的目的地址。

PS: 值得注意的 eBPF 运行在内核态,因此不能想象其运行在 cgroup 的限制的虚拟空间中

Outbound 流量

早期版本

在早期的版本中,采用了一个比较巧妙的方式来做

  1. 在应用向外发起连接时,connect 程序会将目标地址修改为 127.x.y.z:15001,并用 cookie_original_dst 保存原始目的地址。
  2. 在 sockops 程序中,将当前 sock 和四元组保存在 sock_pair_map 中。同时,将四元组信息和对应的原始目的地址写入 pair_original_dst 中 (之所以不用 cookie,是因为 get_sockopt 函数无法获取当前 cookie)。
  3. Envoy 收到连接之后会调用 getsockopt 获取当前连接的目的地址,get_sockopt 函数会根据四元组信息从 pair_original_dst 取出原始目的地址并返回,由此完全建立连接。
  4. 在发送数据阶段,redir 程序会根据四元组信息,从 sock_pair_map 中读取 sock,然后通过 bpf_msg_redirect_hash 进行直接转发,加速请求。

其中,之所以在 connect 时,修改目的地址为 127.x.y.z 而不是 127.0.0.1,是因为在不同的 Pod 中,可能产生冲突的四元组,使用此方式即可巧妙地避开冲突 (每个 Pod 间的目的 IP 不同,不会出现冲突的情况)。

source code

1
2
3
4
5
6
7
8
9
__u64 uid = bpf_get_current_uid_gid() & 0xffffffff;
__u32 dst_ip = ctx->user_ip4;

// 这里的 outip 就是一个自增变量,来区分在同一个宿主机上不同命名空间内的来自相同 localhost 的请求
ctx->user_ip4 = bpf_htonl(0x7f800000 | (outip++));
if (outip >> 20) {
outip = 1;
}
ctx->user_port = bpf_htons(OUT_REDIRECT_PORT);

当前版本 (>0.7.2)

当前版本采用了 CNI 模式,较为复杂,但是会更加全面。

大部分 CNI 就是在节点上运行一个 daemonsetsPOD 基于 Network NS 做一些工作,这里也一样,当我们安装完成 merbridge,我们就能看到对应的节点 POD,

1
2
3
4
5
6
7
$ kubectl get pod -n istio-system
NAME READY STATUS RESTARTS AGE
istio-ingressgateway-676d4bf8b7-24257 1/1 Running 0 57d
istiod-5fb4fb7f6b-5pzbs 1/1 Running 0 57d
merbridge-c4jj9 1/1 Running 0 14h
merbridge-jldmx 1/1 Running 0 14h
merbridge-x65x6 1/1 Running 0 14h

当然还有 CLI 命令,代码在这里

启动逻辑在 RunLocalPodController 这里不做展开,
这里主要的工作就是维护当前 NODE 节点上的 POD IP 列表。

下面就一个新增 POD 的流程来看看。

POD 初始化
  1. 节点上的 membrige 会更新本Node节点的 IP 清单
新建 PODgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
func addFunc(obj interface{}) {
pod, ok := obj.(*v1.Pod)
log.Debugf("got pod updated %s/%s", pod.Namespace, pod.Name)

_ip, _ := linux.IP2Linux(pod.Status.PodIP)
log.Infof("update local_pod_ips with ip: %s", pod.Status.PodIP)

// 更新 ebpf 中的 localIPMAP 对象
err := ebpfs.GetLocalIPMap().Update(_ip, &p, ebpf.UpdateAny)
if err != nil {
log.Errorf("update local_pod_ips %s error: %v", pod.Status.PodIP, err)
}
}

通过 bpftool 工具很方便可以查看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ bpftool map show
29: lru_hash name cookie_original flags 0x0
key 8B value 24B max_entries 65535 memlock 2097152B
30: hash name local_pod_ips flags 0x0
key 16B value 484B max_entries 1024 memlock 516096B
31: lru_hash name process_ip flags 0x0
key 4B value 4B max_entries 1024 memlock 8192B
32: lru_hash name cgroup_info_map flags 0x0
key 8B value 32B max_entries 1024 memlock 40960B
33: hash name mark_pod_ips_ma flags 0x0
key 4B value 16B max_entries 65535 memlock 1572864B
37: array name mb_conne.data flags 0x0
key 4B value 4B max_entries 1 memlock 4096B
btf_id 135
38: lru_hash name pair_original_d flags 0x0
key 36B value 24B max_entries 65535 memlock 4194304B
39: sockhash name sock_pair_map flags 0x0
key 36B value 4B max_entries 65535 memlock 2621440B
70: hash name calico_failsafe flags 0x1
key 4B value 1B max_entries 65535 memlock 524288B
  1. CNI 会将当前 POD IP 植入命令空间
CNI-CMDADDgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (s *server) listenConfig(addr net.Addr, netns string) net.ListenConfig {
return net.ListenConfig{
Control: func(network, address string, conn syscall.RawConn) error {
var operr error
if err := conn.Control(func(fd uintptr) {
// 创建一个 mark_pod_ips_map 在 epbf 中
m, err := ebpf.LoadPinnedMap(path.Join(s.bpfMountPath, "mark_pod_ips_map"), &ebpf.LoadPinOptions{})
if err != nil {
operr = err
return
}
var ip unsafe.Pointer
switch v := addr.(type) { // todo instead of hash
case *net.IPNet: // nolint: typecheck
ip, err = linux.IP2Linux(v.IP.String())
case *net.IPAddr: // nolint: typecheck
ip, err = linux.IP2Linux(v.String())
}
if err != nil {
operr = err
return
}
key := getMarkKeyOfNetns(netns)
// 将 IP 更换新到其中
operr = m.Update(key, ip, ebpf.UpdateAny)
if operr != nil {
return
}
operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, ns.SoMark, int(key))
}); err != nil {
return err
}
return operr
},
}
}
Outbound 处理
Connect 阶段
  1. 在应用向外发起连接时,connect 程序会先去读取下 cgroup_info,在 CNI 模式中,会获取当前的 POD IP源码地址
  2. 通过 cookie 保存 cookie_original_dst 信息,这个在后续 Envoy 中的 get_sockopt 函数需要使用到。
  3. 然后将 bpf_bind 把 ctx 和这个 cgroup 内的 socket 绑定,后续可以获取 ctx 在后续的执行
tcp4connectgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
static inline int tcp_connect4(struct bpf_sock_addr *ctx)
{

struct cgroup_info cg_info;
if (!get_current_cgroup_info(ctx, &cg_info)) {
return 1;
}

__u32 curr_pod_ip;
__u32 _curr_pod_ip[4];
set_ipv6(_curr_pod_ip, cg_info.cgroup_ip);
curr_pod_ip = get_ipv4(_curr_pod_ip);

__u64 uid = bpf_get_current_uid_gid() & 0xffffffff;
__u32 dst_ip = ctx->user_ip4;
if (uid != SIDECAR_USER_ID) {
if ((dst_ip & 0xff) == 0x7f) {
// app call local, bypass.
return 1;
}
__u64 cookie = bpf_get_socket_cookie_addr(ctx);
// app call others
debugf("call from user container: cookie: %d, ip: %pI4, port: %d",
cookie, &dst_ip, bpf_htons(ctx->user_port));

// we need redirect it to envoy.
struct origin_info origin;
memset(&origin, 0, sizeof(origin));
set_ipv4(origin.ip, dst_ip);
origin.port = ctx->user_port;
origin.flags = 1;
if (bpf_map_update_elem(&cookie_original_dst, &cookie, &origin,
BPF_ANY)) {
printk("write cookie_original_dst failed");
return 0;
}
if (curr_pod_ip) {
// 一些检查 SKIP
// 把这个 ctx 和 socket 绑定
struct sockaddr_in addr = {
.sin_addr =
{
.s_addr = curr_pod_ip,
},
.sin_port = 0,
.sin_family = 2,
};
if (bpf_bind(ctx, &addr, sizeof(struct sockaddr_in))) {
debugf("bind %pI4 error", &curr_pod_ip);
}
ctx->user_ip4 = localhost;
} else {
// 旧版本兼容
ctx->user_ip4 = bpf_htonl(0x7f800000 | (outip++));
if (outip >> 20) {
outip = 1;
}
}
ctx->user_port = bpf_htons(OUT_REDIRECT_PORT);
} else {
// outbound 流量
// from envoy to others

}

return 1;
}

做完这些的时候,我们已经完成了第一部分:修改连接发起时的目的地址,让流量能够发送到新的接口。

SetOps/GetOps 阶段
  1. 在请求建立成功的时候,会将这个对象保存起来,这里刚好用到我们上面的 cookie 中储存的值。
sockops_ipv4github
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
__section("sockops") int mb_sockops(struct bpf_sock_ops *skops)
{
switch (skops->op) {
case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB:
case BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB:
switch (skops->family) {
#if ENABLE_IPV4
case 2:
// AF_INET, we don't include socket.h, because it may
// cause an import error.
return sockops_ipv4(skops);
#endif
}
return 0;
}
return 0;
}

static inline int sockops_ipv4(struct bpf_sock_ops *skops)
{
__u64 cookie = bpf_get_socket_cookie_ops(skops);

struct pair p;
memset(&p, 0, sizeof(p));
set_ipv4(p.sip, skops->local_ip4);
p.sport = bpf_htons(skops->local_port);
set_ipv4(p.dip, skops->remote_ip4);
p.dport = skops->remote_port >> 16;

struct origin_info *dst =
bpf_map_lookup_elem(&cookie_original_dst, &cookie);
if (dst) {
struct origin_info dd = *dst;
// 在 Istio 中,Envoy 用的方式是使用当前 PodIP 加服务端口。经过上述入口流量处理后,我们会发现由于 PodIP 也存在于 local_pod_ips 中,那么这个请求会被转发到 PodIP + 15006 端口,这显然是不行的,会造成无限递归。
// 即在 Envoy 尝试建立连接时,还是会走重定向到 15006 端口,但是在 sockops 阶段会判断源 IP 和目的地址 IP 是否一致。如果一致,代表发送了错误的请求,那么我们会在 sockops 丢弃这个连接,并将当前的 ProcessID 和
// IP 地址信息写入 process_ip 这个 map,让 eBPF 支持进程与 IP 的对应关系。当下次发送请求时,直接从 process_ip 表检查目的地址是否与当前 IP 地址一致。
if (!(dd.flags & 1)) {
__u32 pid = dd.pid;
// process ip not detected
if (skops->local_ip4 == envoy_ip ||
skops->local_ip4 == skops->remote_ip4) {
// envoy to local
__u32 ip = skops->remote_ip4;
debugf("detected process %d's ip is %pI4", pid, &ip);
bpf_map_update_elem(&process_ip, &pid, &ip, BPF_ANY);
} else {
// envoy to envoy
__u32 ip = skops->local_ip4;
bpf_map_update_elem(&process_ip, &pid, &ip, BPF_ANY);
debugf("detected process %d's ip is %pI4", pid, &ip);
}
}
// get_sockopts can read pid and cookie,
// we should write a new map named pair_original_dst
bpf_map_update_elem(&pair_original_dst, &p, &dd, BPF_ANY);
bpf_sock_hash_update(skops, &sock_pair_map, &p, BPF_NOEXIST);
} else if (skops->local_port == OUT_REDIRECT_PORT ||
skops->local_port == IN_REDIRECT_PORT ||
skops->remote_ip4 == envoy_ip) {
bpf_sock_hash_update(skops, &sock_pair_map, &p, BPF_NOEXIST);
}
return 0;
}
  1. 读取这部分比较简单在 mb_get_sockopt 返回相对应的即可
    mb_get_sockoptgithub
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    __section("cgroup/getsockopt") int mb_get_sockopt(struct bpf_sockopt *ctx)
    {
    // envoy will call getsockopt with SO_ORIGINAL_DST, we should rewrite it to
    // return original dst info.
    if (ctx->optname != SO_ORIGINAL_DST) {
    return 1;
    }
    struct pair p;
    memset(&p, 0, sizeof(p));
    p.dport = bpf_htons(ctx->sk->src_port);
    p.sport = ctx->sk->dst_port;
    struct origin_info *origin;
    switch (ctx->sk->family) {
    #if ENABLE_IPV4
    case 2: // ipv4
    set_ipv4(p.dip, ctx->sk->src_ip4);
    set_ipv4(p.sip, ctx->sk->dst_ip4);
    origin = bpf_map_lookup_elem(&pair_original_dst, &p);
    if (origin) {
    // rewrite original_dst
    ctx->optlen = (__s32)sizeof(struct sockaddr_in);
    if ((void *)((struct sockaddr_in *)ctx->optval + 1) >
    ctx->optval_end) {
    printk("optname: %d: invalid getsockopt optval", ctx->optname);
    return 1;
    }
    ctx->retval = 0;
    struct sockaddr_in sa = {
    .sin_family = ctx->sk->family,
    .sin_addr.s_addr = get_ipv4(origin->ip),
    .sin_port = origin->port,
    };
    *(struct sockaddr_in *)ctx->optval = sa;
    }
    break;
    #endif
    return 1;
    }
Redirect 阶段

通过标准的 socketmap 就好了,修改地址即可完成重定向

sk_msggithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
__section("sk_msg") int mb_msg_redir(struct sk_msg_md *msg)
{
struct pair p;
memset(&p, 0, sizeof(p));
p.dport = bpf_htons(msg->local_port);
p.sport = msg->remote_port >> 16;

switch (msg->family) {
#if ENABLE_IPV4
case 2:
// ipv4
set_ipv4(p.dip, msg->local_ip4);
set_ipv4(p.sip, msg->remote_ip4);
break;
#endif
}

long ret = bpf_msg_redirect_hash(msg, &sock_pair_map, &p, BPF_F_INGRESS);
if (ret)
debugf("redirect %d bytes with eBPF successfully", msg->size);
return 1;
}

Inbound 流量

Outbound 类似,不做多阐述,可以参考官博

同节点加速

同集群加速的大部分逻辑也在之前 Connect 中,源码

  1. 因为在宿主机上有 CNI POD,来维护当前 Node POD IPS,并且维护在 local_pod_ips
  2. 根据 dst_ip 查询 local_pod_ips 是否包含,不包含直接 ret,走外部流量路径
  3. 如果有的话,经过一些检查,将转发端口修改 IN_REDIRECT_PORT (inbound端口)即可(没有这个端口的话,就不是 sidecar 的目标地址,使用用户的端口即可)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// from envoy to others
__u32 _dst_ip[4];
set_ipv4(_dst_ip, dst_ip);
struct pod_config *pod = bpf_map_lookup_elem(&local_pod_ips, _dst_ip);
if (!pod) {
// dst ip is not in this node, bypass
debugf("dest ip: %pI4 not in this node, bypass", &dst_ip);
return 1;
}

if (curr_pod_ip) {
if (curr_pod_ip != dst_ip) {
// call other pod, need redirect port.
int exclude = 0;
IS_EXCLUDE_PORT(pod->exclude_in_ports, ctx->user_port,
&exclude);
if (exclude) {
debugf("ignored dest port by exclude_in_ports, ip: %pI4, "
"port: %d",
&dst_ip, bpf_htons(ctx->user_port));
return 1;
}
int include = 0;
IS_INCLUDE_PORT(pod->include_in_ports, ctx->user_port,
&include);
if (!include) {
debugf("ignored dest port by include_in_ports, ip: %pI4, "
"port: %d",
&dst_ip, bpf_htons(ctx->user_port));
return 1;
}
ctx->user_port = bpf_htons(IN_REDIRECT_PORT);
}
origin.flags |= 1;
} else {
// 兼容老模式
}
__u64 cookie = bpf_get_socket_cookie_addr(ctx);
debugf("call from sidecar container: cookie: %d, ip: %pI4, port: %d",
cookie, &dst_ip, bpf_htons(ctx->user_port));
if (bpf_map_update_elem(&cookie_original_dst, &cookie, &origin,
BPF_NOEXIST)) {
printk("update cookie origin failed");
return 0;
}

最终效果

IPtables 模型

eBPF 模式(跨集群)

eBPF 模式(同集群)

Next

基于 eBPF 的扩展性,社区也在积极的探索新的功能,比如 TC (流量控制)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
err = netns.Do(func(_ ns.NetNS) error {
// listen on 39807
if err := s.buildListener(netns.Path()); err != nil {
return err
}
// attach tc to the device
if len(args.IfName) != 0 {
return s.attachTC(netns.Path(), args.IfName)
}
// interface not specified, should not happen?
ifaces, _ := net.Interfaces()
for _, iface := range ifaces {
if (iface.Flags&net.FlagLoopback) == 0 && (iface.Flags&net.FlagUp) != 0 {
return s.attachTC(netns.Path(), iface.Name)
}
}
return fmt.Errorf("device not found for %s", args.Netns)
})

参考