letshutdown = signal::Shutdown::new(); // Setup a drain channel. drain_tx is used to trigger a drain, which will complete // once all drain_rx handlers are dropped. // Any component which wants time to gracefully exit should take in a drain_rx clone, await drain_rx.signaled(), then cleanup. // Note: there is still a hard timeout if the draining takes too long let (drain_tx, drain_rx) = drain::channel();
// spawn all tasks that should run in the main thread admin_server.spawn(); stats_server.spawn(); tokio::spawn( asyncmove { ifletErr(e) = workload_manager.run().await { error!("workload manager: {}", e); } } .in_current_span(), );
letserver = Server::builder(incoming) .http2_only(true) .http2_initial_stream_window_size(self.cfg.window_size) .http2_initial_connection_window_size(self.cfg.connection_window_size) .http2_max_frame_size(self.cfg.frame_size) .serve(service) .with_graceful_shutdown(async { // Wait until the drain is signaled letshutdown = self.drain.signaled().await; // Once `shutdown` is dropped, we are declaring the drain is complete. Hyper will start draining // once with_graceful_shutdown function exists, so we need to exit the function but later // drop `shutdown`. if tx.send(shutdown).is_err() { error!("HBONE receiver dropped") } info!("starting drain of inbound connections"); });
}
// Return the 404 Not Found for other routes.
method => {
    info!("Sending 404, got {method}");
    // Build an empty-bodied 404 response.
    Ok(Response::builder()
        .status(hyper::StatusCode::NOT_FOUND)
        .body(Body::empty())
        .unwrap())
}
}
/// demand, if present, is used to request on-demand updates for workloads. #[serde(skip_serializing)] pub demand: Option<Demander>, }
/// WorkloadStore holds the known workloads and the VIP lookup tables.
#[derive(serde::Serialize, Default, Debug)]
pub struct WorkloadStore {
    /// workloads maps an IP address to its workload.
    workloads: HashMap<IpAddr, Workload>,
    /// workload_to_vip maintains a mapping of workload IP to VIP. This is used only to handle removals.
    workload_to_vip: HashMap<IpAddr, HashSet<(SocketAddr, u16)>>,
    /// vips maintains a mapping of socket address with service port to workload ip and socket address
    /// with target ports in hashset.
    vips: HashMap<SocketAddr, HashSet<(IpAddr, u16)>>,
/// policies maintains a mapping of ns/name to policy. policies: HashMap<String, rbac::Authorization>, // policies_by_namespace maintains a mapping of namespace (or "" for global) to policy names policies_by_namespace: HashMap<String, HashSet<String>>,
/// AdsClient holds the client configuration together with its resource bookkeeping.
pub struct AdsClient {
    config: Config,
    /// Stores all known workload resources. Map from type_url to name
    known_resources: HashMap<String, HashSet<String>>,
/// pending stores a list of all resources that are pending and XDS push pending: HashMap<ResourceKey, oneshot::Sender<()>>,
info!("Stream established"); // Create a oneshot channel to be notified as soon as we ACK the first XDS response // 这里开了另外一个 channel,主要是处理当第一个请求 ack 的时候,让系统进入 Ready 状态 let (tx, initial_xds_rx) = oneshot::channel(); letmut initial_xds_tx = Some(tx); letready = mem::take(&mutself.block_ready); tokio::spawn(asyncmove { match initial_xds_rx.await { // 这里的 DROP 魔改了默认实现,在 [readiness.rs](https://github.com/istio/ztunnel/blob/f9672662a0448067ce19d1908907b6f4db8016e0/src/readiness.rs#L57-L77) Ok(_) => drop(ready), Err(_) => { debug!("sender was dropped before initial xds sync event was received"); } } });
loop { tokio::select! { _demand_event = self.demand.recv() => { self.handle_demand_event(_demand_event, &discovery_req_tx).await?; } // 主体逻辑在这里,根据上面的 stream 里面 send 的部分,接收到 response 来挨个处理 msg = response_stream.message() => { // TODO: If we have responses of different types (e.g. RBAC), we'll want to wait for // each type to receive a response before marking ready // 处理函数是 handle_stream_event ifletXdsSignal::Ack = self.handle_stream_event(msg?, &discovery_req_tx).await? { // 这里巧妙的利用了 initial_xds_tx 做一次通知,来ready整个系统 letval = mem::take(&mut initial_xds_tx); ifletSome(tx) = val { ifletErr(err) = tx.send(()) { warn!("initial xds sync signal send failed: {:?}", err) } } }; } } } }
asyncfnhandle_stream_event( &mutself, stream_event: Option<DeltaDiscoveryResponse>, send: &mpsc::Sender<DeltaDiscoveryRequest>, ) ->Result<XdsSignal, Error> { // Due to lack of dynamic typing in Rust we have some code duplication here. In the future this could be a macro, // but for now its easier to just have a bit of duplication. lethandler_response: Result<(), Vec<RejectedConfig>> = match type_url.as_str() { xds::WORKLOAD_TYPE => { self.decode_and_handle::<Workload, _>(|a| &a.config.workload_handler, response) } xds::AUTHORIZATION_TYPE => self.decode_and_handle::<Authorization, _>( |a| &a.config.authorization_handler, response, ), _ => { error!("unknown type"); Ok(()) } };
send.send(DeltaDiscoveryRequest { type_url, // this is owned, OK to move response_nonce: nonce, // this is owned, OK to move error_detail: error.map(|msg| Status { message: msg, ..Default::default() }), ..Default::default() }) .await .map_err(|e| Error::RequestFailure(Box::new(e))) .map(|_| response_type) }