Rust Ztunnel 源码分析

推荐在阅读前,先阅读下 Introducing Rust-Based Ztunnel for Istio Ambient Service Mesh 一文

Goal

阅读一个开源项目,我们首先要知道它主要解决的问题是什么。Ztunnel 是一个 L4 的代理服务器,主要解决了 mTLS 的问题,正如名字 zero trust tunnel 所描述的。因此我们需要关注这个项目的两个部分

  • L4 的流量是如何区分和流转的
  • mTLS 是如何工作的

前置知识

构建基础

  • 因为用了 BoringSSL ,当前只支持 Linux
  • 在阅读代码之前,记得先运行 cargo build,构建脚本 build.rs 会生成 proto 相关的对象

代码架构

Ztunnel 大概分成了几个部分

  • Admin: 管理模块,和 Envoy Admin 类似
  • Stats: 指标模块
  • Workload Manager: 负载管理模块
  • Proxy: 代理模块【核心模块】
    • inbound: 入流量处理模块
    • outbound: 出流量处理模块
    • socks5: socks5 代理模块

整体代码的入口在 build_with_cert,从这里就可以看到整个项目的全貌

pub async fn build_with_cert(
config: config::Config,
cert_manager: impl CertificateProvider,
) -> anyhow::Result<Bound> {
let mut registry = Registry::default();
let metrics = Arc::new(Metrics::from(&mut registry));
let certificate_manager: Box<dyn CertificateProvider> = Box::new(cert_manager);

let shutdown = signal::Shutdown::new();
// Setup a drain channel. drain_tx is used to trigger a drain, which will complete
// once all drain_rx handlers are dropped.
// Any component which wants time to gracefully exit should take in a drain_rx clone, await drain_rx.signaled(), then cleanup.
// Note: there is still a hard timeout if the draining takes too long
let (drain_tx, drain_rx) = drain::channel();

let ready = readiness::Ready::new();
let proxy_task = ready.register_task("proxy listeners");

let workload_manager = workload::WorkloadManager::new(
config.clone(),
metrics.clone(),
ready.register_task("workload manager"),
certificate_manager.clone(),
)
.await?;

let admin_server = admin::Service::new(
config.clone(),
workload_manager.workloads(),
shutdown.trigger(),
drain_rx.clone(),
)
.context("admin server starts")?;
let stats_server = stats::Service::new(
config.clone(),
registry,
shutdown.trigger(),
drain_rx.clone(),
)
.context("stats server starts")?;
let readiness_server =
readiness::Service::new(config.clone(), ready, shutdown.trigger(), drain_rx.clone())
.context("readiness server starts")?;
let readiness_address = readiness_server.address();
let admin_address = admin_server.address();
let stats_address = stats_server.address();

let proxy = proxy::Proxy::new(
config.clone(),
workload_manager.workloads(),
certificate_manager.clone(),
metrics.clone(),
drain_rx.clone(),
)
.await?;
drop(proxy_task);

// spawn all tasks that should run in the main thread
admin_server.spawn();
stats_server.spawn();
tokio::spawn(
async move {
if let Err(e) = workload_manager.run().await {
error!("workload manager: {}", e);
}
}
.in_current_span(),
);

let proxy_addresses = proxy.addresses();
let span = tracing::span::Span::current();
thread::spawn(move || {
let _span = span.enter();
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.num_worker_threads)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("ztunnel-proxy-{id}")
})
.enable_all()
.build()
.unwrap();
runtime.block_on(
async move {
readiness_server.spawn();
proxy.run().in_current_span().await;
}
.in_current_span(),
);
});

Ok(Bound {
drain_tx,
config,
shutdown,
readiness_address,
admin_address,
stats_address,
proxy_addresses,
})
}
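build_with_cert 里 drain 通道的语义是:drain_tx 触发 drain 后,要等所有 drain_rx 的持有者都 drop 掉才算完成。这种「所有持有者退出才算完成」的机制,可以用标准库的 channel 粗略类比(纯示意,并非 drain crate 的真实实现):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // 每个需要优雅退出的组件持有一个 sender 克隆,
    // 退出前做完清理再 drop 掉它;当所有克隆都被 drop,
    // recv 返回 Err,等待方由此得知 drain 完成
    let (alive_tx, drained_rx) = mpsc::channel::<()>();

    let mut handles = Vec::new();
    for i in 0..3 {
        let guard = alive_tx.clone();
        handles.push(thread::spawn(move || {
            // ……组件做清理工作……
            println!("component {i} cleaned up");
            drop(guard); // 清理完毕,释放「存活凭证」
        }));
    }
    drop(alive_tx); // 主线程放弃自己那一份

    // 所有 sender 都被 drop 后 channel 关闭,模拟 drain 完成
    assert!(drained_rx.recv().is_err());
    for h in handles {
        h.join().unwrap();
    }
    println!("drain complete");
}
```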

Proxy 模块

我们先来看看 Proxy 模块。代码并不多,在 https://github.com/istio/ztunnel/tree/f9672662a0448067ce19d1908907b6f4db8016e0/src/proxy 中只有几个文件。

.
├── inbound.rs
├── inbound_passthrough.rs
├── outbound.rs
├── socks5.rs
└── util.rs

Proxy 承担了整个系统的数据流转逻辑,非常重要;基于 Tokio 和 Hyper,整体逻辑实现得非常简洁。

Outbound 模块

Outbound::run 是一个非常简单的函数:启动一个 Socket 来 Accept 请求,并通过一个 loop 不断地建立新连接。

pub(super) async fn run(self) {
let accept = async move {
loop {
// Asynchronously wait for an inbound socket.
let socket = self.listener.accept().await;
let start_outbound_instant = Instant::now();
match socket {
Ok((stream, _remote)) => {
let mut oc = OutboundConnection {
pi: self.pi.clone(),
id: TraceParent::new(),
};
let span = info_span!("outbound", id=%oc.id);
tokio::spawn(
(async move {
let res = oc.proxy(stream).await;
match res {
Ok(_) => info!(dur=?start_outbound_instant.elapsed(), "complete"),
Err(e) => warn!(dur=?start_outbound_instant.elapsed(), err=%e, "failed")
};
})
.instrument(span),
);
}
Err(e) => {
error!("Failed TCP handshake {}", e);
}
}
}
}.in_current_span();
}

主体逻辑在 OutboundConnection 中,也就是 let res = oc.proxy(stream).await; 这一行进入的 proxy 函数。

async fn proxy(&mut self, stream: TcpStream) -> Result<(), Error> {
let peer = socket::to_canonical(stream.peer_addr().expect("must receive peer addr"));
let orig_dst_addr = socket::orig_dst_addr_or_default(&stream);
self.proxy_to(stream, peer.ip(), orig_dst_addr, false).await
}

下面就来到正菜部分,proxy_to 承载了所有的逻辑。这部分比较多,拆开解释。

根据 IP 来构建转发请求,这里需要 Source(remote_addr)和 Dest(orig_dst_addr)两个地址,可能进入三种状态

  1. passthrough: 找不到 dest
  2. 上下游都能找到,走 mTLS
    1. 目标有 waypoint 就去 waypoint
    2. 没有就去另一个节点的 ztunnel
  3. error
    let req = self.build_request(remote_addr, orig_dst_addr).await?;

    async fn build_request(
    &self,
    downstream: IpAddr,
    target: SocketAddr,
    ) -> Result<Request, Error> {

    // 根据 IP 来判断 source_workload ,找不到就走 Err 了
    let source_workload = match self.pi.workloads.fetch_workload(&downstream).await {
    Some(wl) => wl,
    None => return Err(Error::UnknownSource(downstream)),
    };

    let us = self.pi.workloads.find_upstream(target, self.pi.hbone_port).await;
    // 找不到 dest,passthrough 掉这个请求
    if us.is_none() {
    return Ok(Request {
    protocol: Protocol::TCP,
    request_type: RequestType::Passthrough,
    });
    }

    let us = us.unwrap();
    // 这里如果找到 waypoint 走 waypoint
    if !us.workload.waypoint_addresses.is_empty() {
    let waypoint_address = us.workload.choose_waypoint_address().unwrap();
    return Ok(Request {
    // Always use HBONE here
    protocol: Protocol::HBONE,
    source: source_workload,
    request_type: RequestType::ToServerWaypoint,
    });
    }
    if us.workload.gateway_address.is_none() {
    return Err(Error::NoGatewayAddress(Box::new(us.workload.clone())));
    }
    // 有个优化路径,如果访问的对方和本地在同一个 NODE 上直接走 local,这里skip了

    // 没有目标地址的 waypoint 就走正常的 ztunnel 路径
    Ok(Request {
    protocol: us.workload.protocol,
    source: source_workload,
    destination: SocketAddr::from((us.workload.workload_ip, us.port)),
    destination_workload: Some(us.workload.clone()),
    expected_identity: Some(us.workload.identity()),
    gateway: us
    .workload
    .gateway_address
    .expect("gateway address confirmed"),
    direction: Direction::Outbound,
    request_type: RequestType::Direct,
    })
    }
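build_request 的分支逻辑可以抽象成一个很小的判定函数。下面是一个纯逻辑的示意(Route、classify 等命名是为了说明而造的,并非 ztunnel 里的真实类型):

```rust
// 示意 build_request 的三种去向;类型与函数名均为说明而取
#[derive(Debug, PartialEq)]
enum Route {
    Passthrough,      // 找不到目标 workload,原样转发
    ToServerWaypoint, // 目标配置了 waypoint,走 HBONE 去 waypoint
    Direct,           // 正常路径:去目标节点的 ztunnel(或直连)
}

fn classify(
    source_known: bool,
    upstream_found: bool,
    has_waypoint: bool,
) -> Result<Route, &'static str> {
    if !source_known {
        return Err("unknown source"); // 对应 Error::UnknownSource
    }
    if !upstream_found {
        return Ok(Route::Passthrough);
    }
    if has_waypoint {
        return Ok(Route::ToServerWaypoint);
    }
    Ok(Route::Direct)
}

fn main() {
    assert_eq!(classify(true, false, false), Ok(Route::Passthrough));
    assert_eq!(classify(true, true, true), Ok(Route::ToServerWaypoint));
    assert_eq!(classify(true, true, false), Ok(Route::Direct));
    assert!(classify(false, true, false).is_err());
    println!("all routes classified");
}
```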
当完成了这一步的时候,我们已经构建好了目标请求。让我们回到 proxy_to,接下来需要根据协议来判断走哪个分支:TCP 非常简单,我们重点看看另一条路径上的 HBONE。
match req.protocol {
Protocol::HBONE => {

}
Protocol::TCP => {
// 创建一个 TCP 链接
let local = if self.pi.cfg.enable_original_source.unwrap_or_default() {
super::get_original_src_from_stream(&stream)
} else {
None
};
let mut outbound = super::freebind_connect(local, req.gateway).await?;
// 直接 proxy 数据就好了
proxy::relay(
&mut stream,
&mut outbound,
&self.pi.metrics,
transferred_bytes,
)
}
}
}

HBONE(HTTP-Based Overlay Network Environment)是一个基于 HTTP/2 CONNECT 的隧道协议,工作在 HTTP 那一层。我们看看这里是如何处理的。

let mut builder = hyper::client::conn::Builder::new();
// 构建一个 Builder,主要调整了一些 HTTP 的参数
let builder = builder
.http2_only(true)
.http2_initial_stream_window_size(self.pi.cfg.window_size)
.http2_max_frame_size(self.pi.cfg.frame_size)
.http2_initial_connection_window_size(self.pi.cfg.connection_window_size);

let mut f = http_types::proxies::Forwarded::new();
f.add_for(remote_addr.to_string());

// 构建 HBONE 的 Request,就是一个 Connect 请求
let request = hyper::Request::builder()
.uri(&req.destination.to_string())
.method(hyper::Method::CONNECT)
.version(hyper::Version::HTTP_2)
.header(BAGGAGE_HEADER, baggage(&req))
.header(FORWARDED, f.value().unwrap())
.header(TRACEPARENT_HEADER, self.id.header())
.body(hyper::Body::empty())
.unwrap();
let local = self.pi.cfg.enable_original_source.unwrap_or_default().then_some(remote_addr);
let id = &req.source.identity();

// 这里构建 TLS 的证书管理
let cert = self.pi.cert_manager.fetch_certificate(id).await?;
let connector = cert
.connector(req.expected_identity.as_ref())?
.configure()
.expect("configure");

// 创建一个新的 TCP 连接
let tcp_stream = super::freebind_connect(local, req.gateway).await?;
tcp_stream.set_nodelay(true)?;

// 创建一个新的 TLS 连接(over TCP),然后去握手
let tls_stream = connect_tls(connector, tcp_stream).await?;
let (mut request_sender, connection) = builder
.handshake(tls_stream)
.await
.map_err(Error::HttpHandshake)?;
// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Error in HBONE connection handshake: {:?}", e);
}
});

// 发送 HTTP Req
let response = request_sender.send_request(request).await?;
let code = response.status();
if code != 200 {
return Err(Error::HttpStatus(code));
}

// 等到 upgraded 成功
let mut upgraded = hyper::upgrade::on(response).await?;

// 启动传输任务
let res = super::copy_hbone(
&mut upgraded,
&mut stream,
&self.pi.metrics,
transferred_bytes,
)
.instrument(trace_span!("hbone client"))
.await;
res

不过注意,这里还没有真正开始传输数据,只是把传输的准备做好;真正的搬运逻辑在 copy_hbone 内。调用 copy_hbone 时,HBONE 所需要的 HTTP 连接已经建立好了,所以有如下代码

pub async fn copy_hbone(
upgraded: &mut hyper::upgrade::Upgraded,
stream: &mut TcpStream,
metrics: impl AsRef<Metrics>,
transferred_bytes: traffic::BytesTransferred<'_>,
) -> Result<(), Error> {
use tokio::io::AsyncWriteExt;
// 把输出端的 HTTP 拆分成 read 和 write 2个方向
let (mut ri, mut wi) = tokio::io::split(upgraded);
// 把输入端的 TCP 拆分成 read 和 write 2个方向
let (mut ro, mut wo) = stream.split();

let (mut sent, mut received): (u64, u64) = (0, 0);

// 来回传递就好了
let client_to_server = async {
let mut ri = tokio::io::BufReader::with_capacity(HBONE_BUFFER_SIZE, &mut ri);
let mut wo = tokio::io::BufWriter::with_capacity(HBONE_BUFFER_SIZE, &mut wo);
let res = tokio::io::copy(&mut ri, &mut wo).await;
trace!(?res, "hbone -> tcp");
received = res?;
wo.shutdown().await
};

let server_to_client = async {
let mut ro = tokio::io::BufReader::with_capacity(HBONE_BUFFER_SIZE, &mut ro);
let mut wi = tokio::io::BufWriter::with_capacity(HBONE_BUFFER_SIZE, &mut wi);
let res = tokio::io::copy(&mut ro, &mut wi).await;
trace!(?res, "tcp -> hbone");
sent = res?;
wi.shutdown().await
};

// 把任务压入执行器里面
tokio::try_join!(client_to_server, server_to_client)?;
Ok(())
}
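copy_hbone 的双向转发模式(两端各拆成读写两半,两个方向各自 copy,读到 EOF 后关掉对侧写端),用标准库的同步 TCP 可以写一个可运行的极简类比(纯示意,真实代码基于 Tokio 的异步 IO):

```rust
use std::io::{copy, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

// 双向转发:读到 EOF 后关掉对侧的写端,把「发完了」传递下去
fn relay(down: TcpStream, up: TcpStream) -> std::io::Result<(u64, u64)> {
    let mut down_r = down.try_clone()?;
    let mut up_w = up.try_clone()?;
    let (mut up_r, mut down_w) = (up, down);

    // 下游 -> 上游(对应 tcp -> hbone)
    let to_up = thread::spawn(move || -> std::io::Result<u64> {
        let sent = copy(&mut down_r, &mut up_w)?;
        up_w.shutdown(Shutdown::Write)?;
        Ok(sent)
    });
    // 上游 -> 下游(对应 hbone -> tcp)
    let received = copy(&mut up_r, &mut down_w)?;
    down_w.shutdown(Shutdown::Write)?;
    let sent = to_up.join().expect("join relay thread")?;
    Ok((sent, received))
}

// 搭一条 客户端 -> relay -> echo 的链路,返回客户端收到的回显
fn round_trip(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let echo = TcpListener::bind("127.0.0.1:0")?;
    let echo_addr = echo.local_addr()?;
    thread::spawn(move || {
        if let Ok((s, _)) = echo.accept() {
            let mut r = s.try_clone().unwrap();
            let mut w = s;
            let _ = copy(&mut r, &mut w); // 原样回显
            let _ = w.shutdown(Shutdown::Write);
        }
    });

    let proxy = TcpListener::bind("127.0.0.1:0")?;
    let proxy_addr = proxy.local_addr()?;
    thread::spawn(move || {
        if let Ok((downstream, _)) = proxy.accept() {
            let upstream = TcpStream::connect(echo_addr).unwrap();
            let _ = relay(downstream, upstream);
        }
    });

    let mut client = TcpStream::connect(proxy_addr)?;
    client.write_all(payload)?;
    client.shutdown(Shutdown::Write)?;
    let mut buf = Vec::new();
    client.read_to_end(&mut buf)?;
    Ok(buf)
}

fn main() -> std::io::Result<()> {
    let echoed = round_trip(b"hello")?;
    assert_eq!(echoed, b"hello");
    println!("relayed {} bytes", echoed.len());
    Ok(())
}
```

异步版本里 tokio::try_join! 替代了这里的线程 join,语义是一样的:任一方向报错就整体失败,两个方向都自然结束才算转发完成。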

Inbound 模块

有了 Outbound 的经验,看 Inbound 就相对简单很多。因为 Inbound 接受的只有 HBONE,所以在创建 Inbound 的时候需要一个 HTTP Server,这就是主要的初始化部分

初始化

let acceptor = InboundCertProvider {
workloads: self.workloads.clone(),
cert_manager: self.cert_manager.clone(),
};

// 处理 TLS 的 stream
let tls_stream = crate::hyper_util::tls_server(acceptor, self.listener);

// 基于上述的 TLS stream 再去构建一个 Hyper 的 HTTP 服务
let incoming = hyper::server::accept::from_stream(tls_stream);

let server = Server::builder(incoming)
.http2_only(true)
.http2_initial_stream_window_size(self.cfg.window_size)
.http2_initial_connection_window_size(self.cfg.connection_window_size)
.http2_max_frame_size(self.cfg.frame_size)
.serve(service)
.with_graceful_shutdown(async {
// Wait until the drain is signaled
let shutdown = self.drain.signaled().await;
// Once `shutdown` is dropped, we are declaring the drain is complete. Hyper will start draining
// once with_graceful_shutdown function exits, so we need to exit the function but later
// drop `shutdown`.
if tx.send(shutdown).is_err() {
error!("HBONE receiver dropped")
}
info!("starting drain of inbound connections");
});

建立链接

显然 serve 的逻辑就在下面的 serve_connect,用来处理新建立的连接,这里只需要处理 CONNECT 请求,其他的返回 404 就可以了。

match req.method() {
&Method::CONNECT => {

}
// Return the 404 Not Found for other routes.
method => {
info!("Sending 404, got {method}");
Ok(Response::builder()
.status(hyper::StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap())
}
}

在 Outbound 里面我们已经知道,这里期望返回 200 就好了;而在 Inbound 里面,也就是检查下 mTLS 和 RBAC 权限即可

 // Orig has 15008, swap with the real port
let conn = rbac::Connection { dst: addr, ..conn };
let Some(upstream) = workloads.fetch_workload(&addr.ip()).await else {
info!(%conn, "unknown destination");
return Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap());
};
if !workloads.assert_rbac(&conn).await {
info!(%conn, "RBAC rejected");
return Ok(Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.unwrap());
}

不过在这里需要注意有两种请求:

  1. zTunnel <–> zTunnel
  2. zTunnel <–> Waypoint

因此在处理上也略有不同

// 来自 waypoint 的请求不做 RBAC 检查
if from_waypoint {
debug!("request from waypoint, skipping policy");
} else if !workloads.assert_rbac(&conn).await {
info!(%conn, "RBAC rejected");
return Ok(Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.unwrap());
}

// waypoint 可信,从 Forwarded Header 里拿原始来源 IP
let source_ip = if from_waypoint {
super::get_original_src_from_fwded(&req).unwrap_or(conn.src_ip)
} else {
conn.src_ip
};

// 填充来源即可
let source = workloads.fetch_workload(&source_ip).await;


// 紧接着就是处理 Inbound 的请求即可
Self::handle_inbound(
Hbone(req),
enable_original_source.then_some(source_ip),
addr,
metrics,
connection_metrics,
None,
)

处理请求

处理请求就和 Outbound 非常的接近了,读写即可。

let mut stream = stream;
stream.set_nodelay(true)?;

// 在 Runtime 里面去执行下面的 CodeBlock 任务
tokio::task::spawn(
(async move {
let _connection_close = metrics
.increment_defer::<_, traffic::ConnectionClose>(&connection_metrics);

let _extra_conn_close = extra_connection_metrics
.as_ref()
.map(|co| metrics.increment_defer::<_, traffic::ConnectionClose>(co));

let transferred_bytes =
traffic::BytesTransferred::from(&connection_metrics);
match request_type {
// 预留了没用到
DirectPath(mut incoming) => {

}
// Hbone 读取 Body 就可以了,和之前 copy_hbone 一样的逻辑
Hbone(req) => match hyper::upgrade::on(req).await {
Ok(mut upgraded) => {
if let Err(e) = super::copy_hbone(
&mut upgraded,
&mut stream,
&metrics,
transferred_bytes,
)
.instrument(trace_span!("hbone server"))
.await
{
error!(dur=?start.elapsed(), "hbone server copy: {}", e);
}
}
Err(e) => {
// Not sure if this can even happen
error!(dur=?start.elapsed(), "No upgrade {e}");
}
},
}
})
.in_current_span(),
);

Socks5 模块

Ztunnel 在 15080 上还监听了一个 socks5 服务,但是没有找到具体的用途,先 SKIP 了

Workload Manger 模块

我们在 Inbound 和 Outbound 里面都发现,除了流转 TCP 的请求数据,还需要处理的就是身份信息。在上述代码里面,我们已经看到这么一个对象

pub struct WorkloadManager {
workloads: WorkloadInformation,
xds_client: Option<AdsClient>,
}

#[derive(serde::Serialize, Debug, Clone)]
pub struct WorkloadInformation {
#[serde(flatten)]
pub info: Arc<Mutex<WorkloadStore>>,

/// demand, if present, is used to request on-demand updates for workloads.
#[serde(skip_serializing)]
pub demand: Option<Demander>,
}

#[derive(serde::Serialize, Default, Debug)]
pub struct WorkloadStore {
workloads: HashMap<IpAddr, Workload>,
/// workload_to_vip maintains a mapping of workload IP to VIP. This is used only to handle removals.
workload_to_vip: HashMap<IpAddr, HashSet<(SocketAddr, u16)>>,
/// vips maintains a mapping of socket address with service port to workload ip and socket address
/// with target ports in hashset.
vips: HashMap<SocketAddr, HashSet<(IpAddr, u16)>>,

/// policies maintains a mapping of ns/name to policy.
policies: HashMap<String, rbac::Authorization>,
// policies_by_namespace maintains a mapping of namespace (or "" for global) to policy names
policies_by_namespace: HashMap<String, HashSet<String>>,

#[serde(skip_serializing, default)]
cert_tx: Option<mpsc::Sender<Identity>>,

// needed to determine whether or not to prefetch certs
local_node: Option<String>,
}

这里的对象众多,大概的关系是

  • WorkloadManager: 作为最外层对象
    • WorkloadInformation: 对外提供的可用 Shadow 副本
    • AdsClient: 更新通讯的主要对象

而这个 AdsClient 的构建逻辑是

xds::Config::new(config.clone()) // 默认配置
.with_workload_handler(xds_workloads) // WorkloadStore 实例
.with_authorization_handler(xds_rbac) // WorkloadStore 别名副本
.watch(xds::WORKLOAD_TYPE.into())
.watch(xds::AUTHORIZATION_TYPE.into())
.build(metrics, awaiting_ready.subtask("ads client")),

看到 Arc<Mutex<WorkloadStore>> 的时候,我们就知道这个对象会被异步线程更新。这部分也很容易被找到

fn decode_and_handle<
T: prost::Message + Default + 'static,
F: FnOnce(&AdsClient) -> &Box<dyn Handler<T>>,
>(
&mut self,
f: F,
response: DeltaDiscoveryResponse,
) -> Result<(), Vec<RejectedConfig>> {
let type_url = response.type_url.clone();
let removes = self.handle_removes(&response);
let updates: Vec<XdsUpdate<T>> = response
.resources
.into_iter()
.map(|r| {
let key = ResourceKey {
name: r.name.clone(),
type_url: type_url.clone(),
};
self.notify_on_demand(&key);
self.known_resources
.entry(key.type_url)
.or_default()
.insert(key.name);
r
})
.map(|raw| decode_proto::<T>(raw).unwrap())
.map(XdsUpdate::Update)
.chain(removes.into_iter().map(XdsUpdate::Remove))
.collect();
let handler = f(self);

handler.handle(updates)
}

// 在这里进行处理,通过一系列的回调,对 WorkloadStore 进行了更新操作
impl xds::Handler<XdsWorkload> for Arc<Mutex<WorkloadStore>> {
fn handle(&self, updates: Vec<XdsUpdate<XdsWorkload>>) -> Result<(), Vec<RejectedConfig>> {
let mut wli = self.lock().unwrap();
let handle = |res: XdsUpdate<XdsWorkload>| {
match res {
XdsUpdate::Update(w) => wli.insert_xds_workload(w.resource)?,
XdsUpdate::Remove(name) => {
info!("handling delete {}", name);
wli.remove(name);
}
}
Ok(())
};
xds::handle_single_resource(updates, handle)
}
}

对于 XDS 通讯细节就不做展开,这里大致上也是一个标准的 Request / Response 模式。

XDS 模块

在刚刚的 workload 里面我们已经看到一部分和 XDS 相关的内容,这里我们展开看看 AdsClient

pub struct AdsClient {
config: Config,
/// Stores all known workload resources. Map from type_url to name
known_resources: HashMap<String, HashSet<String>>,

/// pending stores a list of all resources that are pending and XDS push
pending: HashMap<ResourceKey, oneshot::Sender<()>>,

demand: mpsc::Receiver<(oneshot::Sender<()>, ResourceKey)>,
demand_tx: mpsc::Sender<(oneshot::Sender<()>, ResourceKey)>,

pub(crate) metrics: Arc<Metrics>,
block_ready: Option<readiness::BlockReady>,

connection_id: u32,
}

AdsClient 是一个 Background 任务,用来加载数据

async fn run_loop(&mut self, backoff: Duration) -> Duration {
const MAX_BACKOFF: Duration = Duration::from_secs(15);
match self.run_internal().await {
// 抛开无关的异常
}
}

async fn run_internal(&mut self) -> Result<(), Error> {
// 这里直接打开一个连接去连接 XDS 服务器
let address = self.config.address.clone();
let svc = tls::grpc_connector(address, self.config.root_cert.clone()).unwrap();
let mut client =
AggregatedDiscoveryServiceClient::with_interceptor(svc, self.config.auth.clone());
let (discovery_req_tx, mut discovery_req_rx) = mpsc::channel::<DeltaDiscoveryRequest>(100);
// 这里需要先构建初始化请求,注意这是一个 vec,可能有多个
let initial_requests = self.construct_initial_requests();

// 这里用 async_stream 的 macro 来完成一个类似 Vec<Req> 的效果,参考 [Rust Async: async-stream](https://zhuanlan.zhihu.com/p/266269521)
// 这里还是很 hack 的:返回一个不会结束的 stream,开头是 initial_requests,后面的从 discovery_req_rx 里接收
let outbound = async_stream::stream! {
for initial in initial_requests {
info!(resources=initial.initial_resource_versions.len(), type_url=initial.type_url, "sending initial request");
yield initial;
}
while let Some(message) = discovery_req_rx.recv().await {
debug!(type_url=message.type_url, "sending request");
yield message
}
warn!("outbound stream complete");
};

let mut response_stream = client
.delta_aggregated_resources(tonic::Request::new(outbound))
.await
.map_err(Error::Connection)?
.into_inner();
debug!("connected established");

info!("Stream established");
// Create a oneshot channel to be notified as soon as we ACK the first XDS response
// 这里开了另外一个 channel,主要是处理当第一个请求 ack 的时候,让系统进入 Ready 状态
let (tx, initial_xds_rx) = oneshot::channel();
let mut initial_xds_tx = Some(tx);
let ready = mem::take(&mut self.block_ready);
tokio::spawn(async move {
match initial_xds_rx.await {
// 这里的 DROP 魔改了默认实现,在 [readiness.rs](https://github.com/istio/ztunnel/blob/f9672662a0448067ce19d1908907b6f4db8016e0/src/readiness.rs#L57-L77)
Ok(_) => drop(ready),
Err(_) => {
debug!("sender was dropped before initial xds sync event was received");
}
}
});

loop {
tokio::select! {
_demand_event = self.demand.recv() => {
self.handle_demand_event(_demand_event, &discovery_req_tx).await?;
}

// 主体逻辑在这里,根据上面的 stream 里面 send 的部分,接收到 response 来挨个处理
msg = response_stream.message() => {
// TODO: If we have responses of different types (e.g. RBAC), we'll want to wait for
// each type to receive a response before marking ready
// 处理函数是 handle_stream_event
if let XdsSignal::Ack = self.handle_stream_event(msg?, &discovery_req_tx).await? {
// 这里巧妙的利用了 initial_xds_tx 做一次通知,来ready整个系统
let val = mem::take(&mut initial_xds_tx);
if let Some(tx) = val {
if let Err(err) = tx.send(()) {
warn!("initial xds sync signal send failed: {:?}", err)
}
}
};
}
}
}
}
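run_internal 里 outbound stream 的拼接模式(先产出 initial_requests,再持续转发 channel 里收到的请求),用同步 iterator 可以粗略类比(纯示意,真实代码是 async_stream 产出的异步 Stream):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    // 模拟后续的 on-demand 请求被塞进 channel
    tx.send("on-demand".to_string()).unwrap();
    drop(tx); // 真实代码中 channel 不会关闭,stream 永不结束

    let initial_requests = vec![
        "initial: workload".to_string(),
        "initial: authorization".to_string(),
    ];
    // 先产出 initial_requests,再逐个产出 channel 里收到的消息
    let outbound: Vec<String> = initial_requests.into_iter().chain(rx.into_iter()).collect();

    assert_eq!(outbound, ["initial: workload", "initial: authorization", "on-demand"]);
    println!("{outbound:?}");
}
```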

上面是连接建立与收发的骨架,具体的处理逻辑我们继续往下看 handle_stream_event

async fn handle_stream_event(
&mut self,
stream_event: Option<DeltaDiscoveryResponse>,
send: &mpsc::Sender<DeltaDiscoveryRequest>,
) -> Result<XdsSignal, Error> {

// Due to lack of dynamic typing in Rust we have some code duplication here. In the future this could be a macro,
// but for now its easier to just have a bit of duplication.
let handler_response: Result<(), Vec<RejectedConfig>> = match type_url.as_str() {
xds::WORKLOAD_TYPE => {
self.decode_and_handle::<Workload, _>(|a| &a.config.workload_handler, response)
}
xds::AUTHORIZATION_TYPE => self.decode_and_handle::<Authorization, _>(
|a| &a.config.authorization_handler,
response,
),
_ => {
error!("unknown type");
Ok(())
}
};

let (response_type, error) = match handler_response {
Err(rejects) => {
let error = rejects
.into_iter()
.map(|reject| reject.to_string())
.collect::<Vec<String>>()
.join("; ");
(XdsSignal::Nack, Some(error))
}
_ => (XdsSignal::Ack, None),
};


send.send(DeltaDiscoveryRequest {
type_url, // this is owned, OK to move
response_nonce: nonce, // this is owned, OK to move
error_detail: error.map(|msg| Status {
message: msg,
..Default::default()
}),
..Default::default()
})
.await
.map_err(|e| Error::RequestFailure(Box::new(e)))
.map(|_| response_type)
}
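这段 ACK/NACK 的决策可以提炼成一个小函数:handler 全部成功则 ACK(不带 error_detail),有拒绝则 NACK,并把原因用分号拼接进 error_detail。下面是一个纯示意(ack_or_nack 并非 ztunnel 中的真实函数):

```rust
// 示意:delta xDS 的响应回执决策,函数名为说明而取
fn ack_or_nack(rejects: Vec<String>) -> (&'static str, Option<String>) {
    if rejects.is_empty() {
        ("ACK", None) // 全部接受:不带 error_detail
    } else {
        // 任意一条被拒绝即 NACK,原因拼接进 error_detail
        ("NACK", Some(rejects.join("; ")))
    }
}

fn main() {
    assert_eq!(ack_or_nack(vec![]), ("ACK", None));
    let (sig, err) = ack_or_nack(vec!["bad workload".into(), "bad policy".into()]);
    assert_eq!(sig, "NACK");
    assert_eq!(err.as_deref(), Some("bad workload; bad policy"));
    println!("ack/nack logic verified");
}
```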