Isito の Pilot 组件

Nhq9u.png

本文基于 istio: release-1.7

Pre

NzdPt.png

对于 istio 架构来说,包含了较多的组件其中包含了 Pilot Citadel Galley 等组件,全部置于一个单一的 isitod 镜像中,我们今天就来看看最重要的 Poilot 组件。

Individual Istio components like service discovery (Pilot), configuration (Galley), certificate generation (Citadel) and extensibility (Mixer)

Pliot

对于 Poliot 来说,我们其实有两个,第一个是 Poilot Discovery 作为控制面组件,另外一个是 Poilot Agent 在数据面对于 Envoy 进行配置的组件。

NhRkz.png


对于 Pilot 来说,做的事情也很简单

istio crd 的配置信息和 kube 的各种服务信息进行整合,然后通过 xds 推送给 Envoy

Discovery Server

函数的入口处为 pilot/pkg/bootstrap/server.go:Start 我们实际上会启动两个不同的 Server

  • grpcServer: 负责和 Galley CDS CA 通讯
  • httpServer: 处理 readiness 的请求

pilot-discovery 扮演服务注册中心、istio 控制平面到 Envoy 之间的桥梁作用。Pilot-Discovery 的主要功能包括:

  • 监控服务注册中心(如Kubernetes)的服务注册情况。在Kubernetes环境下,会监控service、endpoint、pod、node等资源信息。
  • 监控istio控制面信息变化,在Kubernetes环境下,会监控包括RouteRule、VirtualService、Gateway、EgressRule、ServiceEntry等以Kubernetes CRD形式存在的istio控制面配置信息。
  • 将上述两类信息合并组合为Envoy可以理解的(即遵循Envoy data plane api的)配置信息,并将这些信息以gRPC协议提供给Envoy

NhnQG.png

Service Registry

Poilot 为了支持多种基础设施,因此抽象了一个对象叫 serviceregistry,我们从源码中可以发现至少支持了 consul kube mcp mock memory 这几类基础设施。

在作为常见的 Kube 的支持中,我们在这里 pilot/pkg/serviceregistry/kube/register.go:RegisterEndpoint 注册我们的 Kube 控制节点,我们知道当使用 CRD 的时候,对于 CRD 功能的翻译需要 CRD Controller 进行实现,因此我们的实现点在于 pilot/pkg/serviceregistry/kube/controller/controller.go:NewController

NewController 之中,我们注册大量的对于 Kube 资源处理的回调函数

1
2
3
4
5
6
7
8
// 对于 Endpoints 的处理
c.endpoints = newEndpointsController(c, kubeClient.KubeInformer().Core().V1().Endpoints())

// 对于 Nodes 的处理
registerHandlers(c.nodeInformer, c.queue, "Nodes", c.onNodeEvent, nil)

// 对于 Pod 的处理
registerHandlers(c.pods.informer, c.queue, "Pods", c.pods.onEvent, nil)

Controller

CRD Resource Controller

KubeService Registry 模式下,支持多种自定义资源来操作我们的 isito

  • Virtualservice:用于定义路由规则,如根据来源或 Header 制定规则,或在不同服务版本之间分拆流量。
  • DestinationRule:定义目的服务的配置策略以及可路由子集。策略包括断路器、负载均衡以及 TLS 等。
  • ServiceEntry:可以使用ServiceEntry向Istio中加入附加的服务条目,以使网格内可以向istio 服务网格之外的服务发出请求。
  • Gateway:为网格配置网关,以允许一个服务可以被网格外部访问。
  • EnvoyFilter:可以为Envoy配置过滤器。由于Envoy已经支持Lua过滤器,因此可以通过EnvoyFilter启用Lua过滤器,动态改变Envoy的过滤链行为。我之前一直在考虑如何才能动态扩展Envoy的能力,EnvoyFilter提供了很灵活的扩展性。
  • Sidecar:缺省情况下,Pilot将会把和Envoy Sidecar所在namespace的所有services的相关配置,包括inbound和outbound listenter, cluster, route等,都下发给Enovy。使用Sidecar可以对Pilot向Envoy Sidcar下发的配置进行更细粒度的调整,例如只向其下发该Sidecar 所在服务需要访问的那些外部服务的相关outbound配置。

这一部分代码都在 pilot/pkg/config/kube

Kube Resource Controller

显然我们需要监听 Pods Services Namespaces 等信息

