Isito 虚拟机健康检查

今天我们聊聊 《Isito 虚拟机健康检查》

已知 Istio 对接入的 VM 有健康检查,对于 k8s 上的服务来说,Pod 中就包含了健康检查部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
apiVersion: v1
kind: Pod
metadata:
name: goproxy
labels:
app: goproxy
spec:
containers:
- name: goproxy
image: k8s.gcr.io/goproxy:0.1
ports:
- containerPort: 8080
readinessProbe:
tcpSocket:
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
tcpSocket:
port: 8080
initialDelaySeconds: 15
periodSeconds: 20

而对于 VM 接入的 Workloadistio 也提供了类似的能力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
apiVersion: networking.istio.io/v1alpha3
kind: WorkloadGroup
metadata:
name: reviews
namespace: bookinfo
spec:
metadata:
labels:
app.kubernetes.io/name: reviews
app.kubernetes.io/version: "1.3.4"
template:
ports:
grpc: 3550
http: 8080
serviceAccount: default
probe:
initialDelaySeconds: 5
timeoutSeconds: 3
periodSeconds: 4
successThreshold: 3
failureThreshold: 3
httpGet:
path: /foo/bar
host: 127.0.0.1
port: 3100
scheme: HTTPS
httpHeaders:
- name: Lit-Header
value: Im-The-Best

这个如何工作的呢?我们今天就来探索下这个问题

How it works

PerformApplicationHealthCheck

initXdsProxy 函数中,我们可以看到,istio-agent 在初始化的时候会创建一个 xds.Proxy 对象,这个对象会被用来处理一些工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
go proxy.healthChecker.PerformApplicationHealthCheck(func(healthEvent *health.ProbeEvent) {
// Store the same response as Delta and SotW. Depending on how Envoy connects we will use one or the other.
// 这里根据 PerformApplicationHealthCheck 执行的结果,创建不同的 DiscoveryRequest 分别对应 健康状态下 与 非健康状态
var req *discovery.DiscoveryRequest
if healthEvent.Healthy {
req = &discovery.DiscoveryRequest{TypeUrl: v3.HealthInfoType}
} else {
req = &discovery.DiscoveryRequest{
TypeUrl: v3.HealthInfoType,
ErrorDetail: &google_rpc.Status{
Code: int32(codes.Internal),
Message: healthEvent.UnhealthyMessage,
},
}
}
proxy.PersistRequest(req)
// skip DeltaDiscoveryRequest
}, proxy.stopChan)

对于 PerformApplicationHealthCheck 这里就不做展开,大致上也就是模拟发个请求。

HealthInfoType DiscoveryRequest

那我们发给 PoiltDiscoveryRequest 有何作用呢,我们继续往下探索。

shouldProcessRequest 中我们可以看到如下代码。

1
2
3
4
5
6
7
8
if features.WorkloadEntryHealthChecks {
event := workloadentry.HealthEvent{}
event.Healthy = req.ErrorDetail == nil // 如果 ErrorDetail 不为 null 就是不健康
if !event.Healthy {
event.Message = req.ErrorDetail.Message
}
s.WorkloadEntryController.QueueWorkloadEntryHealth(proxy, event) // 触发到 QueueWorkloadEntryHealth 中
}

而在 QueueWorkloadEntryHealth 中的逻辑也是非常的简单的。

1
2
3
4
5
6
7
func (c *Controller) QueueWorkloadEntryHealth(proxy *model.Proxy, event HealthEvent) {
// replace the updated status
wle := status.UpdateConfigCondition(*cfg, condition.condition)
// update the status
_, err := c.store.UpdateStatus(wle)
return nil
}

到这里,我们对于 workloadEntry 的逻辑已经完全完成。

ServiceRegistry Controller

InstancesByPort 这里我们并不回返回哪些不健康的实例,那这部分是如何实现的呢?

workloadEntryHandler 中,我们发现处理的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (s *ServiceEntryStore) workloadEntryHandler(old, curr config.Config, event model.Event) {
// If an entry is unhealthy, we will mark this as a delete instead
// This ensures we do not track unhealthy endpoints
if features.WorkloadEntryHealthChecks && !isHealthy(curr) {
event = model.EventDelete
}

// 下面太长了,可以阅读原文,大致上触发了删除内存中的 ServiceInstance,并且触发一个 EdsUpdate

s.serviceInstances.deleteInstances(key, instancesDeleted)
if event == model.EventDelete {
s.workloadInstances.delete(types.NamespacedName{Namespace: curr.Namespace, Name: curr.Name})
s.serviceInstances.deleteInstances(key, instancesUpdated)
} else {
s.workloadInstances.update(wi)
s.serviceInstances.updateInstances(key, instancesUpdated)
}
s.mutex.Unlock()
}

总结

参考