How Istio works: translate

众所周知,Istio 有一个很核心的功能是将Kube的资源对象翻译成Envoy的Xds协议。因此对于这部分来说,Istio 涉及到大致上三个部分

  • Kube 资源的监听
  • Istio 内部如何存储这些数据
  • 如何转为Xds协议

Kube 资源的监听

Istio 有个单独的路径 pilot/pkg/serviceregistry 用来处理,早期的时候还有不同的资源,现在只保留了 kube 的实现。

pilot/pkg/serviceregistry/kube/controller 的路径中,我们可以发现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
.
├── controller.go
├── discoverycontrollers.go
├── endpoint_builder.go
├── endpointcontroller.go
├── endpoints.go
├── endpointslice.go
├── fake.go
├── filter
│ ├── discoverynamespaces.go
│ └── informer.go
├── leak_test.go
├── multicluster.go
├── namespacecontroller.go
├── network.go
├── pod.go
├── pod_test.go
├── serviceexportcache.go
├── serviceexportcontroller.go
├── sync.go
├── util.go

而我们所有的对象都会存储于 Controller 这个对象中。

在这个对象中,我们可以发现其中存储了最终获取到的所有资源,这个 Controller 对象对应了一个集群,而多个集群我们后文再提及。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type Controller struct {
opts Options

client kubelib.Client

queue queue.Instance

// Endpoint 的 Controller
endpoints kubeEndpointsController

// serviceExport Cache & Pod Cache
exports serviceExportCache
pods *PodCache

serviceHandlers []func(*model.Service, model.Event)
workloadHandlers []func(*model.WorkloadInstance, model.Event)

// 这下面都是一些内部的数据结构,比如外部服务,Workload 等等
// servicesMap stores hostname ==> service, it is used to reduce convertService calls.
servicesMap map[host.Name]*model.Service
// nodeSelectorsForServices stores hostname => label selectors that can be used to
// refine the set of node port IPs for a service.
nodeSelectorsForServices map[host.Name]labels.Instance
// map of node name and its address+labels - this is the only thing we need from nodes
// for vm to k8s or cross cluster. When node port services select specific nodes by labels,
// we run through the label selectors here to pick only ones that we need.
// Only nodes with ExternalIP addresses are included in this map !
nodeInfoMap map[string]kubernetesNode
// externalNameSvcInstanceMap stores hostname ==> instance, is used to store instances for ExternalName k8s services
externalNameSvcInstanceMap map[host.Name][]*model.ServiceInstance
// workload instances from workload entries - map of ip -> workload instance
workloadInstancesByIP map[string]*model.WorkloadInstance
// Stores a map of workload instance name/namespace to address
workloadInstancesIPsByName map[string]string
// tracks which services on which ports should act as a gateway for networkForRegistry
registryServiceNameGateways map[host.Name]uint32
// gateways for each network, indexed by the service that runs them so we clean them up later
networkGateways map[host.Name]map[network.ID]gatewaySet
}

在随之的构造函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func NewController(kubeClient kubelib.Client, options Options) *Controller {
c.serviceInformer = filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Services().Informer())
c.serviceLister = listerv1.NewServiceLister(c.serviceInformer.GetIndexer())

// 通过 Informer 的通知来处理 Service 对象
c.registerHandlers(c.serviceInformer, "Services", c.onServiceEvent, nil)

// 通过 Informer 的通知来处理 Endpoints 对象
endpointsInformer := filter.NewFilteredSharedIndexInformer(
c.opts.DiscoveryNamespacesFilter.Filter,
kubeClient.KubeInformer().Core().V1().Endpoints().Informer(),
)
c.endpoints = newEndpointsController(c, endpointsInformer)
// 下面都是类似的略
}

这里我们会发现,对于资源的变化,istio 对于简单的资源,通过 Informer 通过 Event 回调就完成了资源的变更,相对复杂的就是直接单独提了一个 controller.

这里感觉就是不太一致,如果是 Java body 肯定会统一抽象了。

Discoverycontrollers

对于 discoverycontrollers 系统入口是 initDiscoveryNamespaceHandlers 在这里我们会初始化那些需要被关注的 Namespace,默认情况下是所有,如果需要特殊指定,我们可以在 istio 的 CM 中配置。