这一部分代码都在 pilot/pkg/serviceregistry/kube/controller/

Push Context

上述的处理的 callback 大多数都会转至 push_context.go 中进行处理 PushContext 会记录我们操作的更新状态等信息。

Xds Updater

而对于所有的状态更新,都是通过 XDSUpdater 进行更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Controller struct {
client kubernetes.Interface

nodeInformer cache.SharedIndexInformer
nodeLister listerv1.NodeLister

pods *PodCache

metrics model.Metrics
networksWatcher mesh.NetworksWatcher
// 通过 XDSUpdater 和下游进行通讯
xdsUpdater model.XDSUpdater
domainSuffix string
clusterID string
}

举个例子,当我们的 PodIP 进行更新的时候,我们会触发

1
2
3
4
5
func (pc *PodCache) proxyUpdates(ip string) {
if pc.c != nil && pc.c.xdsUpdater != nil {
pc.c.xdsUpdater.ProxyUpdate(pc.c.clusterID, ip)
}
}

从代码中可以发现直接调用了 xdsUpdater.ProxyUpdate

xdsUpdater.ProxyUpdate
1
2
3
4
5
6
7
8
9
10
func (s *DiscoveryServer) ProxyUpdate(clusterID, ip string) {
var connection *Connection

s.pushQueue.Enqueue(connection, &model.PushRequest{
Full: true,
Push: s.globalPushContext(),
Start: time.Now(),
Reason: []model.TriggerReason{model.ProxyUpdate},
})
}

我们将这一次的变化 Push 到我们的 PushQueue 中。

而最终的消费者在一层层的调用中我们可以定位至 pilot/pkg/xds/ads.go:pushConnection 中,具体逻辑不做展开。

Multicluster

多集群的设计相对简单,我们只需要将本来的逻辑隔离 +1 配置即可。比如在 AddMemberCluster 我们只需要将 kube clientcluster id 传入,我们就可以在第二个集群中创建我们的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (m *Multicluster) AddMemberCluster(clients kubelib.Client, clusterID string) error {
// stopCh to stop controller created here when cluster removed.
stopCh := make(chan struct{})
var remoteKubeController kubeController
remoteKubeController.stopCh = stopCh
m.m.Lock()
remoteKubeController.Controller = kubectl
m.serviceController.AddRegistry(kubectl)

m.remoteKubeControllers[clusterID] = &remoteKubeController
m.m.Unlock()

_ = kubectl.AppendServiceHandler(func(svc *model.Service, ev model.Event) { m.updateHandler(svc) })

clients.RunAndWait(stopCh)
return nil
}

Poliot 流程分析

上面看了一堆实现,我们还是看下 Bootstrap 的流程,入口处于 pilot/cmd/pilot-discovery/main.go

1
discoveryServer, err := bootstrap.NewServer(serverArgs)

Server Def

我们往往从 Server 的定义中就可以看出那些重量级的实现。

pilot/pkg/bootstrap/server.go:Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Server struct {
// 包含的 xds 服务
EnvoyXdsServer *xds.DiscoveryServer
// 包含了 kube 的一些资源的监控
environment *model.Environment
// 多集群的配置
multicluster *kubecontroller.Multicluster
// istio 配置 Controller
configController model.ConfigStoreCache
// istio 配置缓存
ConfigStores []model.ConfigStoreCache
// service entry 配置存储
serviceEntryStore *serviceentry.ServiceEntryStore
// 监听文件的变化
fileWatcher filewatcher.FileWatcher
}

保留了一些重点,我们可以看出来那两部分数据来源就是在 configControllerenvironment

Bootstrap

NewServer 中一开始就申明了最重要的对象 model.Environment

NewServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func NewServer(args *PilotArgs) (*Server, error) {
e := &model.Environment{
ServiceDiscovery: aggregate.NewController(),
PushContext: model.NewPushContext(),
DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
}

s := &Server{
clusterID: getClusterID(args),
environment: e,
EnvoyXdsServer: xds.NewDiscoveryServer(e, args.Plugins),
fileWatcher: filewatcher.NewWatcher(),
httpMux: http.NewServeMux(),
readinessProbes: make(map[string]readinessProbe),
}
}

model.Environment 里面有三个对象,都很重要:

  • ServiceDiscovery: 代表了对于此 Kube 集群的所有 Controller
  • PushContext: 包含了当前系统的 Metris 指标的信息,推送给 prom
  • DomainSuffix: 此集群的域名后缀

