type Service struct {
	// Attributes contains additional attributes associated with the service
	// used mostly by RBAC for policy enforcement purposes.
	Attributes ServiceAttributes

	// Ports is the set of network ports where the service is listening for
	// connections
	Ports PortList `json:"ports,omitempty"`

	// ServiceAccounts specifies the service accounts that run the service.
	ServiceAccounts []string `json:"serviceAccounts,omitempty"`

	// CreationTime records the time this service was created, if available.
	CreationTime time.Time `json:"creationTime,omitempty"`

	// Name of the service, e.g. "catalog.mystore.com"
	Hostname host.Name `json:"hostname"`

	// ClusterVIPs specifies the service address of the load balancer
	// in each of the clusters where the service resides
	ClusterVIPs AddressMap `json:"clusterVIPs,omitempty"`

	// DefaultAddress specifies the default service IP of the load balancer.
	// Do not access directly. Use GetAddressForProxy
	DefaultAddress string `json:"defaultAddress,omitempty"`

	// AutoAllocatedAddress specifies the automatically allocated
	// IPv4 address out of the reserved Class E subnet
	// (240.240.0.0/16) for service entries with non-wildcard
	// hostnames. The IPs assigned to services are not
	// synchronized across istiod replicas as the DNS resolution
	// for these service entries happens completely inside a pod
	// whose proxy is managed by one istiod. That said, the algorithm
	// to allocate IPs is pretty deterministic that at stable state, two
	// istiods will allocate the exact same set of IPs for a given set of
	// service entries.
	AutoAllocatedAddress string `json:"autoAllocatedAddress,omitempty"`

	// Resolution indicates how the service instances need to be resolved before routing
	// traffic. Most services in the service registry will use static load balancing wherein
	// the proxy will decide the service instance that will receive the traffic. Service entries
	// could either use DNS load balancing (i.e. proxy will query DNS server for the IP of the service)
	// or use the passthrough model (i.e. proxy will forward the traffic to the network endpoint requested
	// by the caller)
	Resolution Resolution

	// MeshExternal (if true) indicates that the service is external to the mesh.
	// These services are defined using Istio's ServiceEntry spec.
	MeshExternal bool

	// ResourceVersion represents the internal version of this object.
	ResourceVersion string
}
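For orientation, here is a rough sketch (not taken from the Istio source) of the Service that a mesh-external, DNS-resolved ServiceEntry for a hypothetical host api.example.com would translate into; the model.Port field layout and the constants/protocol helpers are assumed to match Istio's usual packages.

// Sketch only: the approximate model.Service produced for a hypothetical
// mesh-external ServiceEntry with resolution DNS and no explicit addresses.
svc := &model.Service{
	Hostname: host.Name("api.example.com"),
	Ports: model.PortList{
		{Name: "https", Port: 443, Protocol: protocol.HTTPS}, // Port shape assumed
	},
	DefaultAddress: constants.UnspecifiedIP, // "0.0.0.0": no VIP was declared
	Resolution:     model.DNSLB,
	MeshExternal:   true,
	CreationTime:   time.Now(), // convertServices uses cfg.CreationTimestamp instead
}
_ = svc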
// convertServices transforms a ServiceEntry config to a list of internal Service objects.
func convertServices(cfg config.Config) []*model.Service {
	serviceEntry := cfg.Spec.(*networking.ServiceEntry)
	creationTime := cfg.CreationTimestamp

	var resolution model.Resolution
	switch serviceEntry.Resolution {
	case networking.ServiceEntry_NONE:
		resolution = model.Passthrough
	case networking.ServiceEntry_DNS:
		resolution = model.DNSLB
	case networking.ServiceEntry_DNS_ROUND_ROBIN:
		resolution = model.DNSRoundRobinLB
	case networking.ServiceEntry_STATIC:
		resolution = model.ClientSideLB
	}

	svcPorts := make(model.PortList, 0, len(serviceEntry.Ports))
	for _, port := range serviceEntry.Ports {
		svcPorts = append(svcPorts, convertPort(port))
	}

	var exportTo map[visibility.Instance]bool
	if len(serviceEntry.ExportTo) > 0 {
		exportTo = make(map[visibility.Instance]bool)
		for _, e := range serviceEntry.ExportTo {
			exportTo[visibility.Instance(e)] = true
		}
	}

	var labelSelectors map[string]string
	if serviceEntry.WorkloadSelector != nil {
		labelSelectors = serviceEntry.WorkloadSelector.Labels
	}
	hostAddresses := []*HostAddress{}
	for _, hostname := range serviceEntry.Hosts {
		if len(serviceEntry.Addresses) > 0 {
			for _, address := range serviceEntry.Addresses {
				// Check if address is an IP first because that is the most common case.
				if net.ParseIP(address) != nil {
					hostAddresses = append(hostAddresses, &HostAddress{hostname, address})
				} else if ip, network, cidrErr := net.ParseCIDR(address); cidrErr == nil {
					newAddress := address
					ones, zeroes := network.Mask.Size()
					if ones == zeroes {
						// /32 mask. Remove the /32 and make it a normal IP address
						newAddress = ip.String()
					}
					hostAddresses = append(hostAddresses, &HostAddress{hostname, newAddress})
				}
			}
		} else {
			hostAddresses = append(hostAddresses, &HostAddress{hostname, constants.UnspecifiedIP})
		}
	}
// A simpler representation: service + port + endpoint.
type ServiceInstance struct {
	Service     *Service       `json:"service,omitempty"`
	ServicePort *Port          `json:"servicePort,omitempty"`
	Endpoint    *IstioEndpoint `json:"endpoint,omitempty"`
}

// Used by internal logic when more information is needed.
type WorkloadInstance struct {
	Name      string `json:"name,omitempty"`
	Namespace string `json:"namespace,omitempty"`
	// Where the workloadInstance come from, valid values are `Pod` or `WorkloadEntry`
	Kind     workloadKind      `json:"kind"`
	Endpoint *IstioEndpoint    `json:"endpoint,omitempty"`
	PortMap  map[string]uint32 `json:"portMap,omitempty"`
	// Can only be selected by service entry of DNS type.
	DNSServiceEntryOnly bool `json:"dnsServiceEntryOnly,omitempty"`
}
func convertWorkloadInstanceToServiceInstance(workloadInstance *model.WorkloadInstance, serviceEntryServices []*model.Service,
	serviceEntry *networking.ServiceEntry) []*model.ServiceInstance {
	out := make([]*model.ServiceInstance, 0)
	for _, service := range serviceEntryServices {
		for _, serviceEntryPort := range serviceEntry.Ports {
			// note: this is same as workloadentry handler
			// endpoint port will first use the port defined in wle with same port name,
			// if not port name does not match, use the targetPort specified in ServiceEntry
			// if both not matched, fallback to ServiceEntry port number.
			var targetPort uint32
			if port, ok := workloadInstance.PortMap[serviceEntryPort.Name]; ok && port > 0 {
				targetPort = port
			} else if serviceEntryPort.TargetPort > 0 {
				targetPort = serviceEntryPort.TargetPort
			} else {
				targetPort = serviceEntryPort.Number
			}
			ep := *workloadInstance.Endpoint
			ep.ServicePortName = serviceEntryPort.Name
			ep.EndpointPort = targetPort
			ep.EnvoyEndpoint = nil
			out = append(out, &model.ServiceInstance{
				Endpoint:    &ep,
				Service:     service,
				ServicePort: convertPort(serviceEntryPort),
			})
		}
	}
	return out
}
	for _, ss := range ep.Subsets {
		for _, ea := range ss.Addresses {
			pod, expectedPod := getPod(e.c, ea.IP, &metav1.ObjectMeta{Name: ep.Name, Namespace: ep.Namespace}, ea.TargetRef, host)
			if pod == nil && expectedPod {
				continue
			}
			builder := NewEndpointBuilder(e.c, pod)

			// EDS and ServiceEntry use name for service port - ADS will need to map to numbers.
			for _, port := range ss.Ports {
				istioEndpoint := builder.buildIstioEndpoint(ea.IP, port.Port, port.Name, discoverabilityPolicy)
				endpoints = append(endpoints, istioEndpoint)
			}
		}
	}
	return endpoints
}
// initControllers initializes the controllers.
func (s *Server) initControllers(args *PilotArgs) error {
	log.Info("initializing controllers")
	s.initMulticluster(args)
	// Certificate controller is created before MCP controller in case MCP server pod
	// waits to mount a certificate to be provisioned by the certificate controller.
	if err := s.initCertController(args); err != nil {
		return fmt.Errorf("error initializing certificate controller: %v", err)
	}
	// Controllers for configuration-type resources
	if err := s.initConfigController(args); err != nil {
		return fmt.Errorf("error initializing config controller: %v", err)
	}
	// Controllers for data-type resources (service registries)
	if err := s.initServiceControllers(args); err != nil {
		return fmt.Errorf("error initializing service controllers: %v", err)
	}
	return nil
}
type ConfigStore interface {
	// Schemas exposes the configuration type schema known by the config store.
	// The type schema defines the bidirectional mapping between configuration
	// types and the protobuf encoding schema.
	Schemas() collection.Schemas

	// Get retrieves a configuration element by a type and a key
	Get(typ config.GroupVersionKind, name, namespace string) *config.Config

	// List returns objects by type and namespace.
	// Use "" for the namespace to list across namespaces.
	List(typ config.GroupVersionKind, namespace string) ([]config.Config, error)

	// Create adds a new configuration object to the store. If an object with the
	// same name and namespace for the type already exists, the operation fails
	// with no side effects.
	Create(config config.Config) (revision string, err error)

	// Update modifies an existing configuration object in the store. Update
	// requires that the object has been created. Resource version prevents
	// overriding a value that has been changed between prior _Get_ and _Put_
	// operation to achieve optimistic concurrency. This method returns a new
	// revision if the operation succeeds.
	Update(config config.Config) (newRevision string, err error)
	UpdateStatus(config config.Config) (newRevision string, err error)

	// Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid
	// read-modify-write conflicts when there are many concurrent-writers to the same resource.
	Patch(orig config.Config, patchFn config.PatchFunc) (string, error)

	// Delete removes an object from the store by key
	// For k8s, resourceVersion must be fulfilled before a deletion is carried out.
	// If not possible, a 409 Conflict status will be returned.
	Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
}
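A brief sketch of how a consumer reads from this interface; the store variable, resource names, and the type assertion below are illustrative, not quoted from istiod.

// Sketch only: point lookup and list through ConfigStore.
// `store` is assumed to implement the interface above.
if cfg := store.Get(gvk.DestinationRule, "reviews", "default"); cfg != nil {
	dr := cfg.Spec.(*networking.DestinationRule) // same cast pattern as cfg.Spec.(*networking.ServiceEntry) above
	_ = dr
}
configs, err := store.List(gvk.DestinationRule, "") // "" lists across all namespaces
if err == nil {
	for _, c := range configs {
		log.Infof("DestinationRule %s/%s rev=%s", c.Namespace, c.Name, c.ResourceVersion)
	}
}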
This kind of configuration is consumed on demand: it is simply fetched from the store right before each push, as in the following use of a DestinationRule.
	// We need this for multi-network, or for clusters meant for use with AUTO_PASSTHROUGH.
	if features.EnableAutomTLSCheckPolicies ||
		b.push.NetworkManager().IsMultiNetworkEnabled() || model.IsDNSSrvSubsetKey(clusterName) {
		b.mtlsChecker = newMtlsChecker(push, port, dr)
	}
	return b
}
	// Used to watch node accessible from remote cluster.
	// In multi-cluster(shared control plane multi-networks) scenario, ingress gateway service can be of nodePort type.
	// With this, we can populate mesh's gateway address with the node ips.
	nodeInformer cache.SharedIndexInformer
	nodeLister   listerv1.NodeLister

	sync.RWMutex
	// servicesMap stores hostname ==> service, it is used to reduce convertService calls.
	servicesMap map[host.Name]*model.Service
	// hostNamesForNamespacedName returns all possible hostnames for the given service name.
	// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
	// hostname as well as the MCS hostname (clusterset.local). Otherwise, only the regular
	// hostname will be returned.
	hostNamesForNamespacedName func(name types.NamespacedName) []host.Name
	// servicesForNamespacedName returns all services for the given service name.
	// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
	// service as well as the MCS service (clusterset.local), if available. Otherwise,
	// only the regular service will be returned.
	servicesForNamespacedName func(name types.NamespacedName) []*model.Service
	// nodeSelectorsForServices stores hostname => label selectors that can be used to
	// refine the set of node port IPs for a service.
	nodeSelectorsForServices map[host.Name]labels.Instance
	// map of node name and its address+labels - this is the only thing we need from nodes
	// for vm to k8s or cross cluster. When node port services select specific nodes by labels,
	// we run through the label selectors here to pick only ones that we need.
	// Only nodes with ExternalIP addresses are included in this map !
	nodeInfoMap map[string]kubernetesNode
	// externalNameSvcInstanceMap stores hostname ==> instance, is used to store instances for ExternalName k8s services
	externalNameSvcInstanceMap map[host.Name][]*model.ServiceInstance
	// workload instances from workload entries - map of ip -> workload instance
	workloadInstancesByIP map[string]*model.WorkloadInstance
	// Stores a map of workload instance name/namespace to address
	workloadInstancesIPsByName map[string]string

	multinetwork
	// informerInit is set to true once the controller is running successfully. This ensures we do not
	// return HasSynced=true before we are running
	informerInit *atomic.Bool
	// beginSync is set to true when calling SyncAll, it indicates the controller has began sync resources.
	beginSync *atomic.Bool
	// initialSync is set to true after performing an initial in-order processing of all objects.
	initialSync *atomic.Bool
}
// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see multicluster.Controller).
func NewController(kubeClient kubelib.Client, options Options) *Controller {
	c := &Controller{
		opts:                       options,
		client:                     kubeClient,
		queue:                      queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
		servicesMap:                make(map[host.Name]*model.Service),
		nodeSelectorsForServices:   make(map[host.Name]labels.Instance),
		nodeInfoMap:                make(map[string]kubernetesNode),
		externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
		workloadInstancesByIP:      make(map[string]*model.WorkloadInstance),
		workloadInstancesIPsByName: make(map[string]string),
		informerInit:               atomic.NewBool(false),
		beginSync:                  atomic.NewBool(false),
		initialSync:                atomic.NewBool(false),
	}

	switch options.EndpointMode {
	case EndpointsOnly:
		c.endpoints = newEndpointsController(c)
	case EndpointSliceOnly:
		c.endpoints = newEndpointSliceController(c)
	}

	// This is for getting the node IPs of a selected set of nodes
	c.nodeInformer = kubeClient.KubeInformer().Core().V1().Nodes().Informer()
	c.nodeLister = kubeClient.KubeInformer().Core().V1().Nodes().Lister()
	c.registerHandlers(c.nodeInformer, "Nodes", c.onNodeEvent, nil)
// ClusterAdded is passed to the secret controller as a callback to be called
// when a remote cluster is added. This function needs to set up all the handlers
// to watch for resources being added, deleted or changed on remote clusters.
func (m *Multicluster) ClusterAdded(cluster *multicluster.Cluster, clusterStopCh <-chan struct{}) error {
	m.m.Lock()

	if m.closing {
		m.m.Unlock()
		return fmt.Errorf("failed adding member cluster %s: server shutting down", cluster.ID)
	}

	client := cluster.Client

	// clusterStopCh is a channel that will be closed when this cluster removed.
	options := m.opts
	options.ClusterID = cluster.ID
	// the aggregate registry's HasSynced will use the k8s controller's HasSynced, so we reference the same timeout
	options.SyncTimeout = cluster.SyncTimeout
	// different clusters may have different k8s version, re-apply conditional default
	options.EndpointMode = DetectEndpointMode(client)

	log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)
	kubeRegistry := NewController(client, options)
	m.remoteKubeControllers[cluster.ID] = &kubeController{
		Controller: kubeRegistry,
	}
	// localCluster may also be the "config" cluster, in an external-istiod setup.
	localCluster := m.opts.ClusterID == cluster.ID

	m.m.Unlock()

	// TODO move instance cache out of registries
	if m.serviceEntryStore != nil && features.EnableServiceEntrySelectPods {
		// Add an instance handler in the kubernetes registry to notify service entry store about pod events
		kubeRegistry.AppendWorkloadHandler(m.serviceEntryStore.WorkloadInstanceHandler)
	}

	// TODO only create namespace controller and cert patch for remote clusters (no way to tell currently)
	if m.startNsController && (features.ExternalIstiod || localCluster) {
		// Block server exit on graceful termination of the leader controller.
		m.s.RunComponentAsyncAndWait(func(_ <-chan struct{}) error {
			log.Infof("joining leader-election for %s in %s on cluster %s",
				leaderelection.NamespaceController, options.SystemNamespace, options.ClusterID)
			leaderelection.
				NewLeaderElection(options.SystemNamespace, m.serverID, leaderelection.NamespaceController, m.revision, client).
				AddRunFunction(func(leaderStop <-chan struct{}) {
					log.Infof("starting namespace controller for cluster %s", cluster.ID)
					nc := NewNamespaceController(client, m.caBundleWatcher)
					// Start informers again. This fixes the case where informers for namespace do not start,
					// as we create them only after acquiring the leader lock
					// Note: stop here should be the overall pilot stop, NOT the leader election stop. We are
					// basically lazy loading the informer, if we stop it when we lose the lock we will never
					// recreate it again.
					client.RunAndWait(clusterStopCh)
					nc.Run(leaderStop)
				}).Run(clusterStopCh)
			return nil
		})
	}
type XDSUpdater interface {
	// EDSUpdate is called when the list of endpoints or labels in a Service is changed.
	// For each cluster and hostname, the full list of active endpoints (including empty list)
	// must be sent. The shard name is used as a key - current implementation is using the
	// registry name.
	EDSUpdate(shard ShardKey, hostname string, namespace string, entry []*IstioEndpoint)

	// EDSCacheUpdate is called when the list of endpoints or labels in a Service is changed.
	// For each cluster and hostname, the full list of active endpoints (including empty list)
	// must be sent. The shard name is used as a key - current implementation is using the
	// registry name.
	// Note: the difference with `EDSUpdate` is that it only update the cache rather than requesting a push
	EDSCacheUpdate(shard ShardKey, hostname string, namespace string, entry []*IstioEndpoint)

	// SvcUpdate is called when a service definition is updated/deleted.
	SvcUpdate(shard ShardKey, hostname string, namespace string, event Event)

	// ConfigUpdate is called to notify the XDS server of config updates and request a push.
	// The requests may be collapsed and throttled.
	ConfigUpdate(req *PushRequest)

	// ProxyUpdate is called to notify the XDS server to send a push to the specified proxy.
	// The requests may be collapsed and throttled.
	ProxyUpdate(clusterID cluster.ID, ip string)

	// RemoveShard removes all endpoints for the given shard key
	RemoveShard(shardKey ShardKey)
}
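As a rough sketch of how a service registry drives these callbacks (the hostname, namespace, and helper function below are illustrative, not taken from the Istio source): after an endpoint change the registry reports the full endpoint set for that host and shard, and for config-level changes it asks for a push instead.

// Sketch only: how a registry might notify the XDS server after an endpoint change.
func notifyEndpointChange(xds model.XDSUpdater, shard model.ShardKey, eps []*model.IstioEndpoint) {
	// Full state for this (shard, hostname): even an empty slice must be sent
	// so that stale endpoints are removed from the proxies.
	xds.EDSUpdate(shard, "api.example.com", "default", eps)

	// For config-level changes we would instead request a (possibly collapsed) push:
	xds.ConfigUpdate(&model.PushRequest{
		Full: false,
		ConfigsUpdated: map[model.ConfigKey]struct{}{
			{Kind: gvk.ServiceEntry, Name: "api.example.com", Namespace: "default"}: {},
		},
		Reason: []model.TriggerReason{model.EndpointUpdate},
	})
}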
// processEndpointEvent triggers the config update.
func processEndpointEvent(c *Controller, epc kubeEndpointsController, name string, namespace string, event model.Event, ep interface{}) error {
	// Update internal endpoint cache no matter what kind of service, even headless service.
	// As for gateways, the cluster discovery type is `EDS` for headless service.
	updateEDS(c, epc, ep, event)
	if features.EnableHeadlessService {
		if svc, _ := c.serviceLister.Services(namespace).Get(name); svc != nil {
			for _, modelSvc := range c.servicesForNamespacedName(kube.NamespacedNameForK8sObject(svc)) {
				// if the service is headless service, trigger a full push.
				if svc.Spec.ClusterIP == v1.ClusterIPNone {
					c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
						Full: true,
						// TODO: extend and set service instance type, so no need to re-init push context
						ConfigsUpdated: map[model.ConfigKey]struct{}{{
							Kind:      gvk.ServiceEntry,
							Name:      modelSvc.Hostname.String(),
							Namespace: svc.Namespace,
						}: {}},
						Reason: []model.TriggerReason{model.EndpointUpdate},
					})
					return nil
				}
			}
		}
	}
// https://github.com/istio/istio/blob/release-1.13/pilot/pkg/model/context.go#L231-L336
type Proxy struct {
	// Type specifies the node type. First part of the ID.
	Type NodeType

	// IPAddresses is the IP addresses of the proxy used to identify it and its
	// co-located service instances. Example: "10.60.1.6". In some cases, the host
	// where the proxy and service instances reside may have more than one IP address
	IPAddresses []string

	// ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
	// namespace <podName.namespace>.
	ID string

	// Locality is the location of where Envoy proxy runs. This is extracted from
	// the registry where possible. If the registry doesn't provide a locality for the
	// proxy it will use the one sent via ADS that can be configured in the Envoy bootstrap
	Locality *core.Locality
}
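For example (hypothetical values, field names taken from the struct above), a sidecar proxy connecting from a Kubernetes pod would look roughly like this; the NodeType constant and Locality field names are assumed.

// Sketch only: a Proxy as istiod might see it for a Kubernetes sidecar.
proxy := &model.Proxy{
	Type:        model.SidecarProxy,
	IPAddresses: []string{"10.60.1.6"},
	ID:          "reviews-v1-5d8f7c6b4-abcde.default", // <podName>.<namespace>
	Locality: &core.Locality{
		Region: "us-central1",
		Zone:   "us-central1-a",
	},
}
_ = proxy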
	s.updateMutex.Lock()
	s.Env.PushContext = push
	// Ensure we drop the cache in the lock to avoid races, where we drop the cache, fill it back up, then update push context
	s.dropCacheForRequest(req)
	s.updateMutex.Unlock()

	return push, nil
}
In other words, every time a request is processed, its content is folded into the global current state.

The system as a whole therefore carries three kinds of state (a rough sketch of how they collapse into the next push follows the list):

the state that was last pushed
the desired state (<- being processed)
the latest desired state (<- pending)
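The following minimal, self-contained sketch (not the actual istiod debounce code) illustrates the idea: requests that arrive while a push is in flight accumulate as the "latest desired state" and are merged into a single push, which becomes the state being processed and, once sent, the last-pushed state.

// Sketch of merging pending push requests into one push.
package main

import "fmt"

type pushRequest struct {
	full    bool
	configs map[string]struct{} // e.g. "ServiceEntry/default/api.example.com"
}

// merge combines two requests: a full push absorbs everything,
// otherwise the changed-config sets are unioned.
func merge(a, b *pushRequest) *pushRequest {
	out := &pushRequest{full: a.full || b.full, configs: map[string]struct{}{}}
	for k := range a.configs {
		out.configs[k] = struct{}{}
	}
	for k := range b.configs {
		out.configs[k] = struct{}{}
	}
	return out
}

func main() {
	// Requests that arrived while the previous push was in flight ("pending").
	pending := []*pushRequest{
		{configs: map[string]struct{}{"ServiceEntry/default/api.example.com": {}}},
		{full: true, configs: map[string]struct{}{"DestinationRule/default/reviews": {}}},
	}
	// Collapse them into the next push ("being processed").
	next := &pushRequest{configs: map[string]struct{}{}}
	for _, req := range pending {
		next = merge(next, req)
	}
	fmt.Printf("pushing full=%v with %d changed configs\n", next.full, len(next.configs))
}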
API
The API between the client (pilot-agent) and the server (istiod) is defined in ads.go, i.e. in the API spec:
// [#protodoc-title: Aggregated Discovery Service (ADS)]

// Discovery services for endpoints, clusters, routes,
// and listeners are retained in the package `envoy.api.v2` for backwards
// compatibility with existing management servers. New development in discovery
// services should proceed in the package `envoy.service.discovery.v2`.

// See https://github.com/envoyproxy/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
  // This is a gRPC-only API.
  rpc StreamAggregatedResources(stream DiscoveryRequest)
      returns (stream DiscoveryResponse) {
  }
// processRequest is handling one request. This is currently called from the 'main' thread, which also
// handles 'push' requests and close - the code will eventually call the 'push' code, and it needs more mutex
// protection. Original code avoided the mutexes by doing both 'push' and 'process requests' in same thread.
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
	if !s.shouldProcessRequest(con.proxy, req) {
		return nil
	}

	// For now, don't let xDS piggyback debug requests start watchers.
	if strings.HasPrefix(req.TypeUrl, v3.DebugType) {
		return s.pushXds(con, s.globalPushContext(), &model.WatchedResource{
			TypeUrl:       req.TypeUrl,
			ResourceNames: req.ResourceNames,
		}, &model.PushRequest{Full: true})
	}
	if s.StatusReporter != nil {
		s.StatusReporter.RegisterEvent(con.ConID, req.TypeUrl, req.ResponseNonce)
	}
	shouldRespond := s.shouldRespond(con, req)

	var request *model.PushRequest
	push := s.globalPushContext()
	if shouldRespond {
		// This is a request, trigger a full push for this type. Override the blocked push (if it exists),
		// as this full push is guaranteed to be a superset of what we would have pushed from the blocked push.
		request = &model.PushRequest{Full: true, Push: push}
	} else {
		// Check if we have a blocked push. If this was an ACK, we will send it.
		// Either way we remove the blocked push as we will send a push.
		haveBlockedPush := false
		con.proxy.Lock()
		request, haveBlockedPush = con.blockedPushes[req.TypeUrl]
		delete(con.blockedPushes, req.TypeUrl)
		con.proxy.Unlock()
		if haveBlockedPush {
			// we have a blocked push which we will use
			log.Debugf("%s: DEQUEUE for node:%s", v3.GetShortType(req.TypeUrl), con.proxy.ID)
		} else {
			// This is an ACK, no delayed push
			// Return immediately, no action needed
			return nil
		}
	}

	request.Reason = append(request.Reason, model.ProxyRequest)
	request.Start = time.Now()
	// SidecarScope for the proxy may not have been updated based on this pushContext.
	// It can happen when `processRequest` comes after push context has been updated(s.initPushContext),
	// but before proxy's SidecarScope has been updated(s.updateProxy).
	if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != push.PushVersion {
		s.computeProxyState(con.proxy, request)
	}
	return s.pushXds(con, push, con.Watched(req.TypeUrl), request)
}
func (s *DiscoveryServer) pushXds(con *Connection, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) error {
	if w == nil {
		return nil
	}
	gen := s.findGenerator(w.TypeUrl, con)
	if gen == nil {
		return nil
	}