Ambient Mesh Works : 2024 Version

image

Ambient Mesh Works 中我们分析了最初的版本,当前版本又做了一些更新,我们今天就来看看这些更新。

CNI & Ztunnel

image

相较于之前的版本中,使用了标记路由的方式进行流量透传
Pod -> Host -> Ztunnel 这样的路径,这样的路径对当前的其他的 CNI 比如 flannel 之类的冲击都比较的大,因此在当前的版本中,进行了较大的修改

这里采用了一个的新的设计模式

Pod -> Ztunnel 减少了到 Host 绕一圈的情况。

这里实现了标准的 CNI 的接口,不熟悉的同学可以参考 Kubernetes Network Plugins

所以这里需要实现 2 个东西

  • CNI-CLI: 给 kube 来调用,istio 中的名称为 istio-cni
  • CNI-Daemonset-POD: 处理逻辑,istio 中的名称为 install-cni
    那我们就按顺序来看一下

CNI-CLI

程序的入口如下

runPlugingithub
1
2
3
4
5
func runPlugin() error {
err := skel.PluginMainWithError(plugin.CmdAdd, plugin.CmdCheck, plugin.CmdDelete, version.All,
fmt.Sprintf("CNI plugin istio-cni %v", istioversion.Info.Version))
return nil
}

那么主要的逻辑显然就在 Add Delete 这些具体的逻辑中了。我们用 Add 逻辑看一下,代码在 plugin.go 这里进行拆解分析。

AmbientEnabledgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if conf.AmbientEnabled {
log.Debugf("istio-cni ambient cmdAdd podName: %s - checking if ambient enabled", podName)
// 判断这个 Pod 是不是接入了 ambient 模式
// 主要通过 Label 中的 DataplaneMode 标签,和是否已经接入 Sidecar 了
podIsAmbient, err := isAmbientPod(kClient, podName, podNamespace)
if err != nil {
log.Errorf("istio-cni cmdAdd failed to check ambient: %s", err)
}
// 获取 CNI Chian 前一个执行的结果,这里主要就是下面获取 IP
prevResult := conf.PrevResult.(*cniv1.Result)

// Only send event if this pod "would be" an ambient-watched pod - otherwise skip
// 当判断 POD 是 ambient 模式的时候,会通过 UDS (HTTP 请求) 通知节点上的 Istio CNI POD
if podIsAmbient {
cniClient := newCNIClient(conf.CNIEventAddress, constants.CNIAddEventPath)
if err = PushCNIEvent(cniClient, args, prevResult.IPs, podName, podNamespace); err != nil {
log.Errorf("istio-cni cmdAdd failed to signal node Istio CNI agent: %s", err)
return err
}
return nil
}
log.Debugf("istio-cni ambient cmdAdd podName: %s - not ambient enabled, ignoring", podName)
}
// End ambient plugin logic 这里就结束了

(*) 原 CNI 模式模式

这里就和我们 ambient 没什么关系了,主要是在比较早期的版本 CNI plugin 的逻辑,不基于 initContainer 是通过这里完成 iptables 的劫持逻辑的。

紧接着的逻辑,有一长串来判断是不是需要处理重定向,这里包含了

  • 如果 POD 中包含了 ISTIOINIT 的初始化容器
  • 如果 POD 中包含了 DISABLE_ENVOY 的环境变量
  • 如果 POD 中包含了 ISTIOPROXY 组件

等等逻辑就不去重定向请求,这部分就略过了,核心逻辑在下面

NewRedirectgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
log.Debugf("Setting up redirect")

redirect, err := NewRedirect(pi)
if err != nil {
log.Errorf("redirect failed due to bad params: %v", err)
return err
}

if err := rulesMgr.Program(podName, args.Netns, redirect); err != nil {
return err
}

return nil

这里开始重定向 POD 的所有请求,这段逻辑的主体 redirect 就是之前 init 容器 iptables 规则修改,因此这里也没什么特别神奇的处理。

最终就会执行如下命令