当确认这个 namesapce 使我们需要处理的时候会触发如下函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (c *Controller) handleSelectedNamespace(endpointMode EndpointMode, ns string) {
var errs *multierror.Error

// 获得所有的服务
services, err := c.serviceLister.Services(ns).List(labels.Everything())
// 触发服务增加的事件回调
for _, svc := range services {
errs = multierror.Append(errs, c.onServiceEvent(svc, model.EventAdd))
}
// 获得所有的 Pod
pods, err := listerv1.NewPodLister(c.pods.informer.GetIndexer()).Pods(ns).List(labels.Everything())
if err != nil {
log.Errorf("error listing pods: %v", err)
return
}

// 触发Pod增加的事件回调
for _, pod := range pods {
errs = multierror.Append(errs, c.pods.onEvent(pod, model.EventAdd))
}

// 下同忽略
}

对于 NS 的变更,我们会立即触发较多的的资源的变化,对 Service Pod Endpoint 都会一次性的出发变化。

其他的都类似不做一一解释。

在此之上,istio 做了一个 Multicluster 的抽象,把单个 Controller 都提取到一起了。这部分我们就不再关注,我们至今知道了,所有的 Kube 原生资源都会在这个 Controller 中被关注到。但是我们还有 VirtualService 这样的配置呢? 而这部分逻辑我们需要回到我们的 Poilt Server

ConfigStoreCache

Servergithub
1
2
3
4
5
6
7
type Server struct {
XDSServer *xds.DiscoveryServer

configController model.ConfigStoreCache
ConfigStores []model.ConfigStoreCache
serviceEntryStore *serviceentry.ServiceEntryStore
}

对于 istio 的配置,都在 ConfigStoreCache 中储存着呢。让我们到初始化的地方。

initgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (s *Server) initConfigSources(args *PilotArgs) (err error) {
for _, configSource := range s.environment.Mesh().ConfigSources {
srcAddress, err := url.Parse(configSource.Address)
if err != nil {
return fmt.Errorf("invalid config URL %s %v", configSource.Address, err)
}
scheme := ConfigSourceAddressScheme(srcAddress.Scheme)
switch scheme {
// 略
case Kubernetes:
if srcAddress.Path == "" || srcAddress.Path == "/" {
err2 := s.initK8SConfigStore(args)
if err2 != nil {
log.Warn("Error loading k8s ", err2)
return err2
}
log.Warn("Started K8S config")
} else {
log.Warnf("Not implemented, ignore: %v", configSource.Address)
// TODO: handle k8s:// scheme for remote cluster. Use same mechanism as service registry,
// using the cluster name as key to match a secret.
}
default:
log.Warnf("Ignoring unsupported config source: %v", configSource.Address)
}
}
return nil
}

这里,我们默认会走到 case Kubernetes 这里其实有个设计上的缺陷,我们这里只能配置一个 Kubernetes 并不能使用多个 Kubernetes 作为数据源。在初始化的最下面,依然有一个聚合函数

aggregateConfigController
1
2
3
4
5
6
// Wrap the config controller with a cache.
aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)
if err != nil {
return err
}
s.configController = aggregateConfigController

显然我们只需要关注 aggregateConfigController

到这里我们已经找到了所有的如何读取数据的地方,那我们的数据又存储在什么地方呢?我们来到了 controller 的定义处

pilot/pkg/config/aggregate/config.go:184
1
2
3
4
5
6
7
func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
for _, cache := range cr.caches {
if _, exists := cache.Schemas().FindByGroupVersionKind(kind); exists {
cache.RegisterEventHandler(kind, handler)
}
}
}

我们在就绪的 aggregateConfigController 中,我们为不同的 Schemams 注册不同的回调函数。而这个回调函数的创建位置。

createCacheHandlergithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
cl.kinds[s.Resource().GroupVersionKind()] = createCacheHandler(cl, s, i)

i.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
incrementEvent(kind, "add")
if !cl.beginSync.Load() {
return
}
cl.queue.Push(func() error {
return h.onEvent(nil, obj, model.EventAdd)
})
},
// 略
})

我们对于不同的 CRD 对象,我们注册了不同的 Callback 函数,而所有的一切都指向了 onEvent 函数

onEventgithub
1
2
3
4
5
6
7
8
9
10
11
12
func (h *cacheHandler) onEvent(old interface{}, curr interface{}, event model.Event) error {

// 这里将所有的CRD对象转为istio内置的 Config 对象
currConfig := *TranslateObject(currItem, h.schema.Resource().GroupVersionKind(), h.client.domainSuffix)

// TODO we may consider passing a pointer to handlers instead of the value. While spec is a pointer, the meta will be copied
// 而我们在这里真的去处理
for _, f := range h.client.handlers[h.schema.Resource().GroupVersionKind()] {
f(oldConfig, currConfig, event)
}
return nil
}

那这个 f 是什么呢?笔者找了半天,go 的确这点不太好,最终发现其实很简单

configHandlergithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
configHandler := func(_ config.Config, curr config.Config, event model.Event) {
pushReq := &model.PushRequest{
Full: true,
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: curr.GroupVersionKind,
Name: curr.Name,
Namespace: curr.Namespace,
}: {}},
Reason: []model.TriggerReason{model.ConfigUpdate},
}
s.XDSServer.ConfigUpdate(pushReq)
if event != model.EventDelete {
s.statusReporter.AddInProgressResource(curr)
} else {
s.statusReporter.DeleteInProgressResource(curr)
}
}

所有的 Istio CRD 的变更,都会导致 Xds 发生 FullPush,尽量的减少 CRD 的变更。

其实看到这里,我们发现,至今我们都没有发现这些资源,istio 是如何储存的,而在这里,已经慢慢的揭开面纱。

ConfigUpdategithub
1
2
3
4
5
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
inboundConfigUpdates.Increment()
s.InboundUpdates.Inc()
s.pushChannel <- req
}

我们大部分的行为变化,都会导致发生 ConfigUpdate,而我们是通过一个独立的 pushChannel 向我们的 xds server 发送信息(也是poliot自身)。现在再回到我们的 endpointController 中,我们可以发现

updateEDSgithub
1
2
3
4
5
6
7
8
9
10
11
func updateEDS(c *Controller, epc kubeEndpointsController, ep interface{}, event model.Event) {
host, svcName, ns := epc.getServiceInfo(ep)
log.Debugf("Handle EDS endpoint %s in namespace %s", svcName, ns)
var endpoints []*model.IstioEndpoint
if event == model.EventDelete {
endpoints = epc.forgetEndpoint(ep)
} else {
endpoints = epc.buildIstioEndpoints(ep, host)
}
c.opts.XDSUpdater.EDSUpdate(string(c.Cluster()), string(host), ns, endpoints)
}

呼之欲出,我们所有的资源变化都会导致 Xds Server 产生变化。

因此我们来到了下一个问题

Istio 如何储存资源

在消费 push event 的地方,我们经常轻易的发现,这一切处理函数在

pushgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (s *DiscoveryServer) Push(req *model.PushRequest) {
if !req.Full {
req.Push = s.globalPushContext()
s.dropCacheForRequest(req)
s.AdsPushAll(versionInfo(), req)
return
}

oldPushContext := s.globalPushContext()
if oldPushContext != nil {
oldPushContext.OnConfigChange()
}
t0 := time.Now()

versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(versionNum.Inc(), 10)
push, err := s.initPushContext(req, oldPushContext, versionLocal)
if err != nil {
return
}
initContextTime := time.Since(t0)
log.Debugf("InitContext %v for push took %s", versionLocal, initContextTime)
pushContextInitTime.Record(initContextTime.Seconds())

versionMutex.Lock()
version = versionLocal
versionMutex.Unlock()

req.Push = push
s.AdsPushAll(versionLocal, req)
}

我们可以轻易的猜测出,推送的数据显然是是这个 initPushContext 函数所提供。再需要细微的一小步,我们就可以发现构建数据

createNewContext
1
2
3
4
5
6
7
8
9
10
11
12
13
func (ps *PushContext) createNewContext(env *Environment) error {
if err := ps.initServiceRegistry(env); err != nil {
return err
}
if err := ps.initKubernetesGateways(env); err != nil {
return err
}
if err := ps.initVirtualServices(env); err != nil {
return err
}
// 下略
return nil
}

此时我们从 PushContext 定义可见。

PushContextgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type PushContext struct {
proxyStatusMutex sync.RWMutex
// ProxyStatus is keyed by the error code, and holds a map keyed
// by the ID.
ProxyStatus map[string]map[string]ProxyPushStatus
exportToDefaults exportToDefaults
// ServiceIndex is the index of services by various fields.
ServiceIndex serviceIndex
// vs 配置
virtualServiceIndex virtualServiceIndex
// dt 配置
destinationRuleIndex destinationRuleIndex
// gateway配置
gatewayIndex gatewayIndex

// 下略
}

从这里就可以发现,其实我们现在所有的对象都转化为了 istio 内置在 pushContext 中的数据结构。

//TODO 转化逻辑

重要对象关系

在我们探索最后一部分逻辑的时候,我们了解下有几个重要重要对象的关系

  • DiscoveryServer: 我们的 xDS 服务器,也就是 Poliot Discovery 组件
  • Proxy: 一个 xDS 连接
  • PushRequest: 推送请求
  • PushContext: 推送上下文对象,对于 DiscoveryServer 存储一个 全局的 globalPushContext

因此在这里我们我们就可以梳理清楚这几者的关系。

如何转为Xds协议

