本文基于 istio: release-1.7
Pre
对于 istio
架构来说,包含了较多的组件其中包含了 Pilot
Citadel
Galley
等组件,全部置于一个单一的 isitod
镜像中,我们今天就来看看最重要的 Poilot
组件。
Individual Istio components like service discovery (Pilot) , configuration (Galley), certificate generation (Citadel) and extensibility (Mixer)
Pliot
对于 Poliot
来说,我们其实有两个,第一个是 Poilot Discovery
作为控制面组件,另外一个是 Poilot Agent
在数据面对于 Envoy
进行配置的组件。
对于 Pilot
来说,做的事情也很简单
将 istio crd
的配置信息和 kube
的各种服务信息进行整合,然后通过 xds
推送给 Envoy
Discovery Server
函数的入口处为 pilot/pkg/bootstrap/server.go:Start
我们实际上会启动两个不同的 Server
grpcServer
: 负责和 Galley CDS CA 通讯
httpServer
: 处理 readiness 的请求
pilot-discovery
扮演服务注册中心、istio
控制平面到 Envoy
之间的桥梁作用。Pilot-Discovery
的主要功能包括:
监控服务注册中心(如Kubernetes)的服务注册情况。在Kubernetes环境下,会监控service、endpoint、pod、node等资源信息。
监控istio控制面信息变化,在Kubernetes环境下,会监控包括RouteRule、VirtualService、Gateway、EgressRule、ServiceEntry等以Kubernetes CRD形式存在的istio控制面配置信息。
将上述两类信息合并组合为Envoy可以理解的(即遵循Envoy data plane api的)配置信息,并将这些信息以gRPC协议提供给Envoy
Service Registry
Poilot
为了支持多种基础设施,因此抽象了一个对象叫 serviceregistry
,我们从源码中可以发现至少支持了 consul
kube
mcp
mock
memory
这几类基础设施。
在作为常见的 Kube
的支持中,我们在这里 pilot/pkg/serviceregistry/kube/register.go:RegisterEndpoint
注册我们的 Kube
控制节点,我们知道当使用 CRD
的时候,对于 CRD
功能的翻译需要 CRD Controller
进行实现,因此我们的实现点在于 pilot/pkg/serviceregistry/kube/controller/controller.go:NewController
在 NewController
之中,我们注册大量的对于 Kube
资源处理的回调函数
1 2 3 4 5 6 7 8 c.endpoints = newEndpointsController(c, kubeClient.KubeInformer().Core().V1().Endpoints()) registerHandlers(c.nodeInformer, c.queue, "Nodes" , c.onNodeEvent, nil ) registerHandlers(c.pods.informer, c.queue, "Pods" , c.pods.onEvent, nil )
Controller
CRD Resource Controller
在 Kube
的 Service Registry
模式下,支持多种自定义资源来操作我们的 isito
Virtualservice:用于定义路由规则,如根据来源或 Header 制定规则,或在不同服务版本之间分拆流量。
DestinationRule:定义目的服务的配置策略以及可路由子集。策略包括断路器、负载均衡以及 TLS 等。
ServiceEntry:可以使用ServiceEntry向Istio中加入附加的服务条目,以使网格内可以向istio 服务网格之外的服务发出请求。
Gateway:为网格配置网关,以允许一个服务可以被网格外部访问。
EnvoyFilter:可以为Envoy配置过滤器。由于Envoy已经支持Lua过滤器,因此可以通过EnvoyFilter启用Lua过滤器,动态改变Envoy的过滤链行为。我之前一直在考虑如何才能动态扩展Envoy的能力,EnvoyFilter提供了很灵活的扩展性。
Sidecar:缺省情况下,Pilot将会把和Envoy Sidecar所在namespace的所有services的相关配置,包括inbound和outbound listenter, cluster, route等,都下发给Enovy。使用Sidecar可以对Pilot向Envoy Sidcar下发的配置进行更细粒度的调整,例如只向其下发该Sidecar 所在服务需要访问的那些外部服务的相关outbound配置。
这一部分代码都在 pilot/pkg/config/kube
中
Kube Resource Controller
显然我们需要监听 Pods
Services
Namespaces
等信息
这一部分代码都在 pilot/pkg/serviceregistry/kube/controller/
中
Push Context
上述的处理的 callback
大多数都会转至 push_context.go
中进行处理 PushContext
会记录我们操作的更新状态等信息。
Xds Updater
而对于所有的状态更新,都是通过 XDSUpdater
进行更新。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type Controller struct { client kubernetes.Interface nodeInformer cache.SharedIndexInformer nodeLister listerv1.NodeLister pods *PodCache metrics model.Metrics networksWatcher mesh.NetworksWatcher xdsUpdater model.XDSUpdater domainSuffix string clusterID string }
举个例子,当我们的 Pod
的 IP
进行更新的时候,我们会触发
1 2 3 4 5 func (pc *PodCache) proxyUpdates(ip string ) { if pc.c != nil && pc.c.xdsUpdater != nil { pc.c.xdsUpdater.ProxyUpdate(pc.c.clusterID, ip) } }
从代码中可以发现直接调用了 xdsUpdater.ProxyUpdate
xdsUpdater.ProxyUpdate 1 2 3 4 5 6 7 8 9 10 func (s *DiscoveryServer) ProxyUpdate(clusterID, ip string ) { var connection *Connection s.pushQueue.Enqueue(connection, &model.PushRequest{ Full: true , Push: s.globalPushContext(), Start: time.Now(), Reason: []model.TriggerReason{model.ProxyUpdate}, }) }
我们将这一次的变化 Push
到我们的 PushQueue
中。
而最终的消费者在一层层的调用中我们可以定位至 pilot/pkg/xds/ads.go:pushConnection
中,具体逻辑不做展开。
Multicluster
多集群的设计相对简单,我们只需要将本来的逻辑隔离 +1 配置即可。比如在 AddMemberCluster
我们只需要将 kube client
和 cluster id
传入,我们就可以在第二个集群中创建我们的对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (m *Multicluster) AddMemberCluster(clients kubelib.Client, clusterID string ) error { stopCh := make (chan struct {}) var remoteKubeController kubeController remoteKubeController.stopCh = stopCh m.m.Lock() remoteKubeController.Controller = kubectl m.serviceController.AddRegistry(kubectl) m.remoteKubeControllers[clusterID] = &remoteKubeController m.m.Unlock() _ = kubectl.AppendServiceHandler(func (svc *model.Service, ev model.Event) { m.updateHandler(svc) }) clients.RunAndWait(stopCh) return nil }
Poliot 流程分析
上面看了一堆实现,我们还是看下 Bootstrap
的流程,入口处于 pilot/cmd/pilot-discovery/main.go
中
1 discoveryServer, err := bootstrap.NewServer(serverArgs)
Server Def
我们往往从 Server
的定义中就可以看出那些重量级的实现。
pilot/pkg/bootstrap/server.go:Server 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 type Server struct { EnvoyXdsServer *xds.DiscoveryServer environment *model.Environment multicluster *kubecontroller.Multicluster configController model.ConfigStoreCache ConfigStores []model.ConfigStoreCache serviceEntryStore *serviceentry.ServiceEntryStore fileWatcher filewatcher.FileWatcher }
保留了一些重点,我们可以看出来那两部分数据来源就是在 configController
和 environment
中
Bootstrap
在 NewServer
中一开始就申明了最重要的对象 model.Environment
NewServer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func NewServer (args *PilotArgs) (*Server, error ) { e := &model.Environment{ ServiceDiscovery: aggregate.NewController(), PushContext: model.NewPushContext(), DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix, } s := &Server{ clusterID: getClusterID(args), environment: e, EnvoyXdsServer: xds.NewDiscoveryServer(e, args.Plugins), fileWatcher: filewatcher.NewWatcher(), httpMux: http.NewServeMux(), readinessProbes: make (map [string ]readinessProbe), } }
model.Environment
里面有三个对象,都很重要:
ServiceDiscovery
: 代表了对于此 Kube
集群的所有 Controller
PushContext
: 包含了当前系统的 Metris
指标的信息,推送给 prom
DomainSuffix
: 此集群的域名后缀
Server
中包含了除此之外的一些辅助能力,比如 fileWatcher
来观察配置文件的变化,EnvoyXdsServer
向 Sidecar
下发数据等
紧挨着比较重要的代码就是
1 2 3 if err := s.initControllers(args); err != nil { return nil , err }
在这里我们初始化所有的对 CRD
的控制对象。
然后就是一大票的对象初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 wh, err := s.initSidecarInjector(args) if err != nil { return nil , fmt.Errorf("error initializing sidecar injector: %v" , err) } if err := s.initConfigValidation(args); err != nil { return nil , fmt.Errorf("error initializing config validator: %v" , err) } if err := s.initIstiodAdminServer(args, wh); err != nil { return nil , fmt.Errorf("error initializing debug server: %v" , err) } if err := s.initRegistryEventHandlers(); err != nil { return nil , fmt.Errorf("error initializing handlers: %v" , err) } if err := s.initDiscoveryService(args); err != nil { return nil , fmt.Errorf("error initializing discovery service: %v" , err) }
Run
在我们初始化完成之后,在 main
函数中,我们开始执行我们真实的逻辑
pilot/pkg/bootstrap/server.go:Start 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (s *Server) Start(stop <-chan struct {}) error { log.Infof("Staring Istiod Server with primary cluster %s" , s.clusterID) for _, fn := range s.startFuncs { if err := fn(stop); err != nil { return err } } go func () { log.Infof("starting gRPC discovery service at %s" , s.GRPCListener.Addr()) if err := s.grpcServer.Serve(s.GRPCListener); err != nil { log.Warna(err) } }() go func () { log.Infof("starting Http service at %s" , s.HTTPListener.Addr()) if err := s.httpServer.Serve(s.HTTPListener); err != nil { log.Warna(err) } }() }
在 s.startFuncs
中储存着我们所有的 start func
这里我们启动我们所有的需要在启动的组件。比如在 XDS
服务的启动函数是定义在如下
pilot/pkg/bootstrap/server.go 1 2 3 4 5 6 7 8 9 func (s *Server) initDiscoveryService(args *PilotArgs) error { log.Infof("starting discovery service" ) s.addStartFunc(func (stop <-chan struct {}) error { s.EnvoyXdsServer.Start(stop) return nil }) }
因此对于我们后需要,我们要查看各种实现都需要通过不同组件的 Start
来辨别。
监控变化
启动的流程不算复杂,我们尝试分析一个常规的场景。
我们监听一个 POD
被删除之后,我们 XDS
应该可以获得监听的变化。
根据 Isito
系统的构成,显然我们可以知道的应该会涉及到 Controller
监听到变化之后的 Action
,而这个 Action
应该就是如何将数据下发到 Sidecar
中。
Controller
在 Init
的抽函数中,我们很容易就发现了 PodController
的位置
pilot/pkg/serviceregistry/kube/controller/controller.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func NewController (kubeClient kubelib.Client, options Options) *Controller { c.serviceInformer = kubeClient.KubeInformer().Core().V1().Services().Informer() c.serviceLister = kubeClient.KubeInformer().Core().V1().Services().Lister() registerHandlers(c.serviceInformer, c.queue, "Services" , c.onServiceEvent, nil ) c.nodeInformer = kubeClient.KubeInformer().Core().V1().Nodes().Informer() c.nodeLister = kubeClient.KubeInformer().Core().V1().Nodes().Lister() registerHandlers(c.nodeInformer, c.queue, "Nodes" , c.onNodeEvent, nil ) c.pods = newPodCache(c, kubeClient.KubeInformer().Core().V1().Pods(), func (key string ) { item, exists, err := c.endpoints.getInformer().GetStore().GetByKey(key) if err != nil { log.Debugf("Endpoint %v lookup failed with error %v, skipping stale endpoint" , key, err) return } if !exists { log.Debugf("Endpoint %v not found, skipping stale endpoint" , key) return } c.queue.Push(func () error { return c.endpoints.onEvent(item, model.EventUpdate) }) }) registerHandlers(c.pods.informer, c.queue, "Pods" , c.pods.onEvent, nil ) return c }
在构建 newPodCache
时候,显然我们第一次还是需要将所有的 POD
先取出来,然后之后处理我们的回调函数来处理,我们的重心还是来处理 registerHandlers
的事件。
在 registerHandlers
函数中,我们可以看到数据的变化对于系统影响
registerHandlers 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func registerHandlers (informer cache.SharedIndexInformer, q queue.Instance, otype string , handler func (interface {}, model.Event) error , filter FilterOutFunc) { informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func (obj interface {}) { }, UpdateFunc: func (old, cur interface {}) { }, DeleteFunc: func (obj interface {}) { incrementEvent(otype, "delete" ) q.Push(func () error { return handler(obj, model.EventDelete) }) }, }) }
在这里我们就将 informer
的数据转化成 istio
的数据结构,然后 push
到 Quene
为了验证我们的想法,我们直接在 incrementEvent(otype, "delete")
加上我们的 BreakPoint
delete a pod 1 kubectl delete pod tcp-echo-v1-7dd5c5dcfb-mxs77
然后果然在我们这里看到 BreakPoint
的对象元数据。
在 Queue
的数据如何被消费,实则是一个定时的 Task
,如下声明
pkg/queue/instance.go:Run 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (q *queueImpl) Run(stop <-chan struct {}) { for { q.cond.L.Lock() for !q.closing && len (q.tasks) == 0 { q.cond.Wait() } var task Task task, q.tasks = q.tasks[0 ], q.tasks[1 :] q.cond.L.Unlock() if err := task(); err != nil { log.Infof("Work item handle failed (%v), retry after delay %v" , err, q.delay) time.AfterFunc(q.delay, func () { q.Push(task) }) } } }
而我们真实的 Handler
实则是传递进来的,因此真正的代码是在 PodCache.onEvent
中
pilot/pkg/serviceregistry/kube/controller/pod.go:onEvent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (pc *PodCache) onEvent(curr interface {}, ev model.Event) error { pc.Lock() if len (ip) > 0 { key := kube.KeyFunc(pod.Name, pod.Namespace) switch ev { case model.EventAdd: case model.EventUpdate: case model.EventDelete: if pc.podsByIP[ip] == key { pc.deleteIP(ip) } } for _, handler := range pc.c.workloadHandlers { ep := NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(ip, 0 , "" ) handler(&model.WorkloadInstance{ Name: pod.Name, Namespace: pod.Namespace, Endpoint: ep, PortMap: getPortMap(pod), }, ev) } } return nil }
在 NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(ip, 0, "")
中我们将其翻译成 Endpoint
,而这个 Endpoint
是需要下发到 Sidecar
中的,那处理的逻辑肯定又在 handler
中的。一路小跳到
pilot/pkg/serviceregistry/serviceentry/servicediscovery.go:WorkloadInstanceHandler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func (s *ServiceEntryStore) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) { key := configKey{ kind: workloadInstanceConfigType, name: si.Name, namespace: si.Namespace, } switch event { case model.EventDelete: default : s.workloadInstancesByIP[si.Endpoint.Address] = si s.workloadInstancesIPsByName[k] = si.Endpoint.Address } entries := s.seWithSelectorByNamespace[si.Namespace] instances := []*model.ServiceInstance{} instancesDeleted := []*model.ServiceInstance{} for _, se := range entries { workloadLabels := labels.Collection{si.Endpoint.Labels} instance := convertWorkloadInstanceToServiceInstance(si.Endpoint, se.services, se.entry) instances = append (instances, instance...) if addressToDelete != "" { } } if event != model.EventDelete { s.updateExistingInstances(key, instances) } else { s.deleteExistingInstances(key, instances) } s.edsUpdate(instances) }
其实看到这里,我们发现了我们在 ServiceEntryStore
中维护了系统对于 ServiceEntry
的状态维护。
那如果我们删除了 Service
呢?那显然在 endpointController
找我们想要的对象
pilot/pkg/serviceregistry/kube/controller/endpointcontroller.go:processEndpointEvent 1 2 3 4 5 6 7 8 func processEndpointEvent (c *Controller, epc kubeEndpointsController, name string , namespace string , event model.Event, ep interface {}) error { updateEDS(c, epc, ep, event) if features.EnableHeadlessService { } return nil }
我们在 updateEDS
中去更新 XDS
的相关信息。而最终我们回去调用
1 _ = c.xdsUpdater.EDSUpdate(c.clusterID, string (host), ns, endpoints)
来完成我的 XDS
的更新。
CRD Controller
对于 CRD
资源的定义和操作都置于 pilot/pkg/config/kube/crdclient
中,在 kube client
做了一层缓冲来处理
cacheHandler 1 2 3 4 type cacheHandler struct { client *Client handlers []func (model.Config, model.Config, model.Event) }
在触发了 CRD
变化的时候会回调 onEvent
函数
onEvent 1 2 3 4 5 6 7 func (h *cacheHandler) onEvent(old interface {}, curr interface {}, event model.Event) error { for _, f := range h.handlers { f(oldConfig, currConfig, event) } return nil }
Updater
更新 XDS
也不是一件很简单的事情,我们需要区分,什么时候时候可以增量更新,而什么时候只需要更新一部分。
EDSUpdate
如果仅仅是比如 services
的资源产生了变化,我们就可以直接更新 eds
,比如在上文中,我们删除另一个 services
就会触发 EDSUpdate
函数
EDSUpdate 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string , namespace string , istioEndpoints []*model.IstioEndpoint) error { inboundEDSUpdates.Increment() fp := s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints) s.ConfigUpdate(&model.PushRequest{ Full: fp, ConfigsUpdated: map [model.ConfigKey]struct {}{{ Kind: gvk.ServiceEntry, Name: serviceName, Namespace: namespace, }: {}}, Reason: []model.TriggerReason{model.EndpointUpdate}, }) return nil }
然后生成了一个 PushRequst
在 ConfigUpdate
中,我们将请求至于 PushChannel
之中
1 2 3 unc (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) { s.pushChannel <- req }
而 Push
的消费逻辑在
DiscoveryServer:Push 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (s *DiscoveryServer) Push(req *model.PushRequest) { if !req.Full { req.Push = s.globalPushContext() go s.AdsPushAll(versionInfo(), req) return } versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(versionNum.Load(), 10 ) versionNum.Inc() initContextTime := time.Since(t0) adsLog.Debugf("InitContext %v for push took %s" , versionLocal, initContextTime) versionMutex.Lock() version = versionLocal versionMutex.Unlock() req.Push = push go s.AdsPushAll(versionLocal, req) }
More Update
如果我们修改我们的 CRD
定义,就往往会导致更多的配置需要更新,这时候就会触发一些其他的更新。
在 pilot/pkg/xds/
中有大量对于其他 PUSH
的实现,就不做展开。
总结
对于 Pilot
的功能还是非常的简单,就是监听各种 Kube Resource
然后向 XDS
进行数据更新。不过值得注意的,单纯的依靠 Poilot
是完成不了 istio
的功能的, Pilot
更多只是维护了一组 Endpoints
,而访问的一些其他信息都是在 Galley
中进行配置的。
参考