Programgithub
1
2
3
4
5
6
7
8
func (ipt *iptables) Program(podName, netns string, rdrct *Redirect) error {
// 省略一些初始化
return netNs.Do(func(_ ns.NetNS) error {
log.Infof("============= Start iptables configuration for %v =============", podName)
defer log.Infof("============= End iptables configuration for %v =============", podName)
return cmd.ProgramIptables(cfg)
})
}

CNI-Install

另外一个组件,常驻在节点的 POD 这里使用的是另外一个 CMDistio 中称之为 install-cni

这个程序的入口显然就是通过 UDS 获取到 PodAdd 事件然后进行处理。我们就从这里开始

Init

NewServergithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 启动一个 UDS Listener Server 来监听各种事件
log.Info("Starting ambient node agent with inpod redirect mode")
ambientAgent, err := nodeagent.NewServer(ctx, watchServerReady, cfg.InstallConfig.CNIEventAddress,
nodeagent.AmbientArgs{
SystemNamespace: nodeagent.PodNamespace,
Revision: nodeagent.Revision,
ServerSocket: cfg.InstallConfig.ZtunnelUDSAddress,
DNSCapture: cfg.InstallConfig.AmbientDNSCapture,
})
if err != nil {
return fmt.Errorf("failed to create ambient nodeagent service: %v", err)
}

ambientAgent.Start()
defer ambientAgent.Stop()

log.Info("Ambient node agent started, starting installer...")

但是 NewServer 中包含了大量的逻辑,这里就简单看看

NewServergithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func NewServer(ctx context.Context, ready *atomic.Value, pluginSocket string, args AmbientArgs) (*Server, error) {
client, err := buildKubeClient(args.KubeConfig)
if err != nil {
return nil, fmt.Errorf("error initializing kube client: %w", err)
}

// 创建一个 ipset,方面后面添加各种 IP
log.Debug("creating ipsets in the node netns")
set, err := createHostsideProbeIpset()
if err != nil {
return nil, fmt.Errorf("error initializing hostside probe ipset: %w", err)
}

// 创建了一个 pod ns 的cache map,给 ztunnel 后续使用
podNsMap := newPodNetnsCache(openNetnsInRoot(pconstants.HostMountsPath))
ztunnelServer, err := newZtunnelServer(args.ServerSocket, podNsMap)
if err != nil {
return nil, fmt.Errorf("error initializing the ztunnel server: %w", err)
}

// 忽略一些本地iptables 创建
// 这里就是构建完整的 NetServer
podNetns := NewPodNetnsProcFinder(os.DirFS(filepath.Join(pconstants.HostMountsPath, "proc")))
netServer := newNetServer(ztunnelServer, podNsMap, iptablesConfigurator, podNetns, set)

// Set some defaults
s := &Server{
ctx: ctx,
kubeClient: client,
isReady: ready,
dataplane: &meshDataplane{
kubeClient: client.Kube(),
netServer: netServer,
},
}
s.NotReady()

// 设定 Hanlder 函数
s.handlers = setupHandlers(s.ctx, s.kubeClient, s.dataplane, args.SystemNamespace)
err = cniServer.Start()
return s, nil
}

handleAddEvent

看完了简要的初始化工作,我们来看函数的处理主体

handleAddEventgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (s *CniPluginServer) handleAddEvent(w http.ResponseWriter, req *http.Request) {
// 一些前置校验略,这里将 data 序列化之后传入下面的 Reconcile
msg, err := processAddEvent(data)
if err != nil {
log.Errorf("Failed to process CNI event payload: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if err := s.ReconcileCNIAddEvent(req.Context(), msg); err != nil {
log.Errorf("Failed to handle add event: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
ReconcileCNIAddEventgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (s *CniPluginServer) ReconcileCNIAddEvent(ctx context.Context, addCmd CNIPluginAddEvent) error {
log := log.WithLabels("cni-event", addCmd)

log.Debugf("netns: %s", addCmd.Netns)

maxStaleRetries := 10
msInterval := 10
retries := 0
var ambientPod *corev1.Pod
var err error

log.Debugf("Checking pod: %s in ns: %s is enabled for ambient", addCmd.PodName, addCmd.PodNamespace)

// 这里用了 client-go 的 cache client 可能拿不到,就多试了几次(10 次)
for ambientPod, err = s.handlers.GetPodIfAmbient(addCmd.PodName, addCmd.PodNamespace); (ambientPod == nil) && (retries < maxStaleRetries); retries++ {
if err != nil {
return err
}
log.Warnf("got an event for pod %s in namespace %s not found in current pod cache, retry %d of %d",
addCmd.PodName, addCmd.PodNamespace, retries, maxStaleRetries)
time.Sleep(time.Duration(msInterval) * time.Millisecond)
}

// 如果找不到这个 pod 就直接错误返回了
if ambientPod == nil {
return fmt.Errorf("got event for pod %s in namespace %s but could not find in pod cache after retries", addCmd.PodName, addCmd.PodNamespace)
}
log.Debugf("Pod: %s in ns: %s is enabled for ambient, adding to mesh.", addCmd.PodName, addCmd.PodNamespace)

// 直到这里才是核心的逻辑处理
var podIps []netip.Addr
// 将 IP 都读出来,
for _, configuredPodIPs := range addCmd.IPs {
ip, _ := netip.AddrFromSlice(configuredPodIPs.Address.IP)
podIps = append(podIps, ip)
}
// 这里将这些 IP 加到 Mesh 中来,这里就最为复杂的部分了
err = s.dataplane.AddPodToMesh(ctx, ambientPod, podIps, addCmd.Netns)
if err != nil {
return err
}

return nil
}

AddPodToMesh

AddPodToMesh 逻辑中构成了主要的核心流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// AddPodToMesh 增加一个 Pod 到 网格中
// 1. 获取 POD 的 NS
// 2. 把 POD IP 添加到 Host 中的 ipset 中,给 node 进行 probe check 使用
// 3. 在 POD 中执行 iptables 规则生成
// 4. 通知 Ztunnel 常见一个 Proxy 实例来代码 POD 请求
func (s *NetServer) AddPodToMesh(ctx context.Context, pod *corev1.Pod, podIPs []netip.Addr, netNs string) error {
log.Infof("in pod mode - adding pod %s/%s to ztunnel ", pod.Namespace, pod.Name)
s.currentPodSnapshot.Ensure(string(pod.UID))

// 获取这个 POD 的 网络命名空间
openNetns, err := s.getOrOpenNetns(&pod.ObjectMeta, netNs)
if err != nil {
return err
}

// 将 Pod ip 添加到 ipset 里面,主要为了放行健康检查
err = addPodToHostNSIpset(pod, podIPs, &s.hostsideProbeIPSet)
if err != nil {
log.Errorf("failed to add pod to ipset: %s/%s %v", pod.Namespace, pod.Name, err)
return err
}

log.Debug("calling CreateInpodRules")
// 调用逻辑进行 iptables 规则生成
if err := s.netnsRunner(openNetns, func() error {
return s.iptablesConfigurator.CreateInpodRules(&HostProbeSNATIP)
}); err != nil {
log.Errorf("failed to update POD inpod: %s/%s %v", pod.Namespace, pod.Name, err)
return err
}

log.Debug("notifying subscribed node proxies")
// 为其创建 Proxy 实例
if err := s.sendPodToZtunnelAndWaitForAck(ctx, &pod.ObjectMeta, openNetns); err != nil {
// we must return PartialAdd error here. the pod was injected with iptables rules,
// so it should be annotated, so if it is removed from the mesh, the rules will be removed.
// alternatively, we may not return an error at all, but we want this to fail on tests.
// 特化的 Error,参考上面的注释
return NewErrPartialAdd(err)
}
return nil
}

到这里,针对 POD 的逻辑我们已经做完了,下面就是 Ztunel 的逻辑了,我们简要的总结一下

PodAdded

经过简单的跳转就可以抵达这里。PodAdded 的主要工作是把当前需要处理的 Pod 信息发送给 ztunnel

PodAddedgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 这里通过 GRPC 请求,把 uid 和 netns 发给 Ztunnel 就结束了,这部分看起来还是很简单,就不做详细介绍了
func (z *ztunnelServer) PodAdded(ctx context.Context, uid string, netns Netns) error {
latestConn := z.conns.LatestConn()
if latestConn == nil {
return fmt.Errorf("no ztunnel connection")
}
r := &zdsapi.WorkloadRequest{
Payload: &zdsapi.WorkloadRequest_Add{
Add: &zdsapi.AddWorkload{
Uid: uid,
},
},
}
log.Debugf("About to send added pod: %s to ztunnel: %v", uid, r)
data, err := proto.Marshal(r)
if err != nil {
return err
}

fd := int(netns.Fd())
resp, err := latestConn.send(ctx, data, &fd)
if err != nil {
return err
}

// 等待 ACK
if resp.GetAck().GetError() != "" {
log.Errorf("add-workload: got ack error: %s", resp.GetAck().GetError())
return fmt.Errorf("got ack error: %s", resp.GetAck().GetError())
}
return nil
}

处理完这些 CNI 的工作就完成了。

Ztunnel

install-cni 的逻辑一样,这里我们应该也是从 zdsapi.WorkloadRequest 开始了解起来,这部分的逻辑在 process 就是一个标准的事件循环,读取 Request 然后处理 Response

processgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
pub async fn process(&mut self, mut processor: WorkloadStreamProcessor) -> Result<(), Error> {
processor
.send_hello()
.await
.map_err(|_| Error::ProtocolError)?;

loop {
// 读取 Message,这里就是读取刚刚的 zdsapi.WorkloadRequest
// read_message_and_retry_proxies 的逻辑也比较简单就不展开了,
// 异常都先忽略
let msg = match self.read_message_and_retry_proxies(&mut processor).await {
Ok(Some(msg)) => Ok(msg),
Ok(None) => {
return Ok(());
}
Err(e) => {}
}?;

debug!("received message: {:?}", msg);

// 处理 Msg 并且发送 ACK 消息
match self.state.process_msg(msg).await {
Ok(()) => {
self.check_ready();
processor
.send_ack()
.await
.map_err(|e| Error::SendAckError(e.to_string()))?;
}
Err(Error::ProxyError(e)) => {}
Err(e) => {}
};
}
}

实际上的逻辑通过一些简单的跳转,我们就可以找到

add_workload_innergithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
async fn add_workload_inner(
&mut self,
workload_uid: &WorkloadUid,
workload_info: &Option<WorkloadInfo>,
netns: InpodNetns,
) -> Result<(), crate::proxy::Error> {
debug!(
workload=?workload_uid,
workload_info=?workload_info,
inode=?netns.workload_inode(),
"starting proxy",
);

// 创建一个 drain channel,当主程序退出的时候,要把 proxy 也退出
let workload_netns_inode = netns.workload_inode();
let (drain_tx, drain_rx) = drain::channel();


// 为这个 POD 创建一个代理
// 这里都是一些基础的准备
let proxies = self
.proxy_gen
.new_proxies_from_factory(
Some(drain_rx),
workload_info.clone(),
Arc::from(self.inpod_config.socket_factory(netns)),
)
.await?;

let uid = workload_uid.clone();
self.admin_handler
.proxy_up(&uid, proxies.connection_manager);

let metrics = self.metrics.clone();
let admin_handler = self.admin_handler.clone();
metrics.proxies_started.get_or_create(&()).inc();
// 在另外一个协程中启动 proxy
if let Some(proxy) = proxies.proxy {
tokio::spawn(
async move {
proxy.run().await;
debug!("proxy for workload {:?} exited", uid);
metrics.proxies_stopped.get_or_create(&()).inc();
admin_handler.proxy_down(&uid);
}
.instrument(tracing::info_span!("proxy", uid=%workload_uid.clone().into_string())),
);
}
// 启动 DNS Proxy 代码
if let Some(proxy) = proxies.dns_proxy {
tokio::spawn(proxy.run().instrument(
tracing::info_span!("dns_proxy", uid=%workload_uid.clone().into_string()),
));
}

self.workload_states.insert(
workload_uid.clone(),
WorkloadState {
drain: drain_tx,
workload_netns_inode,
},
);

Ok(())
}

可以看出来 Proxy 的主体逻辑在 proxy.run() 中,但是在我们看 run 之前,我们先来阅读 new_proxies_from_factory 中的逻辑

new_proxies_from_factorygithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
if self.config.proxy {
let cm = ConnectionManager::default();
let pi = crate::proxy::ProxyInputs::new(
self.config.clone(),
self.cert_manager.clone(),
cm.clone(),
self.state.clone(),
self.proxy_metrics.clone().unwrap(),
socket_factory.clone(),
proxy_workload_info,
);
result.connection_manager = Some(cm);
// 在这里构建了 proxy
result.proxy = Some(Proxy::from_inputs(pi, drain.clone()).await?);
}

pub(super) async fn from_inputs(mut pi: ProxyInputs, drain: Watch) -> Result<Self, Error> {
// We setup all the listeners first so we can capture any errors that should block startup
// 在创建 proxy 的时候就已经创建了所有的 listener,可以将失败前置
let inbound = Inbound::new(pi.clone(), drain.clone()).await?;
pi.hbone_port = inbound.address().port();

// 这里创建了一系列的 tcp 链接
let inbound_passthrough = InboundPassthrough::new(pi.clone(), drain.clone()).await?;
let outbound = Outbound::new(pi.clone(), drain.clone()).await?;
let socks5 = Socks5::new(pi.clone(), drain.clone()).await?;
let policy_watcher = PolicyWatcher::new(pi.state, drain, pi.connection_manager);

Ok(Proxy {
inbound,
inbound_passthrough,
outbound,
socks5,
policy_watcher,
})
}

我们当前关注我们可能使用最多的 inboundoutbound

Inbound::newgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pub(super) async fn new(mut pi: ProxyInputs, drain: Watch) -> Result<Inbound, Error> {
// 这里可以看出来,这里的 `bind` 会返回一个 `TcpListener`,不过绑定的 `socket` 是从 `socket_factory` 来的
// 也就从 pod 的 ns 中获取的
let listener: TcpListener = pi
.socket_factory
.tcp_bind(pi.cfg.inbound_addr)
.map_err(|e| Error::Bind(pi.cfg.inbound_addr, e))?;
let transparent = super::maybe_set_transparent(&pi, &listener)?;
// Override with our explicitly configured setting
pi.cfg.enable_original_source = Some(transparent);
info!(
address=%listener.local_addr().expect("local_addr available"),
component="inbound",
transparent,
"listener established",
);
Ok(Inbound {
listener,
drain,
pi,
})
}

Outbound Run

接下来,我们分析其中一个场景 outbound 是如何流转流量的。
完整代码在 outbound.rs 非常的长,我们这里分段理解一下

最启示的部分在 outbound.rs

rungithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
loop {
// 等待一个链接的建立
// listener 是在 POD NS
let socket = self.listener.accept().await;
let start_outbound_instant = Instant::now();
// clone drain 为后面停止预留通知渠道
let outbound_drain = sub_drain.clone();
let outer_conn_drain = sub_drain.clone();
match socket {
// 这里说明链接建立成功了
Ok((stream, _remote)) => {
let mut oc = OutboundConnection {
pi: self.pi.clone(),
id: TraceParent::new(),
};
let span = info_span!("outbound", id=%oc.id);
// 将这个任务扔到 tokio 的 task 任务池中运行
tokio::spawn(
(async move {
debug!(dur=?start_outbound_instant.elapsed(), id=%oc.id, "outbound spawn START");
// Since this task is spawned, make sure we are guaranteed to terminate
tokio::select! {
// 如果 Drain 了就需要退出
_ = outbound_drain.signaled() => {
debug!("outbound drain signaled");
}
// 这里进行转发
res = oc.proxy(stream, outer_conn_drain.clone()) => {
match res {
Ok(_) => info!(dur=?start_outbound_instant.elapsed(), "complete"),
Err(e) => warn!(dur=?start_outbound_instant.elapsed(), err=%e, "failed")
};
}
}
debug!(dur=?start_outbound_instant.elapsed(), id=%oc.id, "outbound spawn DONE");
})
.instrument(span),
);
}
Err(e) => {
// 略处理错误
}
}

而下一步的 proxy 函数在

proxygithub
1
2
3
4
5
6
7
async fn proxy(&mut self, stream: TcpStream, outer_conn_drain: Watch) -> Result<(), Error> {
// 拿到来源地址
let peer = socket::to_canonical(stream.peer_addr().expect("must receive peer addr"));
// 拿到目标地址
let orig_dst_addr = socket::orig_dst_addr_or_default(&stream);
self.proxy_to(stream, peer, orig_dst_addr, false, Some(outer_conn_drain)).await
}

那下面就来到了最复杂的部分了,分解之,从outbound.rs 开始看起

Build Request

在理解这块代码之前需要理解下,当前系统的几个路径

image

  1. Pod -> Pod: 都在 ambient 中,只有 L4 的需求,就是 Pod –(outbound)–> Ztunnel —> Pod –(inbound)–> Ztunnel
  2. Pod -> Pod: 都在 ambient 中,有 L7 的需求,就是 Pod –(outbound)–> Ztunnel —-> Waypoint –> Pod –(inbound)–> Ztunnel
  3. Pod -> Pod(非ambient): Pod –(outbound)–> Ztunnel —> Pod

实际上并不存在 Ztunnel -> Ztunnel 的直接路径,这个和之前的版本是不一样的,因此如果只从宿主机的角度看请求,是无法感知到这个请求的存在的,只有 Ztunnel -> Pod 的路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// 构建目标的访问请求,这里还是比较复杂的
let req = self.build_request(remote_addr.ip(), orig_dst_addr).await?;

// build_request 的函数是在 https://github.com/istio/ztunnel/blob/4f17630b6922ee1b77110c98ae736647477c0356/src/proxy/outbound.rs#L457-L667
// 构建目标的访问请求
async fn build_request(
&self,
downstream: IpAddr,
target: SocketAddr,
) -> Result<Request, Error> {
// istio 中的 downstream 和 nginx 一致,就是来源
let downstream_network_addr = NetworkAddress {
network: self.pi.cfg.network.clone(),
address: downstream,
};
// 根据来源地址获得所对应的 workload
let source_workload = match self.pi.state.fetch_workload(&downstream_network_addr).await {
Some(wl) => wl,
None => return Err(Error::UnknownSource(downstream)),
};

// 这里排版蛮奇怪的
// 如果是一个访问 Service 的请求,看看是不是需要走 waypoint
// 需要的话就会这里直接短路了
let svc_addressed = if let Some(Address::Service(s)) = self
.pi
.state
// 首先根据目标地址获得访问地址,这里有两者类型 Service 和 Worload
.fetch_destination(&Destination::Address(NetworkAddress {
network: self.pi.cfg.network.clone(),
address: target.ip(),
}))
.await
{
// 如果是 Address::Service 类型,并且有 waypoint 信息
if let Some(wp) = s.waypoint.clone() {
let waypoint_vip = match wp.destination {
Destination::Address(a) => a.address,
Destination::Hostname(_) => {
return Err(proxy::Error::UnknownWaypoint(
"hostname lookup not supported yet".to_string(),
));
}
};
// 找到 waypoint 的 mtls 的 hbos 端口
let waypoint_vip = SocketAddr::new(waypoint_vip, wp.hbone_mtls_port);
// 这里找 waypoint 的 真实地址(上面那个 VIP 是 SVC 地址)
let waypoint_us = self
.pi
.state
.fetch_upstream(&self.pi.cfg.network, &source_workload, waypoint_vip)
.await
.ok_or(proxy::Error::UnknownWaypoint(
"unable to determine waypoint upstream".to_string(),
))?;

// 选择一个访问地址
let waypoint_workload = waypoint_us.workload;
let waypoint_ip = self
.pi
.state
.pick_workload_destination(
&waypoint_workload,
&source_workload,
self.pi.metrics.clone(),
)
.await?; // if we can't load balance just return the error

let waypoint_socket_address = SocketAddr::new(waypoint_ip, waypoint_us.port);

// 如果能找到 waypoint,下面的路径都不用处理了,这里直接返回 waypoint 去处理即可
return Ok(Request {
protocol: Protocol::HBONE,
direction: Direction::Outbound,
source: source_workload,
destination: target,
destination_workload: None, // this is to Service traffic with a wp... gateway will handle workload selection
destination_service: Some(ServiceDescription::from(&*s)),
expected_identity: Some(waypoint_workload.identity()),
gateway: waypoint_socket_address,
request_type: RequestType::ToServerWaypoint,
upstream_sans: waypoint_us.sans,
});
}
// 这里说明是一个访问 Service 的请求,但是没有 waypoint 信息
true
} else {
// 这里说明不是访问 Service 的请求
false
};

// 根据 来源和目标(这里肯定非 Service 的访问流量) 根据 IP 查找,找不到就是 Passthrouh 的路径
let us = match self
.pi
.state
.fetch_upstream(&source_workload.network, &source_workload, target)
.await
{
Some(us) => us,
None => {
// For case no upstream found, passthrough it
return Ok(Request {
protocol: Protocol::TCP,
source: source_workload,
destination: target,
destination_workload: None,
destination_service: None,
expected_identity: None,
gateway: target,
direction: Direction::Outbound,
request_type: RequestType::Passthrough,
upstream_sans: vec![],
});
}
};

let workload_ip = self
.pi
.state
.pick_workload_destination(&us.workload, &source_workload, self.pi.metrics.clone())
.await?;

let from_waypoint = proxy::check_from_waypoint(
self.pi.state.clone(),
&us.workload,
Some(&source_workload.identity()),
&downstream_network_addr.address,
)
.await;

// Don't traverse waypoint twice if the source is sandwich-outbound.
// 如果来源是 waypoint 的请求,不应该再走 waypoint 了
// Don't traverse waypoint if traffic was addressed to a service which did not have a waypoint
// 如果访问目标是一个服务,但是没有 waypoint,也不要走 waypoint
if !from_waypoint && !svc_addressed {
// 用目标 IP 找了下 waypoint 地址
match self
.pi
.state
.fetch_waypoint(&us.workload, &source_workload, workload_ip)
.await
{
Ok(None) => {} // 目标地址不是访问 svc,但是也没有 waypoint ,正常逻辑往下走就好
Ok(Some(waypoint_us)) => { // 如果目标地址有 waypoint 的存在,应该走 waypoint
let waypoint_workload = waypoint_us.workload;
let waypoint_ip = self
.pi
.state
.pick_workload_destination(
&waypoint_workload,
&source_workload,
self.pi.metrics.clone(),
)
.await?;
let wp_socket_addr = SocketAddr::new(waypoint_ip, waypoint_us.port);
return Ok(Request {
// Always use HBONE here
protocol: Protocol::HBONE,
source: source_workload,
// Use the original VIP, not translated
destination: target,
destination_workload: Some(us.workload),
destination_service: us.destination_service.clone(),
expected_identity: Some(waypoint_workload.identity()),
gateway: wp_socket_addr,
// Let the client remote know we are on the inbound path.
direction: Direction::Inbound,
request_type: RequestType::ToServerWaypoint,
upstream_sans: us.sans,
});
}
// 异常处理
Err(e) => return Err(Error::UnknownWaypoint(e.to_string())),
}
}

// 到这里,下面肯定是不走 waypoint 的路径
// 根据目标是否包含 hbone 端口进行端口切换
let gw_addr = match us.workload.protocol {
Protocol::HBONE => SocketAddr::from((workload_ip, self.pi.hbone_port)),
Protocol::TCP => SocketAddr::from((workload_ip, us.port)),
};

// 对目标和来源都是在在一个节点上的处理
// 未来可能优化,现在其实只是标记了,但是还是会走一段的
if !self.pi.cfg.inpod_enabled
&& !us.workload.node.is_empty()
&& self.pi.cfg.local_node.as_ref() == Some(&us.workload.node) // looks weird but in Rust borrows can be compared and will behave the same as owned (https://doc.rust-lang.org/std/primitive.reference.html)
&& us.workload.protocol == Protocol::HBONE
{
trace!(
workload_node = us.workload.node,
local_node = self.pi.cfg.local_node,
"select {:?}",
RequestType::DirectLocal
);
return Ok(Request {
protocol: Protocol::HBONE,
source: source_workload,
destination: SocketAddr::from((workload_ip, us.port)),
destination_workload: Some(us.workload.clone()),
destination_service: us.destination_service.clone(),
expected_identity: Some(us.workload.identity()),
gateway: SocketAddr::from((gw_addr.ip(), self.pi.hbone_port)),
direction: Direction::Outbound,
// Sending to a node on the same node (ourselves).
// In the future this could be optimized to avoid a full network traversal.
request_type: RequestType::DirectLocal,
upstream_sans: us.sans,
});
}
// 上面各种逻辑不匹配的情况,就直接走访问
Ok(Request {
protocol: us.workload.protocol,
source: source_workload,
destination: SocketAddr::from((workload_ip, us.port)),
destination_workload: Some(us.workload.clone()),
destination_service: us.destination_service.clone(),
expected_identity: Some(us.workload.identity()),
gateway: gw_addr,
direction: Direction::Outbound,
request_type: RequestType::Direct,
upstream_sans: us.sans,
})
}

代码比较细碎,整体上的逻辑就是

  1. 根据 目标IP 判断是不是一个 SVCClusterIP
  2. 如果是访问 SVC 的地址,先去找 waypoint 有没有,如果有就走 waypoint
  3. 然后根据 IP 来找对应的 workload,找不到说明不在系统内,那就是 Passthrough 的路径
  4. 如果访问的是 IP 的地址,并且来源不是 Waypoint 的可以在尝试再找一下这个 IP 是否有 waypoint 有就走那去,这里解决了之前 IP 无法匹配 VS 的问题
  5. 剩下来的逻辑里面就看看对端是不是有 ztunnel 的逻辑,走 HBONE 协议,不然就走默认的 TCP

Proxy

接下来的处理在

proxygithub
1
2
3
4
5
6
7
8
9
10
 let res = match req.protocol {
Protocol::HBONE => {
self.proxy_to_hbone(&mut source_stream, source_addr, outer_conn_drain, &req)
.await
}
Protocol::TCP => {
self.proxy_to_tcp(&mut source_stream, source_addr, outer_conn_drain, &req)
.await
}
};

对于 proxy_to_hbone 的逻辑就是符合协议的,我们抛开一些细节上的点,大致上的逻辑上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

// 建立连接的函数
let connect = async {
// 这里建立了一个 TLS 链接,前面忽略了一些证书的细节
let tls_stream = connector.connect(tcp_stream).await?;
let (request_sender, connection) = builder
.handshake(::hyper_util::rt::TokioIo::new(tls_stream))
.await
.map_err(Error::HttpHandshake)?;

// 返回 Sender
Ok(request_sender)
};

// 这里使用connect函数,构建了一个连接
let mut connection = self.pi.pool.connect(pool_key.clone(), connect).await?;

let mut f = http_types::proxies::Forwarded::new();
f.add_for(remote_addr.to_string());

// 构建一个 CONNECT 请求
let request = hyper::Request::builder()
.uri(&req.destination.to_string())
.method(hyper::Method::CONNECT)
.version(hyper::Version::HTTP_2)
.header(BAGGAGE_HEADER, baggage(req, self.pi.cfg.cluster_id.clone()))
.header(FORWARDED, f.value().expect("Forwarded value is infallible"))
.header(TRACEPARENT_HEADER, self.id.header())
.body(Empty::<Bytes>::new())
.expect("builder with known status code should not fail");

// 发送第一个建立的连接,等待响应,就是构成一个 HBONE 的连接
let response = connection.send_request(request).await?;
debug!("outbound - connection send END");

let code = response.status();
if code != 200 {
return Err(Error::HttpStatus(code));
}
let mut upgraded = hyper::upgrade::on(response).await?;

// 然后就是把流量来回 COPY
super::copy_hbone(&mut upgraded, stream)
.instrument(trace_span!("hbone client"))
.await