Server 中包含了除此之外的一些辅助能力,比如 fileWatcher 来观察配置文件的变化,EnvoyXdsServerSidecar 下发数据等

紧挨着比较重要的代码就是

1
2
3
if err := s.initControllers(args); err != nil {
return nil, err
}

在这里我们初始化所有的对 CRD 的控制对象。

然后就是一大票的对象初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
wh, err := s.initSidecarInjector(args)
if err != nil {
return nil, fmt.Errorf("error initializing sidecar injector: %v", err)
}
if err := s.initConfigValidation(args); err != nil {
return nil, fmt.Errorf("error initializing config validator: %v", err)
}
// Used for readiness, monitoring and debug handlers.
if err := s.initIstiodAdminServer(args, wh); err != nil {
return nil, fmt.Errorf("error initializing debug server: %v", err)
}
// This should be called only after controllers are initialized.
if err := s.initRegistryEventHandlers(); err != nil {
return nil, fmt.Errorf("error initializing handlers: %v", err)
}
if err := s.initDiscoveryService(args); err != nil {
return nil, fmt.Errorf("error initializing discovery service: %v", err)
}

Run

在我们初始化完成之后,在 main 函数中,我们开始执行我们真实的逻辑

pilot/pkg/bootstrap/server.go:Start
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (s *Server) Start(stop <-chan struct{}) error {
log.Infof("Staring Istiod Server with primary cluster %s", s.clusterID)

// 启动所有的组件
for _, fn := range s.startFuncs {
if err := fn(stop); err != nil {
return err
}
}

// grpcServer is shared by Galley, CA, XDS - must Serve at the end, but before 'wait'
// 启动 GRPC 服务,这个是被多个组件共享的,CA XDS Galley 等
go func() {
log.Infof("starting gRPC discovery service at %s", s.GRPCListener.Addr())
if err := s.grpcServer.Serve(s.GRPCListener); err != nil {
log.Warna(err)
}
}()

// At this point we are ready - start Http Listener so that it can respond to readiness events.
// 启动 HTTP 服务,处理 readiness
go func() {
log.Infof("starting Http service at %s", s.HTTPListener.Addr())
if err := s.httpServer.Serve(s.HTTPListener); err != nil {
log.Warna(err)
}
}()
}

s.startFuncs 中储存着我们所有的 start func 这里我们启动我们所有的需要在启动的组件。比如在 XDS 服务的启动函数是定义在如下

pilot/pkg/bootstrap/server.go
1
2
3
4
5
6
7
8
9
func (s *Server) initDiscoveryService(args *PilotArgs) error {
log.Infof("starting discovery service")
// Implement EnvoyXdsServer grace shutdown
s.addStartFunc(func(stop <-chan struct{}) error {
s.EnvoyXdsServer.Start(stop)
return nil
})
// ...
}

因此对于我们后需要,我们要查看各种实现都需要通过不同组件的 Start 来辨别。

监控变化

启动的流程不算复杂,我们尝试分析一个常规的场景。

YA6lD.png

我们监听一个 POD 被删除之后,我们 XDS 应该可以获得监听的变化。

根据 Isito 系统的构成,显然我们可以知道的应该会涉及到 Controller 监听到变化之后的 Action,而这个 Action 应该就是如何将数据下发到 Sidecar 中。

Controller

Init 的抽函数中,我们很容易就发现了 PodController 的位置

pilot/pkg/serviceregistry/kube/controller/controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func NewController(kubeClient kubelib.Client, options Options) *Controller {

// ...
c.serviceInformer = kubeClient.KubeInformer().Core().V1().Services().Informer()
c.serviceLister = kubeClient.KubeInformer().Core().V1().Services().Lister()
registerHandlers(c.serviceInformer, c.queue, "Services", c.onServiceEvent, nil)

// ...
c.nodeInformer = kubeClient.KubeInformer().Core().V1().Nodes().Informer()
c.nodeLister = kubeClient.KubeInformer().Core().V1().Nodes().Lister()
registerHandlers(c.nodeInformer, c.queue, "Nodes", c.onNodeEvent, nil)


c.pods = newPodCache(c, kubeClient.KubeInformer().Core().V1().Pods(), func(key string) {
item, exists, err := c.endpoints.getInformer().GetStore().GetByKey(key)
if err != nil {
log.Debugf("Endpoint %v lookup failed with error %v, skipping stale endpoint", key, err)
return
}
if !exists {
log.Debugf("Endpoint %v not found, skipping stale endpoint", key)
return
}
c.queue.Push(func() error {
return c.endpoints.onEvent(item, model.EventUpdate)
})
})
registerHandlers(c.pods.informer, c.queue, "Pods", c.pods.onEvent, nil)

return c
}

