一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
背景
对于微服务治理中,服务发现
作为通讯的基石,在漫长的发展历史中,包含了 AP: Eureka
CP: Zookeeper
这两大类型,当然还有现在使用的较多的 Consul
,而今天我们需要介绍的是完全的中国芯 Nacos
本文基于 Nacos 2.0.3
架构设计
从 Nascos
的架构设计上,我们可以发现 Nacos
也遵从了 前后端分离
的设计。
模块
服务发现
服务发现提供如下的核心能力
- 服务注册:Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中。
- 服务心跳:在服务注册后,Nacos Client会维护一个定时心跳来持续通知Nacos Server,说明服务一直处于可用状态,防止被剔除。默认5s发送一次心跳。
- 服务同步:Nacos Server集群之间会互相同步服务实例,用来保证服务信息的一致性。 leader raft
- 服务发现:服务消费者(Nacos Client)在调用服务提供者的服务时,会发送一个REST请求给Nacos Server,获取上面注册的服务清单,并且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时任务定时拉取服务端最新的注册表信息更新到本地缓存
- 服务健康检查:Nacos Server会开启一个定时任务用来检查注册服务实例的健康情况,对于超过15s没有收到客户端心跳的实例会将它的healthy属性置为false(客户端服务发现时不会发现),如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复发送心跳则会重新注册)
Core
注册发现看起来就很简单的样子。对于服务注册,本质上只是将服务的原信息(IP,PORT) 写入一个 Map
之中,可以方便后续的查询。因此按照这个思路,我们可以从 OPENAPI
入手。
在 Open API 指南 中,我们可以发现 /nacos/v1/ns/instance
注册的服务,在代码中就很搜索到入口。
registergithub1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); final Instance instance = HttpRequestInstanceBuilder.newBuilder() .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build(); getInstanceOperator().registerInstance(namespaceId, serviceName, instance); return "ok"; }
|
在 OPENAPI
中并没有去做些什么,将 Instance
构建出来出来之后,就直接委托给 InstanceOperator
进行处理了,而在这里区分了 GRPC
和 HTTP
两种行径
getInstanceOperatorgithub1 2 3
| private InstanceOperator getInstanceOperator() { return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1; }
|
我们去看看 HTTP
-> instanceServiceV1
registerInstancegithub1 2 3 4 5 6 7 8
| @Override public void registerInstance(String namespaceId, String serviceName, Instance instance) { boolean ephemeral = instance.isEphemeral(); String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral); createIpPortClientIfAbsent(clientId); Service service = getService(namespaceId, serviceName, ephemeral); clientOperationService.registerInstance(service, instance, clientId); }
|
而在 clientOperationService
的实现分为两种
ephemeralClientOperationServicegithub1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public void registerInstance(Service service, Instance instance, String clientId) { Service singleton = ServiceManager.getInstance().getSingleton(service); Client client = clientManager.getClient(clientId); if (!clientIsLegal(client, clientId)) { return; } InstancePublishInfo instanceInfo = getPublishInfo(instance); client.addServiceInstance(singleton, instanceInfo); client.setLastUpdatedTime(); NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId)); NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false)); }
|
或者是
persistentClientOperationServiceImplgithub1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Override public void registerInstance(Service service, Instance instance, String clientId) { final InstanceStoreRequest request = new InstanceStoreRequest(); request.setService(service); request.setInstance(instance); request.setClientId(clientId); final WriteRequest writeRequest = WriteRequest.newBuilder().setGroup(group()) .setData(ByteString.copyFrom(serializer.serialize(request))).setOperation(DataOperation.ADD.name()) .build(); try { protocol.write(writeRequest); } catch (Exception e) { throw new NacosRuntimeException(NacosException.SERVER_ERROR, e); } }
|
这两者的区别就在于 AP
模式或者 CP
模式。
对于以 ephemeral
注册的节点,就会以 AP
模式向外广播。而广播的机制就是 NotifyCenter
而其他的节点,通过 protocol
(jraft) 对外进行同步,也就是以 CP
的模式进行广播
而 CP
通过 Raft
协议对外广播的数据依然需要被消费,而消费之处也在这个 class
中
onApplygithub1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Override
public Response onApply(WriteRequest request) { final InstanceStoreRequest instanceRequest = serializer.deserialize(request.getData().toByteArray()); final DataOperation operation = DataOperation.valueOf(request.getOperation()); final Lock lock = readLock; lock.lock(); try { switch (operation) { case ADD: onInstanceRegister(instanceRequest.service, instanceRequest.instance, instanceRequest.getClientId()); break; case DELETE: onInstanceDeregister(instanceRequest.service, instanceRequest.getClientId()); break; default: return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + operation) .build(); } return Response.newBuilder().setSuccess(true).build(); } finally { lock.unlock(); } }
private void onInstanceRegister(Service service, Instance instance, String clientId) { Service singleton = ServiceManager.getInstance().getSingleton(service); if (!clientManager.contains(clientId)) { clientManager.clientConnected(clientId, new ClientAttributes()); } Client client = clientManager.getClient(clientId); InstancePublishInfo instancePublishInfo = getPublishInfo(instance); client.addServiceInstance(singleton, instancePublishInfo); client.setLastUpdatedTime(); NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId)); }
|
而在这里也仅仅是作为一个通知机制,其中的某个节点广播给所有的节点,所有的节点会自行的触发一次 publishEvent
但是对于不同的类型节点,都有 ClientRegisterServiceEvent
事件,而 InstanceMetadataEvent
只有 ephemeralClient
才有
参考