来到了我们的最后一个需要解答的地方,我们这些 PushContext 的内容又是如何转化为 xDS 协议的呢?

对于所有的资源转化,istio 抽象了一个独立的 Interface 进行出来。

XdsResourceGeneratorgithub
1
2
3
type XdsResourceGenerator interface {
Generate(proxy *Proxy, push *PushContext, w *WatchedResource, updates *PushRequest) (Resources, XdsLogDetails, error)
}

我们可以发现不同类型的转换。对于 xDS 协议本身的实现,可以参考 xDS 协议的实现

从这里我们梳理下调用的时序关系

当我们有一个全局概念之后,我们就可以来看看具体的 LDS 是如何实现的。

LDS

我们的开胃菜先从 LDS 开始看起来。

Generategithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (l LdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
if !ldsNeedsPush(req) {
return nil, model.DefaultXdsLogDetails, nil
}
listeners := l.Server.ConfigGenerator.BuildListeners(proxy, push)
resources := model.Resources{}
for _, c := range listeners {
resources = append(resources, &discovery.Resource{
Name: c.Name,
Resource: util.MessageToAny(c),
})
}
return resources, model.DefaultXdsLogDetails, nil
}

对于 LDS 的生成逻辑,会分为 SidecarRouter 两部分,我们取 Sidecar 处逻辑,在

buildSidecarInboundListenersgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (configgen *ConfigGeneratorImpl) buildSidecarInboundListeners(node *model.Proxy,push *model.PushContext) []*listener.Listener {
var listeners []*listener.Listener
listenerMap := make(map[int]*inboundListenerEntry)

sidecarScope := node.SidecarScope
noneMode := node.GetInterceptionMode() == model.InterceptionNone
if !sidecarScope.HasIngressListener() {
// 先构建所有的 Service 实例对应的地址 Listener,而这个 instances,从而来,我们下文介绍
for _, instance := range node.ServiceInstances {
endpoint := instance.Endpoint
wildcard, _ := getActualWildcardAndLocalHost(node)
bind := wildcard
port := *instance.ServicePort
port.Port = int(endpoint.EndpointPort)
listenerOpts := buildListenerOpts{
push: push,
proxy: node,
bind: bind,
port: &port,
bindToPort: false,
protocol: istionetworking.ModelProtocolToListenerProtocol(instance.ServicePort.Protocol, core.TrafficDirection_INBOUND),
}

pluginParams := &plugin.InputParams{
Node: node,
ServiceInstance: instance,
Push: push,
}

if l := configgen.buildSidecarInboundListenerForPortOrUDS(listenerOpts, pluginParams, listenerMap); l != nil {
listeners = append(listeners, l)
}
}
return listeners
}

return listeners
}

而这个 Instances 列表是从我们 Poliot Xds 上下文中构建而来。

SetServiceInstancesgithub
1
2
3
4
5
6
7
8
9
10
11
func (node *Proxy) SetServiceInstances(serviceDiscovery ServiceDiscovery) {
// 我们根据当前 xDs Context 上下文中的 serviceDiscovery 构建这个 Instance 列表
instances := serviceDiscovery.GetProxyServiceInstances(node)

// Keep service instances in order of creation/hostname.
sort.SliceStable(instances, func(i, j int) bool {
// ....
})

node.ServiceInstances = instances
}

EDS

除了 LDS 之外,还有一些会走捷径的资源更新,比如我们只是 Pod Ip 的变更不会导致其他资源更新,我们只需要更新我们的 eds 就可以。

processEndpointEventgithub
1
2
3
4
func processEndpointEvent(c *Controller, epc kubeEndpointsController, name string, namespace string, event model.Event, ep interface{}) error {
updateEDS(c, epc, ep, event)
return nil
}

这样我们可以更快速抵达更新路径上

EDSUpdate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string,
istioEndpoints []*model.IstioEndpoint) {
inboundEDSUpdates.Increment()
// Update the endpoint shards
fp := s.edsCacheUpdate(clusterID, serviceName, namespace, istioEndpoints)
// Trigger a push
s.ConfigUpdate(&model.PushRequest{
Full: fp,
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: gvk.ServiceEntry,
Name: serviceName,
Namespace: namespace,
}: {}},
Reason: []model.TriggerReason{model.EndpointUpdate},
})
}

但是值得注意,因为一些其他原因的也会导致,我们仅仅更新 eds 是不够的也需要全量的更新。

小结

对于当前版本的 istio 我们还需要一些额外的工作,比如自动注册的 workload entry,我们没有在本文中解释,寄希望大家能够把握住,系统的主要脉络以分析系统的构成。