Around Nacos: 服务发现

一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。

背景

对于微服务治理中,服务发现 作为通讯的基石,在漫长的发展历史中,包含了 AP: Eureka CP: Zookeeper 这两大类型,当然还有现在使用的较多的 Consul,而今天我们需要介绍的是完全的中国芯 Nacos

本文基于 Nacos 2.0.3

架构设计

Nascos 的架构设计上,我们可以发现 Nacos 也遵从了 前后端分离 的设计。

模块

服务发现

服务发现提供如下的核心能力

  • 服务注册:Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中。
  • 服务心跳:在服务注册后,Nacos Client会维护一个定时心跳来持续通知Nacos Server,说明服务一直处于可用状态,防止被剔除。默认5s发送一次心跳。
  • 服务同步:Nacos Server集群之间会互相同步服务实例,用来保证服务信息的一致性。 leader raft
  • 服务发现:服务消费者(Nacos Client)在调用服务提供者的服务时,会发送一个REST请求给Nacos Server,获取上面注册的服务清单,并且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时任务定时拉取服务端最新的注册表信息更新到本地缓存
  • 服务健康检查:Nacos Server会开启一个定时任务用来检查注册服务实例的健康情况,对于超过15s没有收到客户端心跳的实例会将它的healthy属性置为false(客户端服务发现时不会发现),如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复发送心跳则会重新注册)

Core

注册发现看起来就很简单的样子。对于服务注册,本质上只是将服务的原信息(IP,PORT) 写入一个 Map 之中,可以方便后续的查询。因此按照这个思路,我们可以从 OPENAPI 入手。

Open API 指南 中,我们可以发现 /nacos/v1/ns/instance 注册的服务,在代码中就很搜索到入口。

registergithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);

final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();

getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}

OPENAPI 中并没有去做些什么,将 Instance 构建出来出来之后,就直接委托给 InstanceOperator 进行处理了,而在这里区分了 GRPCHTTP 两种行径

getInstanceOperatorgithub
1
2
3
private InstanceOperator getInstanceOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}

我们去看看 HTTP -> instanceServiceV1

registerInstancegithub
1
2
3
4
5
6
7
8
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
boolean ephemeral = instance.isEphemeral();
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
createIpPortClientIfAbsent(clientId);
Service service = getService(namespaceId, serviceName, ephemeral);
clientOperationService.registerInstance(service, instance, clientId);
}

而在 clientOperationService 的实现分为两种

ephemeralClientOperationServicegithub
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

或者是

persistentClientOperationServiceImplgithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
final InstanceStoreRequest request = new InstanceStoreRequest();
request.setService(service);
request.setInstance(instance);
request.setClientId(clientId);
final WriteRequest writeRequest = WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(request))).setOperation(DataOperation.ADD.name())
.build();

try {
protocol.write(writeRequest);
} catch (Exception e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
}
}

这两者的区别就在于 AP 模式或者 CP 模式。

对于以 ephemeral 注册的节点,就会以 AP 模式向外广播。而广播的机制就是 NotifyCenter
而其他的节点,通过 protocol (jraft) 对外进行同步,也就是以 CP 的模式进行广播

CP 通过 Raft 协议对外广播的数据依然需要被消费,而消费之处也在这个 class

onApplygithub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
// 监听了 Raft append log
public Response onApply(WriteRequest request) {
final InstanceStoreRequest instanceRequest = serializer.deserialize(request.getData().toByteArray());
final DataOperation operation = DataOperation.valueOf(request.getOperation());
final Lock lock = readLock;
lock.lock();
try {
switch (operation) {
case ADD:
onInstanceRegister(instanceRequest.service, instanceRequest.instance,
instanceRequest.getClientId());
break;
case DELETE:
onInstanceDeregister(instanceRequest.service, instanceRequest.getClientId());
break;
default:
return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + operation)
.build();
}
return Response.newBuilder().setSuccess(true).build();
} finally {
lock.unlock();
}
}

// 最终的写入之处
private void onInstanceRegister(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!clientManager.contains(clientId)) {
clientManager.clientConnected(clientId, new ClientAttributes());
}
Client client = clientManager.getClient(clientId);
InstancePublishInfo instancePublishInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instancePublishInfo);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
}

而在这里也仅仅是作为一个通知机制,其中的某个节点广播给所有的节点,所有的节点会自行的触发一次 publishEvent 但是对于不同的类型节点,都有 ClientRegisterServiceEvent 事件,而 InstanceMetadataEvent 只有 ephemeralClient 才有

参考