Istio Pilot Code Walkthrough

Although the title sounds rather grand, bordering on clickbait, this article does cover most of Istio Pilot's functionality, so it is worth reading through patiently.

With even a rough understanding of Istio, it is easy to see that its overall architecture is a standard client-server model. The question we need to answer is how Istio turns configuration items and the runtime state of Kubernetes into the data structures that Envoy consumes.

The discussion is split into four parts:

  • Data model: how Istio stores data internally
  • Data sources: how the data Istio uses is obtained, including how MCP works
  • XdsResourceGenerator: how Istio converts that data into the structures Envoy needs
  • XDS: how istiod acts as an xDS server and talks to Envoy

All of these parts can be seen in the fields of the istiod Server struct:

Server (github)
type Server struct {
// The xDS server that talks to Envoy; covered in the [XDS] section
XDSServer *xds.DiscoveryServer

clusterID cluster.ID

// Information about the current environment
environment *model.Environment

kubeClient kubelib.Client

// Istio's various data sources; covered in the [Data Sources] section
multiclusterController *multicluster.Controller
configController model.ConfigStoreCache
ConfigStores []model.ConfigStoreCache
serviceEntryStore *serviceentry.ServiceEntryStore


// RWConfigStore is the configstore which allows updates, particularly for status.
RWConfigStore model.ConfigStoreCache
}

Data Model

Perhaps the best way to learn a system is to read its data structures. The code for this part lives under the pilot/pkg/model directory.

For Istio, the most important sources of data are:

  • Service | ServiceEntry: both are converted into model.Service
  • Pod | WorkloadEntry: both are converted into model.WorkloadInstance
  • Endpoints | EndpointSlice: converted into model.ServiceInstance

These objects are used heavily later on when the xDS responses are built.

Service

Both a Kubernetes Service and a ServiceEntry are converted into the model.Service shown below.

Service (github)
type Service struct {
// Attributes contains additional attributes associated with the service
// used mostly by RBAC for policy enforcement purposes.
Attributes ServiceAttributes

// Ports is the set of network ports where the service is listening for
// connections
Ports PortList `json:"ports,omitempty"`

// ServiceAccounts specifies the service accounts that run the service.
ServiceAccounts []string `json:"serviceAccounts,omitempty"`

// CreationTime records the time this service was created, if available.
CreationTime time.Time `json:"creationTime,omitempty"`

// Name of the service, e.g. "catalog.mystore.com"
Hostname host.Name `json:"hostname"`

// ClusterVIPs specifies the service address of the load balancer
// in each of the clusters where the service resides
ClusterVIPs AddressMap `json:"clusterVIPs,omitempty"`

// DefaultAddress specifies the default service IP of the load balancer.
// Do not access directly. Use GetAddressForProxy
DefaultAddress string `json:"defaultAddress,omitempty"`

// AutoAllocatedAddress specifies the automatically allocated
// IPv4 address out of the reserved Class E subnet
// (240.240.0.0/16) for service entries with non-wildcard
// hostnames. The IPs assigned to services are not
// synchronized across istiod replicas as the DNS resolution
// for these service entries happens completely inside a pod
// whose proxy is managed by one istiod. That said, the algorithm
// to allocate IPs is pretty deterministic that at stable state, two
// istiods will allocate the exact same set of IPs for a given set of
// service entries.
AutoAllocatedAddress string `json:"autoAllocatedAddress,omitempty"`

// Resolution indicates how the service instances need to be resolved before routing
// traffic. Most services in the service registry will use static load balancing wherein
// the proxy will decide the service instance that will receive the traffic. Service entries
// could either use DNS load balancing (i.e. proxy will query DNS server for the IP of the service)
// or use the passthrough model (i.e. proxy will forward the traffic to the network endpoint requested
// by the caller)
Resolution Resolution

// MeshExternal (if true) indicates that the service is external to the mesh.
// These services are defined using Istio's ServiceEntry spec.
MeshExternal bool

// ResourceVersion represents the internal version of this object.
ResourceVersion string
}

Convert

  1. The logic for converting a Kubernetes Service into model.Service lives in ConvertService.
Convert (github)
func ConvertService(svc coreV1.Service, domainSuffix string, clusterID cluster.ID) *model.Service {
addr := constants.UnspecifiedIP
resolution := model.ClientSideLB
meshExternal := false

// Service ports
ports := make([]*model.Port, 0, len(svc.Spec.Ports))
for _, port := range svc.Spec.Ports {
ports = append(ports, convertPort(port))
}
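// NOTE: in the full function, serviceaccounts and exportTo (used below) are
// derived from the Service's annotations; that code is omitted from this excerpt.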

istioService := &model.Service{
// Converted into an FQDN
Hostname: ServiceHostname(svc.Name, svc.Namespace, domainSuffix),
ClusterVIPs: model.AddressMap{
Addresses: map[cluster.ID][]string{
clusterID: {addr},
},
},
Ports: ports,
DefaultAddress: addr,
ServiceAccounts: serviceaccounts,
MeshExternal: meshExternal,
Resolution: resolution,
CreationTime: svc.CreationTimestamp.Time,
ResourceVersion: svc.ResourceVersion,
Attributes: model.ServiceAttributes{
ServiceRegistry: provider.Kubernetes,
Name: svc.Name,
Namespace: svc.Namespace,
Labels: svc.Labels,
ExportTo: exportTo,
LabelSelectors: svc.Spec.Selector,
},
}
return istioService
}

For more details, see the unit tests.
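As a quick illustration of what those tests exercise, here is a minimal, test-style sketch. It assumes it sits in the same pilot/pkg/serviceregistry/kube package and uses the default cluster.local domain suffix; TestConvertServiceSketch is a made-up name.

package kube

import (
    "testing"

    coreV1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestConvertServiceSketch feeds a plain Kubernetes Service into ConvertService
// and checks that the hostname becomes the expected FQDN.
func TestConvertServiceSketch(t *testing.T) {
    svc := coreV1.Service{
        ObjectMeta: metav1.ObjectMeta{Name: "httpbin", Namespace: "default"},
        Spec: coreV1.ServiceSpec{
            ClusterIP: "10.0.0.1",
            Ports:     []coreV1.ServicePort{{Name: "http", Port: 80, Protocol: coreV1.ProtocolTCP}},
        },
    }

    istioSvc := ConvertService(svc, "cluster.local", "cluster-1")
    if istioSvc.Hostname != "httpbin.default.svc.cluster.local" {
        t.Fatalf("unexpected hostname: %v", istioSvc.Hostname)
    }
}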

  2. Conversion from a ServiceEntry.
convertServices (github)
// convertServices transforms a ServiceEntry config to a list of internal Service objects.
func convertServices(cfg config.Config) []*model.Service {
serviceEntry := cfg.Spec.(*networking.ServiceEntry)
creationTime := cfg.CreationTimestamp

var resolution model.Resolution
switch serviceEntry.Resolution {
case networking.ServiceEntry_NONE:
resolution = model.Passthrough
case networking.ServiceEntry_DNS:
resolution = model.DNSLB
case networking.ServiceEntry_DNS_ROUND_ROBIN:
resolution = model.DNSRoundRobinLB
case networking.ServiceEntry_STATIC:
resolution = model.ClientSideLB
}

svcPorts := make(model.PortList, 0, len(serviceEntry.Ports))
for _, port := range serviceEntry.Ports {
svcPorts = append(svcPorts, convertPort(port))
}

var exportTo map[visibility.Instance]bool
if len(serviceEntry.ExportTo) > 0 {
exportTo = make(map[visibility.Instance]bool)
for _, e := range serviceEntry.ExportTo {
exportTo[visibility.Instance(e)] = true
}
}

var labelSelectors map[string]string
if serviceEntry.WorkloadSelector != nil {
labelSelectors = serviceEntry.WorkloadSelector.Labels
}
hostAddresses := []*HostAddress{}
for _, hostname := range serviceEntry.Hosts {
if len(serviceEntry.Addresses) > 0 {
for _, address := range serviceEntry.Addresses {
// Check if address is an IP first because that is the most common case.
if net.ParseIP(address) != nil {
hostAddresses = append(hostAddresses, &HostAddress{hostname, address})
} else if ip, network, cidrErr := net.ParseCIDR(address); cidrErr == nil {
newAddress := address
ones, zeroes := network.Mask.Size()
if ones == zeroes {
// /32 mask. Remove the /32 and make it a normal IP address
newAddress = ip.String()
}
hostAddresses = append(hostAddresses, &HostAddress{hostname, newAddress})
}
}
} else {
hostAddresses = append(hostAddresses, &HostAddress{hostname, constants.UnspecifiedIP})
}
}

return buildServices(hostAddresses, cfg.Namespace, svcPorts, serviceEntry.Location, resolution,
exportTo, labelSelectors, serviceEntry.SubjectAltNames, creationTime, cfg.Labels)
}

For more details, see the unit tests.

Instance

Instances are modelled by the two definitions below, ServiceInstance and WorkloadInstance; the difference between them is covered later.

// The simpler of the two representations
type ServiceInstance struct {
Service *Service `json:"service,omitempty"`
ServicePort *Port `json:"servicePort,omitempty"`
Endpoint *IstioEndpoint `json:"endpoint,omitempty"`
}

// Used in internal logic when more information is needed
type WorkloadInstance struct {
Name string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
// Where the workloadInstance comes from; valid values are `Pod` or `WorkloadEntry`
Kind workloadKind `json:"kind"`
Endpoint *IstioEndpoint `json:"endpoint,omitempty"`
PortMap map[string]uint32 `json:"portMap,omitempty"`
// Can only be selected by service entry of DNS type.
DNSServiceEntryOnly bool `json:"dnsServiceEntryOnly,omitempty"`
}

Convert

  1. Converting a WorkloadEntry into model.ServiceInstance is done by convertWorkloadInstanceToServiceInstance.

    convertWorkloadInstanceToServiceInstance (github)
    func convertWorkloadInstanceToServiceInstance(workloadInstance *model.WorkloadInstance, serviceEntryServices []*model.Service,
    serviceEntry *networking.ServiceEntry) []*model.ServiceInstance {
    out := make([]*model.ServiceInstance, 0)
    for _, service := range serviceEntryServices {
    for _, serviceEntryPort := range serviceEntry.Ports {
    // note: this is same as workloadentry handler
    // endpoint port will first use the port defined in wle with same port name,
    // if not port name does not match, use the targetPort specified in ServiceEntry
    // if both not matched, fallback to ServiceEntry port number.
    var targetPort uint32
    if port, ok := workloadInstance.PortMap[serviceEntryPort.Name]; ok && port > 0 {
    targetPort = port
    } else if serviceEntryPort.TargetPort > 0 {
    targetPort = serviceEntryPort.TargetPort
    } else {
    targetPort = serviceEntryPort.Number
    }
    ep := *workloadInstance.Endpoint
    ep.ServicePortName = serviceEntryPort.Name
    ep.EndpointPort = targetPort
    ep.EnvoyEndpoint = nil
    out = append(out, &model.ServiceInstance{
    Endpoint: &ep,
    Service: service,
    ServicePort: convertPort(serviceEntryPort),
    })
    }
    }
    return out
    }
  2. Converting a Pod into model.WorkloadInstance.

Builder (github)
ep := NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(pod.Status.PodIP, 0, "", model.AlwaysDiscoverable)
workloadInstance := &model.WorkloadInstance{
Name: pod.Name,
Namespace: pod.Namespace,
Kind: model.PodKind,
Endpoint: ep,
PortMap: getPortMap(pod),
}
  3. Converting Endpoints into ServiceInstance.
buildIstioEndpoints (github)
func (e *endpointsController) buildIstioEndpoints(endpoint interface{}, host host.Name) []*model.IstioEndpoint {
var endpoints []*model.IstioEndpoint
ep := endpoint.(*v1.Endpoints)

discoverabilityPolicy := e.c.exports.EndpointDiscoverabilityPolicy(e.c.GetService(host))

for _, ss := range ep.Subsets {
for _, ea := range ss.Addresses {
pod, expectedPod := getPod(e.c, ea.IP, &metav1.ObjectMeta{Name: ep.Name, Namespace: ep.Namespace}, ea.TargetRef, host)
if pod == nil && expectedPod {
continue
}
builder := NewEndpointBuilder(e.c, pod)

// EDS and ServiceEntry use name for service port - ADS will need to map to numbers.
for _, port := range ss.Ports {
istioEndpoint := builder.buildIstioEndpoint(ea.IP, port.Port, port.Name, discoverabilityPolicy)
endpoints = append(endpoints, istioEndpoint)
}
}
}
return endpoints
}

Data Sources

This chapter explains how the CR resources we work with day to day, such as

  • VirtualService [an Istio resource]
  • Service [a Kubernetes resource]

are stored and consumed inside Istio.

Istio's data falls roughly into two categories:

  • Config data: configuration such as VirtualService
  • Runtime data: runtime state such as Services and Pods

Right from startup we can see that the system wires up three sets of resource controllers:

initControllers (github)
// initControllers initializes the controllers.
func (s *Server) initControllers(args *PilotArgs) error {
log.Info("initializing controllers")
s.initMulticluster(args)
// Certificate controller is created before MCP controller in case MCP server pod
// waits to mount a certificate to be provisioned by the certificate controller.
if err := s.initCertController(args); err != nil {
return fmt.Errorf("error initializing certificate controller: %v", err)
}
// Config controllers
if err := s.initConfigController(args); err != nil {
return fmt.Errorf("error initializing config controller: %v", err)
}
// Runtime-data (service) controllers
if err := s.initServiceControllers(args); err != nil {
return fmt.Errorf("error initializing service controllers: %v", err)
}
return nil
}

Config Data Sources

Most of this code lives under the pkg/config directory. The bootstrap code already shows which kinds of config sources are supported:

  • File: file-based sources
  • XDS: an xDS (MCP) server
  • Kubernetes: the Kubernetes API server
initConfigSources (github)
func (s *Server) initConfigSources(args *PilotArgs) (err error) {
for _, configSource := range s.environment.Mesh().ConfigSources {
srcAddress, err := url.Parse(configSource.Address)
if err != nil {
return fmt.Errorf("invalid config URL %s %v", configSource.Address, err)
}
scheme := ConfigSourceAddressScheme(srcAddress.Scheme)
switch scheme {
case File:
if srcAddress.Path == "" {
return fmt.Errorf("invalid fs config URL %s, contains no file path", configSource.Address)
}
store := memory.Make(collections.Pilot)
configController := memory.NewController(store)

err := s.makeFileMonitor(srcAddress.Path, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
if err != nil {
return err
}
s.ConfigStores = append(s.ConfigStores, configController)
case XDS:
xdsMCP, err := adsc.New(srcAddress.Host, &adsc.Config{
Namespace: args.Namespace,
Workload: args.PodName,
Revision: args.Revision,
Meta: model.NodeMetadata{
Generator: "api",
// To reduce transported data if upstream server supports. Especially for custom servers.
IstioRevision: args.Revision,
}.ToStruct(),
InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
})
if err != nil {
return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)
}
store := memory.MakeSkipValidation(collections.Pilot)
configController := memory.NewController(store)
configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
xdsMCP.Store = model.MakeIstioStore(configController)
err = xdsMCP.Run()
if err != nil {
return fmt.Errorf("MCP: failed running %v", err)
}
s.ConfigStores = append(s.ConfigStores, configController)
log.Warn("Started XDS config ", s.ConfigStores)
case Kubernetes:
if srcAddress.Path == "" || srcAddress.Path == "/" {
err2 := s.initK8SConfigStore(args)
if err2 != nil {
log.Warn("Error loading k8s ", err2)
return err2
}
log.Warn("Started K8S config")
} else {
log.Warnf("Not implemented, ignore: %v", configSource.Address)
// TODO: handle k8s:// scheme for remote cluster. Use same mechanism as service registry,
// using the cluster name as key to match a secret.
}
default:
log.Warnf("Ignoring unsupported config source: %v", configSource.Address)
}
}
return nil
}

Because Istio has several config sources, a unified aggregate store was added to access them:

type store struct {
// schemas is the unified
schemas collection.Schemas

// stores is a mapping from config type to a store
stores map[config.GroupVersionKind][]model.ConfigStore

writer model.ConfigStore
}
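To make the dispatch concrete, here is a simplified sketch of how such an aggregate store can serve a read by consulting the per-type stores in order. It is only a sketch; the actual implementation lives in pilot/pkg/config/aggregate.

// Get returns the first matching config found in the stores registered for this type.
func (cr *store) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
    for _, s := range cr.stores[typ] {
        if cfg := s.Get(typ, name, namespace); cfg != nil {
            return cfg
        }
    }
    return nil
}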

Istio's own resources are exposed through IstioConfigStore:

IstioConfigStore (github)
type IstioConfigStore interface {
ConfigStore

// ServiceEntries lists all service entries
ServiceEntries() []config.Config

// Gateways lists all gateways bound to the specified workload labels
Gateways(workloadLabels labels.Collection) []config.Config

// AuthorizationPolicies selects AuthorizationPolicies in the specified namespace.
AuthorizationPolicies(namespace string) []config.Config
}

The three methods above are self-explanatory. The embedded ConfigStore is a generic interface that, much like the controller-runtime client, offers generic Get/List (and write) operations:

type ConfigStore interface {
// Schemas exposes the configuration type schema known by the config store.
// The type schema defines the bidirectional mapping between configuration
// types and the protobuf encoding schema.
Schemas() collection.Schemas

// Get retrieves a configuration element by a type and a key
Get(typ config.GroupVersionKind, name, namespace string) *config.Config

// List returns objects by type and namespace.
// Use "" for the namespace to list across namespaces.
List(typ config.GroupVersionKind, namespace string) ([]config.Config, error)

// Create adds a new configuration object to the store. If an object with the
// same name and namespace for the type already exists, the operation fails
// with no side effects.
Create(config config.Config) (revision string, err error)

// Update modifies an existing configuration object in the store. Update
// requires that the object has been created. Resource version prevents
// overriding a value that has been changed between prior _Get_ and _Put_
// operation to achieve optimistic concurrency. This method returns a new
// revision if the operation succeeds.
Update(config config.Config) (newRevision string, err error)
UpdateStatus(config config.Config) (newRevision string, err error)

// Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid
// read-modify-write conflicts when there are many concurrent-writers to the same resource.
Patch(orig config.Config, patchFn config.PatchFunc) (string, error)

// Delete removes an object from the store by key
// For k8s, resourceVersion must be fulfilled before a deletion is carried out.
// If not possible, a 409 Conflict status will be returned.
Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
}
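As a small usage sketch of this generic interface: list every VirtualService and pull the typed spec out of the generic config.Config wrapper. listVirtualServiceHosts is a hypothetical helper and the import paths assume the Istio 1.13 layout.

package example

import (
    networking "istio.io/api/networking/v1alpha3"
    "istio.io/istio/pilot/pkg/model"
    "istio.io/istio/pkg/config/schema/gvk"
)

// listVirtualServiceHosts lists every VirtualService across all namespaces and
// collects the hosts from the typed spec carried inside each config.Config.
func listVirtualServiceHosts(store model.ConfigStore) ([]string, error) {
    vss, err := store.List(gvk.VirtualService, "") // "" means all namespaces
    if err != nil {
        return nil, err
    }
    var hosts []string
    for _, vs := range vss {
        spec := vs.Spec.(*networking.VirtualService)
        hosts = append(hosts, spec.Hosts...)
    }
    return hosts, nil
}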

This kind of configuration is simply used on demand: it is fetched right before each push, as in the following use of DestinationRule.

func NewEndpointBuilder(clusterName string, proxy *model.Proxy, push *model.PushContext) EndpointBuilder {
_, subsetName, hostname, port := model.ParseSubsetKey(clusterName)
svc := push.ServiceForHostname(proxy, hostname)
dr := push.DestinationRule(proxy, svc) // fetched directly from the push context's cache
b := EndpointBuilder{
clusterName: clusterName,
network: proxy.Metadata.Network,
networkView: proxy.GetNetworkView(),
clusterID: proxy.Metadata.ClusterID,
locality: proxy.Locality,
service: svc,
clusterLocal: push.IsClusterLocal(svc),
destinationRule: dr,
tunnelType: GetTunnelBuilderType(clusterName, proxy, push),

push: push,
proxy: proxy,
subsetName: subsetName,
hostname: hostname,
port: port,
}

// We need this for multi-network, or for clusters meant for use with AUTO_PASSTHROUGH.
if features.EnableAutomTLSCheckPolicies ||
b.push.NetworkManager().IsMultiNetworkEnabled() || model.IsDNSSrvSubsetKey(clusterName) {
b.mtlsChecker = newMtlsChecker(push, port, dr)
}
return b
}

Runtime Data Sources

Runtime data is split into two parts:

  • Pods and Services in Kubernetes
  • The ServiceEntry and WorkloadEntry resources provided by Istio

These two parts correspond to the code below.

initServiceControllers (github)
func (s *Server) initServiceControllers(args *PilotArgs) error {
serviceControllers := s.ServiceController()

s.serviceEntryStore = serviceentry.NewServiceDiscovery(
s.configController, s.environment.IstioConfigStore, s.XDSServer,
serviceentry.WithClusterID(s.clusterID),
)
serviceControllers.AddRegistry(s.serviceEntryStore)

registered := make(map[provider.ID]bool)
for _, r := range args.RegistryOptions.Registries {
serviceRegistry := provider.ID(r)
if _, exists := registered[serviceRegistry]; exists {
log.Warnf("%s registry specified multiple times.", r)
continue
}
registered[serviceRegistry] = true
log.Infof("Adding %s registry adapter", serviceRegistry)
switch serviceRegistry {
case provider.Kubernetes:
if err := s.initKubeRegistry(args); err != nil {
return err
}
case provider.Mock:
s.initMockRegistry()
default:
return fmt.Errorf("service registry %s is not supported", r)
}
}

// Defer running of the service controllers.
s.addStartFunc(func(stop <-chan struct{}) error {
go serviceControllers.Run(stop)
return nil
})

return nil
}

// This is the logic that initializes the Kubernetes registry
// initKubeRegistry creates all the k8s service controllers under this pilot
func (s *Server) initKubeRegistry(args *PilotArgs) (err error) {
args.RegistryOptions.KubeOptions.ClusterID = s.clusterID
args.RegistryOptions.KubeOptions.Metrics = s.environment
args.RegistryOptions.KubeOptions.XDSUpdater = s.XDSServer
args.RegistryOptions.KubeOptions.NetworksWatcher = s.environment.NetworksWatcher
args.RegistryOptions.KubeOptions.MeshWatcher = s.environment.Watcher
args.RegistryOptions.KubeOptions.SystemNamespace = args.Namespace
args.RegistryOptions.KubeOptions.MeshServiceController = s.ServiceController()

s.multiclusterController.AddHandler(kubecontroller.NewMulticluster(args.PodName,
s.kubeClient,
args.RegistryOptions.ClusterRegistriesNamespace,
args.RegistryOptions.KubeOptions,
s.serviceEntryStore,
s.istiodCertBundleWatcher,
args.Revision,
s.shouldStartNsController(),
s.environment.ClusterLocal(),
s.server))

return
}

Kubernetes Resources

Most of this code lives under the pilot/pkg/serviceregistry directory. It is a controller that watches the various resources for changes through the informer mechanism (a simplified sketch of the informer-plus-queue pattern follows the code below).

controller (github)
type Controller struct {
opts Options

client kubelib.Client

queue queue.Instance

nsInformer cache.SharedIndexInformer
nsLister listerv1.NamespaceLister

serviceInformer filter.FilteredSharedIndexInformer
serviceLister listerv1.ServiceLister

endpoints kubeEndpointsController

// Used to watch node accessible from remote cluster.
// In multi-cluster(shared control plane multi-networks) scenario, ingress gateway service can be of nodePort type.
// With this, we can populate mesh's gateway address with the node ips.
nodeInformer cache.SharedIndexInformer
nodeLister listerv1.NodeLister

exports serviceExportCache
imports serviceImportCache
pods *PodCache

handlers model.ControllerHandlers

// This is only used for test
stop chan struct{}

sync.RWMutex
// servicesMap stores hostname ==> service, it is used to reduce convertService calls.
servicesMap map[host.Name]*model.Service
// hostNamesForNamespacedName returns all possible hostnames for the given service name.
// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
// hostname as well as the MCS hostname (clusterset.local). Otherwise, only the regular
// hostname will be returned.
hostNamesForNamespacedName func(name types.NamespacedName) []host.Name
// servicesForNamespacedName returns all services for the given service name.
// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
// service as well as the MCS service (clusterset.local), if available. Otherwise,
// only the regular service will be returned.
servicesForNamespacedName func(name types.NamespacedName) []*model.Service
// nodeSelectorsForServices stores hostname => label selectors that can be used to
// refine the set of node port IPs for a service.
nodeSelectorsForServices map[host.Name]labels.Instance
// map of node name and its address+labels - this is the only thing we need from nodes
// for vm to k8s or cross cluster. When node port services select specific nodes by labels,
// we run through the label selectors here to pick only ones that we need.
// Only nodes with ExternalIP addresses are included in this map !
nodeInfoMap map[string]kubernetesNode
// externalNameSvcInstanceMap stores hostname ==> instance, is used to store instances for ExternalName k8s services
externalNameSvcInstanceMap map[host.Name][]*model.ServiceInstance
// workload instances from workload entries - map of ip -> workload instance
workloadInstancesByIP map[string]*model.WorkloadInstance
// Stores a map of workload instance name/namespace to address
workloadInstancesIPsByName map[string]string

multinetwork
// informerInit is set to true once the controller is running successfully. This ensures we do not
// return HasSynced=true before we are running
informerInit *atomic.Bool
// beginSync is set to true when calling SyncAll, it indicates the controller has began sync resources.
beginSync *atomic.Bool
// initialSync is set to true after performing an initial in-order processing of all objects.
initialSync *atomic.Bool
}

// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see multicluster.Controller).
func NewController(kubeClient kubelib.Client, options Options) *Controller {
c := &Controller{
opts: options,
client: kubeClient,
queue: queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
servicesMap: make(map[host.Name]*model.Service),
nodeSelectorsForServices: make(map[host.Name]labels.Instance),
nodeInfoMap: make(map[string]kubernetesNode),
externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
workloadInstancesByIP: make(map[string]*model.WorkloadInstance),
workloadInstancesIPsByName: make(map[string]string),
informerInit: atomic.NewBool(false),
beginSync: atomic.NewBool(false),
initialSync: atomic.NewBool(false),

multinetwork: initMultinetwork(),
}

c.nsInformer = kubeClient.KubeInformer().Core().V1().Namespaces().Informer()
c.nsLister = kubeClient.KubeInformer().Core().V1().Namespaces().Lister()
c.initDiscoveryHandlers(kubeClient, options.EndpointMode, options.MeshWatcher, c.opts.DiscoveryNamespacesFilter)

c.serviceInformer = filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Services().Informer())
c.serviceLister = listerv1.NewServiceLister(c.serviceInformer.GetIndexer())
c.registerHandlers(c.serviceInformer, "Services", c.onServiceEvent, nil)

switch options.EndpointMode {
case EndpointsOnly:
c.endpoints = newEndpointsController(c)
case EndpointSliceOnly:
c.endpoints = newEndpointSliceController(c)
}

// This is for getting the node IPs of a selected set of nodes
c.nodeInformer = kubeClient.KubeInformer().Core().V1().Nodes().Informer()
c.nodeLister = kubeClient.KubeInformer().Core().V1().Nodes().Lister()
c.registerHandlers(c.nodeInformer, "Nodes", c.onNodeEvent, nil)

podInformer := filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Pods().Informer())
c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, nil)

c.exports = newServiceExportCache(c)
c.imports = newServiceImportCache(c)

return c
}
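The informer-plus-queue pattern mentioned above, in simplified form. registerHandlersSketch is a made-up name that paraphrases what Controller.registerHandlers does; it is not the exact Istio code.

package example

import (
    "k8s.io/client-go/tools/cache"

    "istio.io/istio/pilot/pkg/model"
    "istio.io/istio/pkg/queue"
)

// registerHandlersSketch pushes every informer event onto the controller's work
// queue instead of handling it inline; the queue then processes events one by one.
func registerHandlersSketch(informer cache.SharedIndexInformer, q queue.Instance,
    onEvent func(obj interface{}, event model.Event) error) {
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            q.Push(func() error { return onEvent(obj, model.EventAdd) })
        },
        UpdateFunc: func(old, cur interface{}) {
            q.Push(func() error { return onEvent(cur, model.EventUpdate) })
        },
        DeleteFunc: func(obj interface{}) {
            q.Push(func() error { return onEvent(obj, model.EventDelete) })
        },
    })
}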

That is how a single cluster is handled. What about multiple clusters? Let's keep going.

From practice we already know that, for each managed remote cluster, Istio keeps that cluster's kubeconfig in a secret in the primary (config) cluster; as soon as that secret is created, the following callback is triggered.

clusterAdd (github)
// ClusterAdded is passed to the secret controller as a callback to be called
// when a remote cluster is added. This function needs to set up all the handlers
// to watch for resources being added, deleted or changed on remote clusters.
func (m *Multicluster) ClusterAdded(cluster *multicluster.Cluster, clusterStopCh <-chan struct{}) error {
m.m.Lock()

if m.closing {
m.m.Unlock()
return fmt.Errorf("failed adding member cluster %s: server shutting down", cluster.ID)
}

client := cluster.Client

// clusterStopCh is a channel that will be closed when this cluster removed.
options := m.opts
options.ClusterID = cluster.ID
// the aggregate registry's HasSynced will use the k8s controller's HasSynced, so we reference the same timeout
options.SyncTimeout = cluster.SyncTimeout
// different clusters may have different k8s version, re-apply conditional default
options.EndpointMode = DetectEndpointMode(client)

log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)
kubeRegistry := NewController(client, options)
m.remoteKubeControllers[cluster.ID] = &kubeController{
Controller: kubeRegistry,
}
// localCluster may also be the "config" cluster, in an external-istiod setup.
localCluster := m.opts.ClusterID == cluster.ID

m.m.Unlock()

// TODO move instance cache out of registries
if m.serviceEntryStore != nil && features.EnableServiceEntrySelectPods {
// Add an instance handler in the kubernetes registry to notify service entry store about pod events
kubeRegistry.AppendWorkloadHandler(m.serviceEntryStore.WorkloadInstanceHandler)
}


// TODO only create namespace controller and cert patch for remote clusters (no way to tell currently)
if m.startNsController && (features.ExternalIstiod || localCluster) {
// Block server exit on graceful termination of the leader controller.
m.s.RunComponentAsyncAndWait(func(_ <-chan struct{}) error {
log.Infof("joining leader-election for %s in %s on cluster %s",
leaderelection.NamespaceController, options.SystemNamespace, options.ClusterID)
leaderelection.
NewLeaderElection(options.SystemNamespace, m.serverID, leaderelection.NamespaceController, m.revision, client).
AddRunFunction(func(leaderStop <-chan struct{}) {
log.Infof("starting namespace controller for cluster %s", cluster.ID)
nc := NewNamespaceController(client, m.caBundleWatcher)
// Start informers again. This fixes the case where informers for namespace do not start,
// as we create them only after acquiring the leader lock
// Note: stop here should be the overall pilot stop, NOT the leader election stop. We are
// basically lazy loading the informer, if we stop it when we lose the lock we will never
// recreate it again.
client.RunAndWait(clusterStopCh)
nc.Run(leaderStop)
}).Run(clusterStopCh)
return nil
})
}

return nil
}

XdsResourceGenerator

Now that we know Istio's data sources and data model, the next step is how Istio turns this data into the xDS protocol that Envoy understands. The various configurations are produced by XdsResourceGenerator implementations.

The final abstraction is as follows:

Generator (github)
type XdsResourceGenerator interface {
// Generate generates the Sotw resources for Xds.
Generate(proxy *Proxy, push *PushContext, w *WatchedResource, updates *PushRequest) (Resources, XdsLogDetails, error)
}

Different resources are generated by different implementations.
For example, NDS handles the DNS (name table) rules:

NDS (github)
type NdsGenerator struct {
Server *DiscoveryServer
}

func (n NdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource,
req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
if !ndsNeedsPush(req) {
return nil, model.DefaultXdsLogDetails, nil
}
nt := n.Server.ConfigGenerator.BuildNameTable(proxy, push)
if nt == nil {
return nil, model.DefaultXdsLogDetails, nil
}
resources := model.Resources{&discovery.Resource{Resource: util.MessageToAny(nt)}}
return resources, model.DefaultXdsLogDetails, nil
}
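The same abstraction makes it straightforward to plug in new generators. Below is a hedged sketch; EchoGenerator and the "echo" key are made up purely for illustration, mirroring the registration style shown in the next section.

type EchoGenerator struct{}

// Generate satisfies model.XdsResourceGenerator; a real implementation would
// build protobuf resources for the requested TypeUrl here.
func (g EchoGenerator) Generate(proxy *model.Proxy, push *model.PushContext,
    w *model.WatchedResource, req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
    return model.Resources{}, model.DefaultXdsLogDetails, nil
}

// Registration would then look like the built-in ones:
//   s.Generators["echo"] = EchoGenerator{}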

Proxyless ConfigGenerator

Alongside its (rather self-promotional) Traffic Director for proxyless gRPC services, Google also added proxyless support to istiod; let's look at how it is implemented.

Since we know the generation abstraction Istio provides, we can naturally start from XdsResourceGenerator to quickly locate where the gRPC configuration is generated.

Generate (github)
func (g *GrpcConfigGenerator) Generate(proxy *model.Proxy, push *model.PushContext,
w *model.WatchedResource, updates *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
switch w.TypeUrl {
case v3.ListenerType:
return g.BuildListeners(proxy, push, w.ResourceNames), model.DefaultXdsLogDetails, nil
case v3.ClusterType:
return g.BuildClusters(proxy, push, w.ResourceNames), model.DefaultXdsLogDetails, nil
case v3.RouteType:
return g.BuildHTTPRoutes(proxy, push, w.ResourceNames), model.DefaultXdsLogDetails, nil
}

return nil, model.DefaultXdsLogDetails, nil
}

Evidently a new generator type has been registered alongside the regular LDS/CDS ones:

InitGenerators (github)
s.Generators["grpc"] = &grpcgen.GrpcConfigGenerator{}
s.Generators["grpc/"+v3.EndpointType] = edsGen
s.Generators["grpc/"+v3.ListenerType] = s.Generators["grpc"]
s.Generators["grpc/"+v3.RouteType] = s.Generators["grpc"]
s.Generators["grpc/"+v3.ClusterType] = s.Generators["grpc"]

From the sub-keys it is clear that proxyless gRPC cares about four resource types: Listener, Route, Cluster, and Endpoint. For context, a sketch of what a proxyless gRPC client looks like on the workload side follows.
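This is only a sketch: it assumes the bootstrap file generated by pilot-agent is referenced via GRPC_XDS_BOOTSTRAP, uses a made-up xds:/// target, and skips credentials.

package main

import (
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    _ "google.golang.org/grpc/xds" // registers the xds:/// resolver and balancer
)

func main() {
    // LDS/RDS/CDS/EDS for this target are fetched from istiod via the bootstrap
    // pointed to by GRPC_XDS_BOOTSTRAP; no Envoy sidecar is involved.
    conn, err := grpc.Dial("xds:///echo.default.svc.cluster.local:7070",
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    // ... create service stubs on conn and issue RPCs as usual.
}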

XDS

The data source and XdsResourceGenerator sections above explained how the system turns configuration and runtime data into xDS resources.
The next step is pushing that data out to each instance, which is where PushContext comes in.

The hub that ties them together is XDSUpdater:

XDSUpdater (github)
type XDSUpdater interface {
// EDSUpdate is called when the list of endpoints or labels in a Service is changed.
// For each cluster and hostname, the full list of active endpoints (including empty list)
// must be sent. The shard name is used as a key - current implementation is using the
// registry name.
EDSUpdate(shard ShardKey, hostname string, namespace string, entry []*IstioEndpoint)

// EDSCacheUpdate is called when the list of endpoints or labels in a Service is changed.
// For each cluster and hostname, the full list of active endpoints (including empty list)
// must be sent. The shard name is used as a key - current implementation is using the
// registry name.
// Note: the difference with `EDSUpdate` is that it only update the cache rather than requesting a push
EDSCacheUpdate(shard ShardKey, hostname string, namespace string, entry []*IstioEndpoint)

// SvcUpdate is called when a service definition is updated/deleted.
SvcUpdate(shard ShardKey, hostname string, namespace string, event Event)

// ConfigUpdate is called to notify the XDS server of config updates and request a push.
// The requests may be collapsed and throttled.
ConfigUpdate(req *PushRequest)

// ProxyUpdate is called to notify the XDS server to send a push to the specified proxy.
// The requests may be collapsed and throttled.
ProxyUpdate(clusterID cluster.ID, ip string)

// RemoveShard removes all endpoints for the given shard key
RemoveShard(shardKey ShardKey)
}

Whenever one of the ServiceControllers or ConfigStores sees a change, one of these update functions is invoked to trigger a push. For example, when a Kubernetes Endpoints object changes, the following logic runs:

process (github)
// processEndpointEvent triggers the config update.
func processEndpointEvent(c *Controller, epc kubeEndpointsController, name string, namespace string, event model.Event, ep interface{}) error {
// Update internal endpoint cache no matter what kind of service, even headless service.
// As for gateways, the cluster discovery type is `EDS` for headless service.
updateEDS(c, epc, ep, event)
if features.EnableHeadlessService {
if svc, _ := c.serviceLister.Services(namespace).Get(name); svc != nil {
for _, modelSvc := range c.servicesForNamespacedName(kube.NamespacedNameForK8sObject(svc)) {
// if the service is headless service, trigger a full push.
if svc.Spec.ClusterIP == v1.ClusterIPNone {
c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
Full: true,
// TODO: extend and set service instance type, so no need to re-init push context
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: gvk.ServiceEntry,
Name: modelSvc.Hostname.String(),
Namespace: svc.Namespace,
}: {}},
Reason: []model.TriggerReason{model.EndpointUpdate},
})
return nil
}
}
}
}

return nil
}

PushContext

There are two kinds of PushContext:

  • Global: the latest state of the whole system, held in the DiscoveryServer's Env
  • Per-request: a local context, versioned with a timestamp, carried inside a PushRequest

Proxy

Every istio-proxy node is a Proxy. Different proxies need different data pushed to them, so most of the build logic takes exactly these two objects: PushContext and Proxy.

Proxygithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// https://github.com/istio/istio/blob/release-1.13/pilot/pkg/model/context.go#L231-L336
type Proxy struct {
// Type specifies the node type. First part of the ID.
Type NodeType

// IPAddresses is the IP addresses of the proxy used to identify it and its
// co-located service instances. Example: "10.60.1.6". In some cases, the host
// where the proxy and service instances reside may have more than one IP address
IPAddresses []string

// ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
// namespace <podName.namespace>.
ID string

// Locality is the location of where Envoy proxy runs. This is extracted from
// the registry where possible. If the registry doesn't provide a locality for the
// proxy it will use the one sent via ADS that can be configured in the Envoy bootstrap
Locality *core.Locality
}

PushRequest

Whenever an event occurs that requires an update, a PushRequest is created, meaning the system needs to push. A common beginner misconception is that a PushRequest is bound to a particular sidecar; in fact it means the mesh as a whole has changed and a push should happen. Deciding who to push to is not this object's job. Hence, in NewDiscoveryServer:

func NewDiscoveryServer(env *model.Environment, plugins []string, instanceID string, systemNameSpace string,
clusterAliases map[string]string) *DiscoveryServer {
out := &DiscoveryServer{
pushChannel: make(chan *model.PushRequest, 10),
pushQueue: NewPushQueue(),
Cache: model.DisabledCache{},
instanceID: instanceID,
}

The pushChannel does not need to be large, because it only signals that an update has been triggered and the clients need a push.
After the current PushRequest has been pushed, the pending ones therefore have to be merged into the next push (a simplified sketch of this merge/debounce pattern follows).
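This sketch only paraphrases the idea, assuming model.PushRequest's Merge helper; the real logic in pilot/pkg/xds additionally enforces a maximum total wait and limits concurrent pushes.

package example

import (
    "time"

    "istio.io/istio/pilot/pkg/model"
)

// debounceSketch keeps merging PushRequests from pushChannel until the channel
// has been quiet for the given duration, then hands a single merged request on.
func debounceSketch(ch <-chan *model.PushRequest, quiet time.Duration, pushFn func(*model.PushRequest)) {
    var merged *model.PushRequest
    var timer <-chan time.Time
    for {
        select {
        case req := <-ch:
            if merged == nil {
                merged = req
            } else {
                merged = merged.Merge(req) // collapse overlapping updates into one request
            }
            timer = time.After(quiet) // restart the quiet-period timer
        case <-timer:
            if merged != nil {
                pushFn(merged) // one push covers everything received in the window
                merged = nil
            }
        }
    }
}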

Push (github)
func (s *DiscoveryServer) Push(req *model.PushRequest) {
// Reset the status during the push.
oldPushContext := s.globalPushContext()

versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(versionNum.Inc(), 10)
push, err := s.initPushContext(req, oldPushContext, versionLocal)
if err != nil {
return
}

req.Push = push
s.AdsPushAll(versionLocal, req)
}

Here the current push context is built on top of the previous one, and once the context for the current request has been initialized it is written back as the new global state:

initPushContext
func (s *DiscoveryServer) initPushContext(req *model.PushRequest, oldPushContext *model.PushContext, version string) (*model.PushContext, error) {
push := model.NewPushContext()
push.PushVersion = version
push.JwtKeyResolver = s.JwtKeyResolver
if err := push.InitContext(s.Env, oldPushContext, req); err != nil {
log.Errorf("XDS: failed to init push context: %v", err)
// We can't push if we can't read the data - stick with previous version.
pushContextErrors.Increment()
return nil, err
}

if err := s.UpdateServiceShards(push); err != nil {
return nil, err
}

s.updateMutex.Lock()
s.Env.PushContext = push
// Ensure we drop the cache in the lock to avoid races, where we drop the cache, fill it back up, then update push context
s.dropCacheForRequest(req)
s.updateMutex.Unlock()

return push, nil
}

In other words, after every push the content of that push becomes the new global state.

So the whole system effectively has three states:

  1. The state that was last pushed
  2. The desired state ( <- currently being processed )
  3. The latest desired state ( <- pending )

API

The API between the client (pilot-agent) and the server (istiod) is defined in ads.go, that is, in the API spec:

syntax = "proto3";

package envoy.service.discovery.v3;

import "envoy/service/discovery/v3/discovery.proto";

import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";

option java_package = "io.envoyproxy.envoy.service.discovery.v3";
option java_outer_classname = "AdsProto";
option java_multiple_files = true;
option go_package = "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3;discoveryv3";
option java_generic_services = true;
option (udpa.annotations.file_status).package_version_status = ACTIVE;

// [#protodoc-title: Aggregated Discovery Service (ADS)]

// Discovery services for endpoints, clusters, routes,
// and listeners are retained in the package `envoy.api.v2` for backwards
// compatibility with existing management servers. New development in discovery
// services should proceed in the package `envoy.service.discovery.v2`.

// See https://github.com/envoyproxy/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
// This is a gRPC-only API.
rpc StreamAggregatedResources(stream DiscoveryRequest) returns (stream DiscoveryResponse) {
}

rpc DeltaAggregatedResources(stream DeltaDiscoveryRequest)
returns (stream DeltaDiscoveryResponse) {
}
}

// [#not-implemented-hide:] Not configuration. Workaround c++ protobuf issue with importing
// services: https://github.com/google/protobuf/issues/4221
message AdsDummy {
option (udpa.annotations.versioning).previous_message_type =
"envoy.service.discovery.v2.AdsDummy";
}

There are really only two RPCs (a minimal hand-rolled client sketch follows the list):

  • StreamAggregatedResources: a bidirectional streaming API for aggregated resources
  • DeltaAggregatedResources: the variant that supports incremental (delta) updates
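The address, node ID, and plaintext connection below are assumptions for illustration: 15010 is istiod's plaintext xDS port, while production setups normally use mTLS on 15012.

package main

import (
    "context"
    "log"

    core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    conn, err := grpc.Dial("istiod.istio-system:15010",
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    ads := discovery.NewAggregatedDiscoveryServiceClient(conn)
    stream, err := ads.StreamAggregatedResources(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    // Request clusters (CDS) on the shared ADS stream; other type URLs are
    // multiplexed over the same stream in the same way.
    if err := stream.Send(&discovery.DiscoveryRequest{
        Node:    &core.Node{Id: "sidecar~10.0.0.1~demo.default~default.svc.cluster.local"},
        TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
    }); err != nil {
        log.Fatal(err)
    }

    resp, err := stream.Recv()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("received %d resources of %s (version %s)", len(resp.Resources), resp.TypeUrl, resp.VersionInfo)
}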

Requests received by the server are handled in processRequest:

processRequest (github)
// processRequest is handling one request. This is currently called from the 'main' thread, which also
// handles 'push' requests and close - the code will eventually call the 'push' code, and it needs more mutex
// protection. Original code avoided the mutexes by doing both 'push' and 'process requests' in same thread.
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
if !s.shouldProcessRequest(con.proxy, req) {
return nil
}

// For now, don't let xDS piggyback debug requests start watchers.
if strings.HasPrefix(req.TypeUrl, v3.DebugType) {
return s.pushXds(con, s.globalPushContext(), &model.WatchedResource{
TypeUrl: req.TypeUrl, ResourceNames: req.ResourceNames,
}, &model.PushRequest{Full: true})
}
if s.StatusReporter != nil {
s.StatusReporter.RegisterEvent(con.ConID, req.TypeUrl, req.ResponseNonce)
}
shouldRespond := s.shouldRespond(con, req)

var request *model.PushRequest
push := s.globalPushContext()
if shouldRespond {
// This is a request, trigger a full push for this type. Override the blocked push (if it exists),
// as this full push is guaranteed to be a superset of what we would have pushed from the blocked push.
request = &model.PushRequest{Full: true, Push: push}
} else {
// Check if we have a blocked push. If this was an ACK, we will send it.
// Either way we remove the blocked push as we will send a push.
haveBlockedPush := false
con.proxy.Lock()
request, haveBlockedPush = con.blockedPushes[req.TypeUrl]
delete(con.blockedPushes, req.TypeUrl)
con.proxy.Unlock()
if haveBlockedPush {
// we have a blocked push which we will use
log.Debugf("%s: DEQUEUE for node:%s", v3.GetShortType(req.TypeUrl), con.proxy.ID)
} else {
// This is an ACK, no delayed push
// Return immediately, no action needed
return nil
}
}

request.Reason = append(request.Reason, model.ProxyRequest)
request.Start = time.Now()
// SidecarScope for the proxy may not have been updated based on this pushContext.
// It can happen when `processRequest` comes after push context has been updated(s.initPushContext),
// but before proxy's SidecarScope has been updated(s.updateProxy).
if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != push.PushVersion {
s.computeProxyState(con.proxy, request)
}
return s.pushXds(con, push, con.Watched(req.TypeUrl), request)
}

We do not have to dig very deep to find pushXds:

func (s *DiscoveryServer) pushXds(con *Connection, push *model.PushContext,
w *model.WatchedResource, req *model.PushRequest) error {
if w == nil {
return nil
}
gen := s.findGenerator(w.TypeUrl, con)
if gen == nil {
return nil
}

t0 := time.Now()

res, logdata, err := gen.Generate(con.proxy, push, w, req)

defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()

resp := &discovery.DiscoveryResponse{
ControlPlane: ControlPlane(),
TypeUrl: w.TypeUrl,
// TODO: send different version for incremental eds
VersionInfo: push.PushVersion,
Nonce: nonce(push.LedgerVersion),
Resources: model.ResourcesToAny(res),
}

configSize := ResourceSize(res)
configSizeBytes.With(typeTag.Value(w.TypeUrl)).Record(float64(configSize))
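// NOTE: the rest of the real function (sending resp over the connection and the
// associated logging) is omitted from this excerpt.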

return nil
}

In other words, it looks up the Generator for the requested resource type, generates the resources, and sends them back to the client.