在 服务网格加速器 Merbridge 正式进入 CNCF 沙箱 中提及使用 Cillum 来提速 Istio,我们来瞧瞧这个原理是如何的。
Istio iptable 流量路径
在传统的 Istio 流量路径中,outbound 流量由 APP 出来被 iptables 劫持,然后通过内核转发给 Envoy,Envoy 再出去。Inbound 类似,因此实际上数据包在内核中倒腾了好几次
优化手段
既然是 ebpf 的手段,那肯定是 [译] 利用 ebpf sockmap/redirection 提升 socket 性能(2020)
目标是绕过内核的 TCP/IP
协议栈
How Merbridge Works
iptables
本身使用 DNAT
功能做流量转发
这里主要有两个要点:
修改连接发起时的目的地址,让流量能够发送到新的接口;
让 Envoy 能识别原始的目的地址,以能够识别流量。
想要用 eBPF
模拟 iptables
的能力,就需要使用 eBPF
实现类似 iptables
DNAT
的能力。
使用 eBPF 的 connect 程序修改 user_ip 和 user_port 实现。
需要用到 ORIGINAL_DST 的概念,这在 Linux 内核中是 netfilter 模块专属的。
其原理为:应用程序在收到连接之后调用 get_sockopts 函数,获取 ORIGINAL_DST。如果经过了 iptables 的 DNAT,那么 iptables 就会给当前的 socket 设置 ORIGINAL_DST 这个值,并把原有的 IP + 端口写入这个值,应用程序就可以根据连接拿到原有的目的地址。
PS: 值得注意的 eBPF 运行在内核态,因此不能想象其运行在 cgroup 的限制的虚拟空间中
Outbound 流量
早期版本
在早期的版本中,采用了一个比较巧妙的方式来做
在应用向外发起连接时,connect 程序会将目标地址修改为 127.x.y.z:15001,并用 cookie_original_dst 保存原始目的地址。
在 sockops 程序中,将当前 sock 和四元组保存在 sock_pair_map 中。同时,将四元组信息和对应的原始目的地址写入 pair_original_dst 中 (之所以不用 cookie,是因为 get_sockopt 函数无法获取当前 cookie)。
Envoy 收到连接之后会调用 getsockopt 获取当前连接的目的地址,get_sockopt 函数会根据四元组信息从 pair_original_dst 取出原始目的地址并返回,由此完全建立连接。
在发送数据阶段,redir 程序会根据四元组信息,从 sock_pair_map 中读取 sock,然后通过 bpf_msg_redirect_hash 进行直接转发,加速请求。
其中,之所以在 connect 时,修改目的地址为 127.x.y.z 而不是 127.0.0.1,是因为在不同的 Pod 中,可能产生冲突的四元组,使用此方式即可巧妙地避开冲突 (每个 Pod 间的目的 IP 不同,不会出现冲突的情况)。
source code
1 2 3 4 5 6 7 8 9 __u64 uid = bpf_get_current_uid_gid() & 0xffffffff ; __u32 dst_ip = ctx->user_ip4; ctx->user_ip4 = bpf_htonl(0x7f800000 | (outip++)); if (outip >> 20 ) { outip = 1 ; } ctx->user_port = bpf_htons(OUT_REDIRECT_PORT);
当前版本 (>0.7.2)
当前版本采用了 CNI 模式,较为复杂,但是会更加全面。
大部分 CNI 就是在节点上运行一个 daemonsets
的 POD
基于 Network NS
做一些工作,这里也一样,当我们安装完成 merbridge
,我们就能看到对应的节点 POD,
1 2 3 4 5 6 7 $ kubectl get pod -n istio-system NAME READY STATUS RESTARTS AGE istio-ingressgateway-676d4bf8b7-24257 1/1 Running 0 57d istiod-5fb4fb7f6b-5pzbs 1/1 Running 0 57d merbridge-c4jj9 1/1 Running 0 14h merbridge-jldmx 1/1 Running 0 14h merbridge-x65x6 1/1 Running 0 14h
当然还有 CLI 命令,代码在这里
启动逻辑在 RunLocalPodController 这里不做展开,
这里主要的工作就是维护当前 NODE
节点上的 POD IP
列表。
下面就一个新增 POD 的流程来看看。
POD 初始化
节点上的 membrige 会更新本Node节点的 IP 清单
新建 POD github 1 2 3 4 5 6 7 8 9 10 11 12 13 func addFunc (obj interface {}) { pod, ok := obj.(*v1.Pod) log.Debugf("got pod updated %s/%s" , pod.Namespace, pod.Name) _ip, _ := linux.IP2Linux(pod.Status.PodIP) log.Infof("update local_pod_ips with ip: %s" , pod.Status.PodIP) err := ebpfs.GetLocalIPMap().Update(_ip, &p, ebpf.UpdateAny) if err != nil { log.Errorf("update local_pod_ips %s error: %v" , pod.Status.PodIP, err) } }
通过 bpftool
工具很方便可以查看到
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 $ bpftool map show 29: lru_hash name cookie_original flags 0x0 key 8B value 24B max_entries 65535 memlock 2097152B 30: hash name local_pod_ips flags 0x0 key 16B value 484B max_entries 1024 memlock 516096B 31: lru_hash name process_ip flags 0x0 key 4B value 4B max_entries 1024 memlock 8192B 32: lru_hash name cgroup_info_map flags 0x0 key 8B value 32B max_entries 1024 memlock 40960B 33: hash name mark_pod_ips_ma flags 0x0 key 4B value 16B max_entries 65535 memlock 1572864B 37: array name mb_conne.data flags 0x0 key 4B value 4B max_entries 1 memlock 4096B btf_id 135 38: lru_hash name pair_original_d flags 0x0 key 36B value 24B max_entries 65535 memlock 4194304B 39: sockhash name sock_pair_map flags 0x0 key 36B value 4B max_entries 65535 memlock 2621440B 70: hash name calico_failsafe flags 0x1 key 4B value 1B max_entries 65535 memlock 524288B
CNI 会将当前 POD IP 植入命令空间
CNI-CMDADD github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (s *server) listenConfig(addr net.Addr, netns string ) net.ListenConfig { return net.ListenConfig{ Control: func (network, address string , conn syscall.RawConn) error { var operr error if err := conn.Control(func (fd uintptr ) { m, err := ebpf.LoadPinnedMap(path.Join(s.bpfMountPath, "mark_pod_ips_map" ), &ebpf.LoadPinOptions{}) if err != nil { operr = err return } var ip unsafe.Pointer switch v := addr.(type ) { case *net.IPNet: ip, err = linux.IP2Linux(v.IP.String()) case *net.IPAddr: ip, err = linux.IP2Linux(v.String()) } if err != nil { operr = err return } key := getMarkKeyOfNetns(netns) operr = m.Update(key, ip, ebpf.UpdateAny) if operr != nil { return } operr = syscall.SetsockoptInt(int (fd), unix.SOL_SOCKET, ns.SoMark, int (key)) }); err != nil { return err } return operr }, } }
Outbound 处理
Connect 阶段
在应用向外发起连接时,connect 程序会先去读取下 cgroup_info,在 CNI
模式中,会获取当前的 POD IP
,源码地址
通过 cookie 保存 cookie_original_dst 信息,这个在后续 Envoy 中的 get_sockopt 函数需要使用到。
然后将 bpf_bind 把 ctx 和这个 cgroup 内的 socket 绑定,后续可以获取 ctx 在后续的执行
tcp4connect github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 static inline int tcp_connect4 (struct bpf_sock_addr *ctx) { struct cgroup_info cg_info ; if (!get_current_cgroup_info(ctx, &cg_info)) { return 1 ; } __u32 curr_pod_ip; __u32 _curr_pod_ip[4 ]; set_ipv6(_curr_pod_ip, cg_info.cgroup_ip); curr_pod_ip = get_ipv4(_curr_pod_ip); __u64 uid = bpf_get_current_uid_gid() & 0xffffffff ; __u32 dst_ip = ctx->user_ip4; if (uid != SIDECAR_USER_ID) { if ((dst_ip & 0xff ) == 0x7f ) { return 1 ; } __u64 cookie = bpf_get_socket_cookie_addr(ctx); debugf("call from user container: cookie: %d, ip: %pI4, port: %d" , cookie, &dst_ip, bpf_htons(ctx->user_port)); struct origin_info origin ; memset (&origin, 0 , sizeof (origin)); set_ipv4(origin.ip, dst_ip); origin.port = ctx->user_port; origin.flags = 1 ; if (bpf_map_update_elem(&cookie_original_dst, &cookie, &origin, BPF_ANY)) { printk("write cookie_original_dst failed" ); return 0 ; } if (curr_pod_ip) { struct sockaddr_in addr = { .sin_addr = { .s_addr = curr_pod_ip, }, .sin_port = 0 , .sin_family = 2 , }; if (bpf_bind(ctx, &addr, sizeof (struct sockaddr_in))) { debugf("bind %pI4 error" , &curr_pod_ip); } ctx->user_ip4 = localhost; } else { ctx->user_ip4 = bpf_htonl(0x7f800000 | (outip++)); if (outip >> 20 ) { outip = 1 ; } } ctx->user_port = bpf_htons(OUT_REDIRECT_PORT); } else { } return 1 ; }
做完这些的时候,我们已经完成了第一部分:修改连接发起时的目的地址,让流量能够发送到新的接口。
SetOps/GetOps 阶段
在请求建立成功的时候,会将这个对象保存起来,这里刚好用到我们上面的 cookie
中储存的值。
sockops_ipv4 github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 __section("sockops" ) int mb_sockops (struct bpf_sock_ops *skops) { switch (skops->op) { case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB: case BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB: switch (skops->family) { #if ENABLE_IPV4 case 2 : return sockops_ipv4(skops); #endif } return 0 ; } return 0 ; } static inline int sockops_ipv4 (struct bpf_sock_ops *skops) { __u64 cookie = bpf_get_socket_cookie_ops(skops); struct pair p ; memset (&p, 0 , sizeof (p)); set_ipv4(p.sip, skops->local_ip4); p.sport = bpf_htons(skops->local_port); set_ipv4(p.dip, skops->remote_ip4); p.dport = skops->remote_port >> 16 ; struct origin_info *dst = bpf_map_lookup_elem(&cookie_original_dst, &cookie); if (dst) { struct origin_info dd = *dst; if (!(dd.flags & 1 )) { __u32 pid = dd.pid; if (skops->local_ip4 == envoy_ip || skops->local_ip4 == skops->remote_ip4) { __u32 ip = skops->remote_ip4; debugf("detected process %d's ip is %pI4" , pid, &ip); bpf_map_update_elem(&process_ip, &pid, &ip, BPF_ANY); } else { __u32 ip = skops->local_ip4; bpf_map_update_elem(&process_ip, &pid, &ip, BPF_ANY); debugf("detected process %d's ip is %pI4" , pid, &ip); } } bpf_map_update_elem(&pair_original_dst, &p, &dd, BPF_ANY); bpf_sock_hash_update(skops, &sock_pair_map, &p, BPF_NOEXIST); } else if (skops->local_port == OUT_REDIRECT_PORT || skops->local_port == IN_REDIRECT_PORT || skops->remote_ip4 == envoy_ip) { bpf_sock_hash_update(skops, &sock_pair_map, &p, BPF_NOEXIST); } return 0 ; }
读取这部分比较简单在 mb_get_sockopt
返回相对应的即可
mb_get_sockopt github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 __section("cgroup/getsockopt" ) int mb_get_sockopt (struct bpf_sockopt *ctx) { if (ctx->optname != SO_ORIGINAL_DST) { return 1 ; } struct pair p ; memset (&p, 0 , sizeof (p)); p.dport = bpf_htons(ctx->sk->src_port); p.sport = ctx->sk->dst_port; struct origin_info *origin ; switch (ctx->sk->family) { #if ENABLE_IPV4 case 2 : set_ipv4(p.dip, ctx->sk->src_ip4); set_ipv4(p.sip, ctx->sk->dst_ip4); origin = bpf_map_lookup_elem(&pair_original_dst, &p); if (origin) { ctx->optlen = (__s32)sizeof (struct sockaddr_in); if ((void *)((struct sockaddr_in *)ctx->optval + 1 ) > ctx->optval_end) { printk("optname: %d: invalid getsockopt optval" , ctx->optname); return 1 ; } ctx->retval = 0 ; struct sockaddr_in sa = { .sin_family = ctx->sk->family, .sin_addr.s_addr = get_ipv4(origin->ip), .sin_port = origin->port, }; *(struct sockaddr_in *)ctx->optval = sa; } break ; #endif return 1 ; }
Redirect 阶段
通过标准的 socketmap 就好了,修改地址即可完成重定向
sk_msg github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 __section("sk_msg" ) int mb_msg_redir (struct sk_msg_md *msg) { struct pair p ; memset (&p, 0 , sizeof (p)); p.dport = bpf_htons(msg->local_port); p.sport = msg->remote_port >> 16 ; switch (msg->family) { #if ENABLE_IPV4 case 2 : set_ipv4(p.dip, msg->local_ip4); set_ipv4(p.sip, msg->remote_ip4); break ; #endif } long ret = bpf_msg_redirect_hash(msg, &sock_pair_map, &p, BPF_F_INGRESS); if (ret) debugf("redirect %d bytes with eBPF successfully" , msg->size); return 1 ; }
Inbound 流量
和 Outbound
类似,不做多阐述,可以参考官博
同节点加速
同集群加速的大部分逻辑也在之前 Connect
中,源码
因为在宿主机上有 CNI POD,来维护当前 Node POD IPS,并且维护在 local_pod_ips
中
根据 dst_ip
查询 local_pod_ips
是否包含,不包含直接 ret,走外部流量路径
如果有的话,经过一些检查,将转发端口修改 IN_REDIRECT_PORT
(inbound端口)即可(没有这个端口的话,就不是 sidecar 的目标地址,使用用户的端口即可)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 __u32 _dst_ip[4 ]; set_ipv4(_dst_ip, dst_ip); struct pod_config *pod = bpf_map_lookup_elem(&local_pod_ips, _dst_ip);if (!pod) { debugf("dest ip: %pI4 not in this node, bypass" , &dst_ip); return 1 ; } if (curr_pod_ip) { if (curr_pod_ip != dst_ip) { int exclude = 0 ; IS_EXCLUDE_PORT(pod->exclude_in_ports, ctx->user_port, &exclude); if (exclude) { debugf("ignored dest port by exclude_in_ports, ip: %pI4, " "port: %d" , &dst_ip, bpf_htons(ctx->user_port)); return 1 ; } int include = 0 ; IS_INCLUDE_PORT(pod->include_in_ports, ctx->user_port, &include); if (!include) { debugf("ignored dest port by include_in_ports, ip: %pI4, " "port: %d" , &dst_ip, bpf_htons(ctx->user_port)); return 1 ; } ctx->user_port = bpf_htons(IN_REDIRECT_PORT); } origin.flags |= 1 ; } else { } __u64 cookie = bpf_get_socket_cookie_addr(ctx); debugf("call from sidecar container: cookie: %d, ip: %pI4, port: %d" , cookie, &dst_ip, bpf_htons(ctx->user_port)); if (bpf_map_update_elem(&cookie_original_dst, &cookie, &origin, BPF_NOEXIST)) { printk("update cookie origin failed" ); return 0 ; }
最终效果
IPtables 模型
eBPF 模式(跨集群)
eBPF 模式(同集群)
Next
基于 eBPF 的扩展性,社区也在积极的探索新的功能,比如 TC (流量控制)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 err = netns.Do(func (_ ns.NetNS) error { if err := s.buildListener(netns.Path()); err != nil { return err } if len (args.IfName) != 0 { return s.attachTC(netns.Path(), args.IfName) } ifaces, _ := net.Interfaces() for _, iface := range ifaces { if (iface.Flags&net.FlagLoopback) == 0 && (iface.Flags&net.FlagUp) != 0 { return s.attachTC(netns.Path(), iface.Name) } } return fmt.Errorf("device not found for %s" , args.Netns) })
参考