在构建 newPodCache 时候,显然我们第一次还是需要将所有的 POD 先取出来,然后之后处理我们的回调函数来处理,我们的重心还是来处理 registerHandlers 的事件。

registerHandlers 函数中,我们可以看到数据的变化对于系统影响

registerHandlers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func registerHandlers(informer cache.SharedIndexInformer, q queue.Instance, otype string,
handler func(interface{}, model.Event) error, filter FilterOutFunc) {

// 为 informer 增加 Callback
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
//...
},
UpdateFunc: func(old, cur interface{}) {
//...
},
DeleteFunc: func(obj interface{}) {
incrementEvent(otype, "delete") // BreakPoint
q.Push(func() error {
return handler(obj, model.EventDelete)
})
},
})
}

在这里我们就将 informer 的数据转化成 istio 的数据结构,然后 pushQuene

YAfy5.png

为了验证我们的想法,我们直接在 incrementEvent(otype, "delete") 加上我们的 BreakPoint

delete a pod
1
kubectl delete pod tcp-echo-v1-7dd5c5dcfb-mxs77

然后果然在我们这里看到 BreakPoint 的对象元数据。

Queue 的数据如何被消费,实则是一个定时的 Task,如下声明

pkg/queue/instance.go:Run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (q *queueImpl) Run(stop <-chan struct{}) {
for {
q.cond.L.Lock() // 上锁
for !q.closing && len(q.tasks) == 0 { // 没有任务继续 Wait
q.cond.Wait()
}


var task Task
task, q.tasks = q.tasks[0], q.tasks[1:] // 获得需要运行的 Task
q.cond.L.Unlock()

if err := task(); err != nil { // 执行操作
log.Infof("Work item handle failed (%v), retry after delay %v", err, q.delay)
time.AfterFunc(q.delay, func() { // 如果错误就延迟执行
q.Push(task)
})
}
}
}

YAayp.png

而我们真实的 Handler 实则是传递进来的,因此真正的代码是在 PodCache.onEvent

pilot/pkg/serviceregistry/kube/controller/pod.go:onEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (pc *PodCache) onEvent(curr interface{}, ev model.Event) error {
pc.Lock()

if len(ip) > 0 {
key := kube.KeyFunc(pod.Name, pod.Namespace)
switch ev {
case model.EventAdd:
//...
case model.EventUpdate:
//...
case model.EventDelete:
// delete only if this pod was in the cache
// 我们根据 IP 删除一个 POD 从缓存中
if pc.podsByIP[ip] == key {
pc.deleteIP(ip)
}
}
// fire instance handles for workload
// 我们修改了 POD,我们显然需要更新 Envoy 的 Endpoint,这里因为我们不维护 Pod 的 IP,还是通过 workloadHandlers 来处理
for _, handler := range pc.c.workloadHandlers {
ep := NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(ip, 0, "")
handler(&model.WorkloadInstance{
Name: pod.Name,
Namespace: pod.Namespace,
Endpoint: ep,
PortMap: getPortMap(pod),
}, ev)
}
}
return nil
}

NewEndpointBuilder(pc.c, pod).buildIstioEndpoint(ip, 0, "") 中我们将其翻译成 Endpoint,而这个 Endpoint 是需要下发到 Sidecar 中的,那处理的逻辑肯定又在 handler 中的。一路小跳到

pilot/pkg/serviceregistry/serviceentry/servicediscovery.go:WorkloadInstanceHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (s *ServiceEntryStore) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {

// 根据 Kind Name Namespace 生成唯一的 Key
key := configKey{
kind: workloadInstanceConfigType,
name: si.Name,
namespace: si.Namespace,
}

switch event {
case model.EventDelete:
// ... 处理 Delete
default: // add or update
// ... 忽略了 Update 的逻辑
// 这里我们将信息储存到 ServiceEntryStore 中
s.workloadInstancesByIP[si.Endpoint.Address] = si
s.workloadInstancesIPsByName[k] = si.Endpoint.Address
}
entries := s.seWithSelectorByNamespace[si.Namespace]
instances := []*model.ServiceInstance{}
instancesDeleted := []*model.ServiceInstance{}

// 这里处理 entries,也就是 ServicesWithEntry,这里需要处理其实是动态的流量均衡
for _, se := range entries {
workloadLabels := labels.Collection{si.Endpoint.Labels}
instance := convertWorkloadInstanceToServiceInstance(si.Endpoint, se.services, se.entry)
instances = append(instances, instance...)
if addressToDelete != "" {
// ... Delete
}
}

if event != model.EventDelete {
s.updateExistingInstances(key, instances)
} else {
s.deleteExistingInstances(key, instances)
}

// eds 更新
s.edsUpdate(instances)
}

