众所周知,Istio 有一个很核心的功能是将Kube的资源对象翻译成Envoy的Xds协议。因此对于这部分来说,Istio 涉及到大致上三个部分
Kube 资源的监听
Istio 内部如何存储这些数据
如何转为Xds协议
Kube 资源的监听 Istio 有个单独的路径 pilot/pkg/serviceregistry
用来处理,早期的时候还有不同的资源,现在只保留了 kube 的实现。
在 pilot/pkg/serviceregistry/kube/controller
的路径中,我们可以发现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 . ├── controller.go ├── discoverycontrollers.go ├── endpoint_builder.go ├── endpointcontroller.go ├── endpoints.go ├── endpointslice.go ├── fake.go ├── filter │ ├── discoverynamespaces.go │ └── informer.go ├── leak_test.go ├── multicluster.go ├── namespacecontroller.go ├── network.go ├── pod.go ├── pod_test.go ├── serviceexportcache.go ├── serviceexportcontroller.go ├── sync.go ├── util.go
而我们所有的对象都会存储于 Controller
这个对象中。
在这个对象中,我们可以发现其中存储了最终获取到的所有资源,这个 Controller
对象对应了一个集群,而多个集群我们后文再提及。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 type Controller struct { opts Options client kubelib.Client queue queue.Instance endpoints kubeEndpointsController exports serviceExportCache pods *PodCache serviceHandlers []func (*model.Service, model.Event) workloadHandlers []func (*model.WorkloadInstance, model.Event) servicesMap map [host.Name]*model.Service nodeSelectorsForServices map [host.Name]labels.Instance nodeInfoMap map [string ]kubernetesNode externalNameSvcInstanceMap map [host.Name][]*model.ServiceInstance workloadInstancesByIP map [string ]*model.WorkloadInstance workloadInstancesIPsByName map [string ]string registryServiceNameGateways map [host.Name]uint32 networkGateways map [host.Name]map [network.ID]gatewaySet }
在随之的构造函数中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func NewController (kubeClient kubelib.Client, options Options) *Controller { c.serviceInformer = filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Services().Informer()) c.serviceLister = listerv1.NewServiceLister(c.serviceInformer.GetIndexer()) c.registerHandlers(c.serviceInformer, "Services" , c.onServiceEvent, nil ) endpointsInformer := filter.NewFilteredSharedIndexInformer( c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Endpoints().Informer(), ) c.endpoints = newEndpointsController(c, endpointsInformer) }
这里我们会发现,对于资源的变化,istio
对于简单的资源,通过 Informer 通过 Event 回调就完成了资源的变更,相对复杂的就是直接单独提了一个 controller
.
小声比比
这里感觉就是不太一致,如果是 Java body 肯定会统一抽象了。
Discoverycontrollers 对于 discoverycontrollers
系统入口是 initDiscoveryNamespaceHandlers
在这里我们会初始化那些需要被关注的 Namespace
,默认情况下是所有,如果需要特殊指定,我们可以在 istio 的 CM 中配置。
当确认这个 namesapce
使我们需要处理的时候会触发如下函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (c *Controller) handleSelectedNamespace(endpointMode EndpointMode, ns string ) { var errs *multierror.Error services, err := c.serviceLister.Services(ns).List(labels.Everything()) for _, svc := range services { errs = multierror.Append(errs, c.onServiceEvent(svc, model.EventAdd)) } pods, err := listerv1.NewPodLister(c.pods.informer.GetIndexer()).Pods(ns).List(labels.Everything()) if err != nil { log.Errorf("error listing pods: %v" , err) return } for _, pod := range pods { errs = multierror.Append(errs, c.pods.onEvent(pod, model.EventAdd)) } }
对于 NS 的变更,我们会立即触发较多的的资源的变化,对 Service
Pod
Endpoint
都会一次性的出发变化。
在此之上,istio
做了一个 Multicluster
的抽象,把单个 Controller
都提取到一起了。这部分我们就不再关注,我们至今知道了,所有的 Kube 原生资源都会在这个 Controller
中被关注到。但是我们还有 VirtualService
这样的配置呢? 而这部分逻辑我们需要回到我们的 Poilt Server
中
ConfigStoreCache Server github 1 2 3 4 5 6 7 type Server struct { XDSServer *xds.DiscoveryServer configController model.ConfigStoreCache ConfigStores []model.ConfigStoreCache serviceEntryStore *serviceentry.ServiceEntryStore }
对于 istio
的配置,都在 ConfigStoreCache
中储存着呢。让我们到初始化的地方。
init github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (s *Server) initConfigSources(args *PilotArgs) (err error ) { for _, configSource := range s.environment.Mesh().ConfigSources { srcAddress, err := url.Parse(configSource.Address) if err != nil { return fmt.Errorf("invalid config URL %s %v" , configSource.Address, err) } scheme := ConfigSourceAddressScheme(srcAddress.Scheme) switch scheme { case Kubernetes: if srcAddress.Path == "" || srcAddress.Path == "/" { err2 := s.initK8SConfigStore(args) if err2 != nil { log.Warn("Error loading k8s " , err2) return err2 } log.Warn("Started K8S config" ) } else { log.Warnf("Not implemented, ignore: %v" , configSource.Address) } default : log.Warnf("Ignoring unsupported config source: %v" , configSource.Address) } } return nil }
这里,我们默认会走到 case Kubernetes
这里其实有个设计上的缺陷,我们这里只能配置一个 Kubernetes
并不能使用多个 Kubernetes
作为数据源。在初始化的最下面,依然有一个聚合函数
aggregateConfigController 1 2 3 4 5 6 aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores) if err != nil { return err } s.configController = aggregateConfigController
显然我们只需要关注 aggregateConfigController
到这里我们已经找到了所有的如何读取数据的地方,那我们的数据又存储在什么地方呢?我们来到了 controller
的定义处
pilot/pkg/config/aggregate/config.go:184 1 2 3 4 5 6 7 func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) { for _, cache := range cr.caches { if _, exists := cache.Schemas().FindByGroupVersionKind(kind); exists { cache.RegisterEventHandler(kind, handler) } } }
我们在就绪的 aggregateConfigController
中,我们为不同的 Schemams
注册不同的回调函数。而这个回调函数的创建位置。
createCacheHandler github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 cl.kinds[s.Resource().GroupVersionKind()] = createCacheHandler(cl, s, i) i.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func (obj interface {}) { incrementEvent(kind, "add" ) if !cl.beginSync.Load() { return } cl.queue.Push(func () error { return h.onEvent(nil , obj, model.EventAdd) }) }, })
我们对于不同的 CRD
对象,我们注册了不同的 Callback
函数,而所有的一切都指向了 onEvent
函数
onEvent github 1 2 3 4 5 6 7 8 9 10 11 12 func (h *cacheHandler) onEvent(old interface {}, curr interface {}, event model.Event) error { currConfig := *TranslateObject(currItem, h.schema.Resource().GroupVersionKind(), h.client.domainSuffix) for _, f := range h.client.handlers[h.schema.Resource().GroupVersionKind()] { f(oldConfig, currConfig, event) } return nil }
那这个 f
是什么呢?笔者找了半天,go
的确这点不太好,最终发现其实很简单
configHandler github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 configHandler := func (_ config.Config, curr config.Config, event model.Event) { pushReq := &model.PushRequest{ Full: true , ConfigsUpdated: map [model.ConfigKey]struct {}{{ Kind: curr.GroupVersionKind, Name: curr.Name, Namespace: curr.Namespace, }: {}}, Reason: []model.TriggerReason{model.ConfigUpdate}, } s.XDSServer.ConfigUpdate(pushReq) if event != model.EventDelete { s.statusReporter.AddInProgressResource(curr) } else { s.statusReporter.DeleteInProgressResource(curr) } }
小声比比
所有的 Istio CRD 的变更,都会导致 Xds 发生 FullPush,尽量的减少 CRD 的变更。
其实看到这里,我们发现,至今我们都没有发现这些资源,istio 是如何储存的,而在这里,已经慢慢的揭开面纱。
ConfigUpdate github 1 2 3 4 5 func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) { inboundConfigUpdates.Increment() s.InboundUpdates.Inc() s.pushChannel <- req }
我们大部分的行为变化,都会导致发生 ConfigUpdate
,而我们是通过一个独立的 pushChannel
向我们的 xds server
发送信息(也是poliot自身)。现在再回到我们的 endpointController
中,我们可以发现
updateEDS github 1 2 3 4 5 6 7 8 9 10 11 func updateEDS (c *Controller, epc kubeEndpointsController, ep interface {}, event model.Event) { host, svcName, ns := epc.getServiceInfo(ep) log.Debugf("Handle EDS endpoint %s in namespace %s" , svcName, ns) var endpoints []*model.IstioEndpoint if event == model.EventDelete { endpoints = epc.forgetEndpoint(ep) } else { endpoints = epc.buildIstioEndpoints(ep, host) } c.opts.XDSUpdater.EDSUpdate(string (c.Cluster()), string (host), ns, endpoints) }
呼之欲出,我们所有的资源变化都会导致 Xds Server
产生变化。
因此我们来到了下一个问题
Istio 如何储存资源 在消费 push event
的地方,我们经常轻易的发现,这一切处理函数在
push github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (s *DiscoveryServer) Push(req *model.PushRequest) { if !req.Full { req.Push = s.globalPushContext() s.dropCacheForRequest(req) s.AdsPushAll(versionInfo(), req) return } oldPushContext := s.globalPushContext() if oldPushContext != nil { oldPushContext.OnConfigChange() } t0 := time.Now() versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(versionNum.Inc(), 10 ) push, err := s.initPushContext(req, oldPushContext, versionLocal) if err != nil { return } initContextTime := time.Since(t0) log.Debugf("InitContext %v for push took %s" , versionLocal, initContextTime) pushContextInitTime.Record(initContextTime.Seconds()) versionMutex.Lock() version = versionLocal versionMutex.Unlock() req.Push = push s.AdsPushAll(versionLocal, req) }
我们可以轻易的猜测出,推送的数据显然是是这个 initPushContext 函数所提供。再需要细微的一小步,我们就可以发现构建数据
createNewContext 1 2 3 4 5 6 7 8 9 10 11 12 13 func (ps *PushContext) createNewContext(env *Environment) error { if err := ps.initServiceRegistry(env); err != nil { return err } if err := ps.initKubernetesGateways(env); err != nil { return err } if err := ps.initVirtualServices(env); err != nil { return err } return nil }
此时我们从 PushContext
定义可见。
PushContext github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type PushContext struct { proxyStatusMutex sync.RWMutex ProxyStatus map [string ]map [string ]ProxyPushStatus exportToDefaults exportToDefaults ServiceIndex serviceIndex virtualServiceIndex virtualServiceIndex destinationRuleIndex destinationRuleIndex gatewayIndex gatewayIndex }
从这里就可以发现,其实我们现在所有的对象都转化为了 istio
内置在 pushContext
中的数据结构。
//TODO 转化逻辑
重要对象关系 在我们探索最后一部分逻辑的时候,我们了解下有几个重要重要对象的关系
DiscoveryServer: 我们的 xDS 服务器,也就是 Poliot Discovery 组件
Proxy: 一个 xDS 连接
PushRequest: 推送请求
PushContext: 推送上下文对象,对于 DiscoveryServer 存储一个 全局的 globalPushContext
因此在这里我们我们就可以梳理清楚这几者的关系。
如何转为Xds协议 来到了我们的最后一个需要解答的地方,我们这些 PushContext
的内容又是如何转化为 xDS
协议的呢?
对于所有的资源转化,istio
抽象了一个独立的 Interface
进行出来。
XdsResourceGenerator github 1 2 3 type XdsResourceGenerator interface { Generate(proxy *Proxy, push *PushContext, w *WatchedResource, updates *PushRequest) (Resources, XdsLogDetails, error ) }
我们可以发现不同类型的转换。对于 xDS
协议本身的实现,可以参考 xDS 协议的实现
从这里我们梳理下调用的时序关系
当我们有一个全局概念之后,我们就可以来看看具体的 LDS
是如何实现的。
LDS 我们的开胃菜先从 LDS
开始看起来。
Generate github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (l LdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) (model.Resources, model.XdsLogDetails, error ) { if !ldsNeedsPush(req) { return nil , model.DefaultXdsLogDetails, nil } listeners := l.Server.ConfigGenerator.BuildListeners(proxy, push) resources := model.Resources{} for _, c := range listeners { resources = append (resources, &discovery.Resource{ Name: c.Name, Resource: util.MessageToAny(c), }) } return resources, model.DefaultXdsLogDetails, nil }
对于 LDS
的生成逻辑,会分为 Sidecar
和 Router
两部分,我们取 Sidecar
处逻辑,在
buildSidecarInboundListeners github 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func (configgen *ConfigGeneratorImpl) buildSidecarInboundListeners(node *model.Proxy,push *model.PushContext) []*listener.Listener { var listeners []*listener.Listener listenerMap := make (map [int ]*inboundListenerEntry) sidecarScope := node.SidecarScope noneMode := node.GetInterceptionMode() == model.InterceptionNone if !sidecarScope.HasIngressListener() { for _, instance := range node.ServiceInstances { endpoint := instance.Endpoint wildcard, _ := getActualWildcardAndLocalHost(node) bind := wildcard port := *instance.ServicePort port.Port = int (endpoint.EndpointPort) listenerOpts := buildListenerOpts{ push: push, proxy: node, bind: bind, port: &port, bindToPort: false , protocol: istionetworking.ModelProtocolToListenerProtocol(instance.ServicePort.Protocol, core.TrafficDirection_INBOUND), } pluginParams := &plugin.InputParams{ Node: node, ServiceInstance: instance, Push: push, } if l := configgen.buildSidecarInboundListenerForPortOrUDS(listenerOpts, pluginParams, listenerMap); l != nil { listeners = append (listeners, l) } } return listeners } return listeners }
而这个 Instances
列表是从我们 Poliot Xds
上下文中构建而来。
SetServiceInstances github 1 2 3 4 5 6 7 8 9 10 11 func (node *Proxy) SetServiceInstances(serviceDiscovery ServiceDiscovery) { instances := serviceDiscovery.GetProxyServiceInstances(node) sort.SliceStable(instances, func (i, j int ) bool { }) node.ServiceInstances = instances }
EDS 除了 LDS
之外,还有一些会走捷径的资源更新,比如我们只是 Pod Ip
的变更不会导致其他资源更新,我们只需要更新我们的 eds
就可以。
processEndpointEvent github 1 2 3 4 func processEndpointEvent (c *Controller, epc kubeEndpointsController, name string , namespace string , event model.Event, ep interface {}) error { updateEDS(c, epc, ep, event) return nil }
这样我们可以更快速抵达更新路径上
EDSUpdate 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string , namespace string , istioEndpoints []*model.IstioEndpoint) { inboundEDSUpdates.Increment() fp := s.edsCacheUpdate(clusterID, serviceName, namespace, istioEndpoints) s.ConfigUpdate(&model.PushRequest{ Full: fp, ConfigsUpdated: map [model.ConfigKey]struct {}{{ Kind: gvk.ServiceEntry, Name: serviceName, Namespace: namespace, }: {}}, Reason: []model.TriggerReason{model.EndpointUpdate}, }) }
但是值得注意,因为一些其他原因的也会导致,我们仅仅更新 eds
是不够的也需要全量的更新。
小结 对于当前版本的 istio
我们还需要一些额外的工作,比如自动注册的 workload entry
,我们没有在本文中解释,寄希望大家能够把握住,系统的主要脉络以分析系统的构成。