var l *core.Locality if meta.Labels[model.LocalityLabel] == "" && options.Platform != nil { // The locality string was not set, try to get locality from platform l = options.Platform.Locality() } else { localityString := model.GetLocalityLabelOrDefault(meta.Labels[model.LocalityLabel], "") l = util.ConvertLocality(localityString) }
// initializeProxy completes the initialization of a proxy. It is expected to be called only after // initProxyMetadata. func(s *DiscoveryServer) initializeProxy(node *core.Node, con *Connection) error { proxy := con.proxy // this should be done before we look for service instances, but after we load metadata // TODO fix check in kubecontroller treat echo VMs like there isn't a pod if err := s.WorkloadEntryController.RegisterWorkload(proxy, con.Connect); err != nil { return err } s.computeProxyState(proxy, nil)
// Get the locality from the proxy's service instances. // We expect all instances to have the same IP and therefore the same locality. // So it's enough to look at the first instance. iflen(proxy.ServiceInstances) > 0 { proxy.Locality = util.ConvertLocality(proxy.ServiceInstances[0].Endpoint.Locality.Label) }
// If there is no locality in the registry then use the one sent as part of the discovery request. // This is not preferable as only the connected Pilot is aware of this proxy's location, but it // can still help provide some client-side Envoy context when load balancing based on location. if util.IsLocalityEmpty(proxy.Locality) { proxy.Locality = &core.Locality{ Region: node.Locality.GetRegion(), Zone: node.Locality.GetZone(), SubZone: node.Locality.GetSubZone(), } }
locality := util.LocalityToString(proxy.Locality) // add topology labels to proxy metadata labels proxy.Metadata.Labels = labelutil.AugmentLabels(proxy.Metadata.Labels, proxy.Metadata.ClusterID, locality, proxy.Metadata.Network) // Discover supported IP Versions of proxy so that appropriate config can be delivered. proxy.DiscoverIPVersions()
proxy.WatchedResources = map[string]*model.WatchedResource{} // Based on node metadata and version, we can associate a different generator. if proxy.Metadata.Generator != "" { proxy.XdsResourceGenerator = s.Generators[proxy.Metadata.Generator] }
// getPodLocality retrieves the locality for a pod. func(c *Controller) getPodLocality(pod *v1.Pod) string { // if pod has `istio-locality` label, skip below ops iflen(pod.Labels[model.LocalityLabel]) > 0 { return model.GetLocalityLabelOrDefault(pod.Labels[model.LocalityLabel], "") }
// NodeName is set by the scheduler after the pod is created // https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#late-initialization node, err := c.nodeLister.Get(pod.Spec.NodeName) if err != nil { if pod.Spec.NodeName != "" { log.Warnf("unable to get node %q for pod %q/%q: %v", pod.Spec.NodeName, pod.Namespace, pod.Name, err) } return"" }
region := getLabelValue(node.ObjectMeta, NodeRegionLabelGA, NodeRegionLabel) zone := getLabelValue(node.ObjectMeta, NodeZoneLabelGA, NodeZoneLabel) subzone := getLabelValue(node.ObjectMeta, label.TopologySubzone.Name, "")
if region == "" && zone == "" && subzone == "" { return"" }
return region + "/" + zone + "/" + subzone // Format: "%s/%s/%s" }
labels := labelutil.AugmentLabels(wle.Labels, clusterID, wle.Locality, networkID) return &model.ServiceInstance{ Endpoint: &model.IstioEndpoint{ Address: addr, EndpointPort: instancePort, ServicePortName: servicePort.Name, Network: network.ID(wle.Network), Locality: model.Locality{ Label: wle.Locality, ClusterID: clusterID, }, LbWeight: wle.Weight, Labels: labels, TLSMode: tlsMode, ServiceAccount: sa, // Workload entry config name is used as workload name, which will appear in metric label. // After VM auto registry is introduced, workload group annotation should be used for workload name. WorkloadName: configKey.name, Namespace: configKey.namespace, }, Service: service, ServicePort: convertPort(servicePort), }
// Apply the Split Horizon EDS filter, if applicable. llbOpts = b.EndpointsByNetworkFilter(llbOpts)
if model.IsDNSSrvSubsetKey(b.clusterName) { // For the SNI-DNAT clusters, we are using AUTO_PASSTHROUGH gateway. AUTO_PASSTHROUGH is intended // to passthrough mTLS requests. However, at the gateway we do not actually have any way to tell if the // request is a valid mTLS request or not, since it's passthrough TLS. // To ensure we allow traffic only to mTLS endpoints, we filter out non-mTLS endpoints for these cluster types. llbOpts = b.EndpointsWithMTLSFilter(llbOpts) } llbOpts = b.ApplyTunnelSetting(llbOpts, b.tunnelType)
l := b.createClusterLoadAssignment(llbOpts)
// If locality aware routing is enabled, prioritize endpoints or set their lb weight. // Failover should only be enabled when there is an outlier detection, otherwise Envoy // will never detect the hosts are unhealthy and redirect traffic. enableFailover, lb := getOutlierDetectionAndLoadBalancerSettings(b.DestinationRule(), b.port, b.subsetName) lbSetting := loadbalancer.GetLocalityLbSetting(b.push.Mesh.GetLocalityLbSetting(), lb.GetLocalityLbSetting()) if lbSetting != nil { // Make a shallow copy of the cla as we are mutating the endpoints with priorities/weights relative to the calling proxy l = util.CloneClusterLoadAssignment(l) wrappedLocalityLbEndpoints := make([]*loadbalancer.WrappedLocalityLbEndpoints, len(llbOpts)) for i := range llbOpts { wrappedLocalityLbEndpoints[i] = &loadbalancer.WrappedLocalityLbEndpoints{ IstioEndpoints: llbOpts[i].istioEndpoints, LocalityLbEndpoints: l.Endpoints[i], } } loadbalancer.ApplyLocalityLBSetting(l, wrappedLocalityLbEndpoints, b.locality, b.proxy.Metadata.Labels, lbSetting, enableFailover) } return l }