其实看到这里,我们发现了我们在 ServiceEntryStore 中维护了系统对于 ServiceEntry 的状态维护。

那如果我们删除了 Service 呢?那显然在 endpointController 找我们想要的对象

pilot/pkg/serviceregistry/kube/controller/endpointcontroller.go:processEndpointEvent
1
2
3
4
5
6
7
8
func processEndpointEvent(c *Controller, epc kubeEndpointsController, name string, namespace string, event model.Event, ep interface{}) error {
// 在这里我们就直接需要更新 XDS 协议中的 EDS
updateEDS(c, epc, ep, event)
if features.EnableHeadlessService {
//....
}
return nil
}

我们在 updateEDS 中去更新 XDS 的相关信息。而最终我们回去调用

1
_ = c.xdsUpdater.EDSUpdate(c.clusterID, string(host), ns, endpoints)

来完成我的 XDS 的更新。

CRD Controller

对于 CRD 资源的定义和操作都置于 pilot/pkg/config/kube/crdclient 中,在 kube client 做了一层缓冲来处理

cacheHandler
1
2
3
4
type cacheHandler struct {
client *Client // CRD Client
handlers []func(model.Config, model.Config, model.Event) // 处理不同配置的 Handler
}

在触发了 CRD 变化的时候会回调 onEvent 函数

onEvent
1
2
3
4
5
6
7
func (h *cacheHandler) onEvent(old interface{}, curr interface{}, event model.Event) error {
//... 略部分代码
for _, f := range h.handlers {
f(oldConfig, currConfig, event)
}
return nil
}

Updater

更新 XDS 也不是一件很简单的事情,我们需要区分,什么时候时候可以增量更新,而什么时候只需要更新一部分。

EDSUpdate

如果仅仅是比如 services 的资源产生了变化,我们就可以直接更新 eds,比如在上文中,我们删除另一个 services 就会触发 EDSUpdate 函数

EDSUpdate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string,
istioEndpoints []*model.IstioEndpoint) error {
inboundEDSUpdates.Increment()
// FullPush 来决定是否完全的更新
fp := s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints)
s.ConfigUpdate(&model.PushRequest{
Full: fp,
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: gvk.ServiceEntry,
Name: serviceName,
Namespace: namespace,
}: {}},
Reason: []model.TriggerReason{model.EndpointUpdate},
})
return nil
}

然后生成了一个 PushRequstConfigUpdate 中,我们将请求至于 PushChannel 之中

1
2
3
unc (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
s.pushChannel <- req
}

Push 的消费逻辑在

DiscoveryServer:Push
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (s *DiscoveryServer) Push(req *model.PushRequest) {
if !req.Full {
req.Push = s.globalPushContext()
go s.AdsPushAll(versionInfo(), req)
return
}

// 更新版本号
versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(versionNum.Load(), 10)
versionNum.Inc()
initContextTime := time.Since(t0)
adsLog.Debugf("InitContext %v for push took %s", versionLocal, initContextTime)

versionMutex.Lock()
version = versionLocal
versionMutex.Unlock()


// 推送请求
req.Push = push
go s.AdsPushAll(versionLocal, req)
}

YcDIQ.png

More Update

如果我们修改我们的 CRD 定义,就往往会导致更多的配置需要更新,这时候就会触发一些其他的更新。

pilot/pkg/xds/ 中有大量对于其他 PUSH 的实现,就不做展开。

总结

对于 Pilot 的功能还是非常的简单,就是监听各种 Kube Resource 然后向 XDS 进行数据更新。不过值得注意的,单纯的依靠 Poilot 是完成不了 istio 的功能的, Pilot 更多只是维护了一组 Endpoints,而访问的一些其他信息都是在 Galley 中进行配置的。

参考