合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
# 1. 服务注册 > provider--->nacos server 触发机制:事件触发,监听`ServletWebServerInitialized`(Eventservlet容器启动后发出),向nacos注册服务 1.引入服务注册starter后,有如下自动装配,其中包括触发服务注册 ![](https://img.kancloud.cn/79/0e/790ebd85b8694d0ba734dac4168c3f43_1286x310.png) ![](https://img.kancloud.cn/e2/ba/e2ba72d26a41bd951f6426ebf127789b_1288x790.png) ![](https://img.kancloud.cn/ac/ee/aceef38e9d999b1a6a9702b93cfb9f21_1328x923.png) # 2. 发送心跳 ## 2.1服务注册端 在发送服务注册请求的同时,会开启一个定时任务,发送心跳`NacosNamingService#registerInstance` ![](https://img.kancloud.cn/9c/bb/9cbb817190588178f5437109178aba35_1252x425.png) BeatReactor#addBeatInfo ~~~Java public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } ~~~ 发送心跳包 ~~~ class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } long nextTime = beatInfo.getPeriod(); try { JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); } //继续发送心跳包 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } } ~~~ ## 2.2 nacos server处理心跳请求 1. controller ~~~ @CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat(HttpServletRequest request) throws Exception { ... //处理心跳请求 int resultCode = getInstanceOperator() .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder); result.put(CommonParams.CODE, resultCode); result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName)); result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; } ~~~ 最终会调用 ClientBeatProcessor#run ~~~ @Override public void run() { Service service = this.service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); } String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); Cluster cluster = service.getClusterMap().get(clusterName); List<Instance> instances = cluster.allIPs(true); for (Instance instance : instances) { if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } //更新最后一次心跳时间 instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked() && !instance.isHealthy()) { instance.setHealthy(true); Loggers.EVT_LOG .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); getPushService().serviceChanged(service); } } } } ~~~ ## 2.3 nacos检查心跳 ~~~ @Override public void run() { try { ... List<Instance> instances = service.allIPs(true); // first set health status of instances: for (Instance instance : instances) { //如果超出心跳间隔时间,没有发送心跳 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { //将服务实例标记健康状态-->不健康 instance.setHealthy(false); Loggers.EVT_LOG .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat()); getPushService().serviceChanged(service); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } // then remove obsolete instances: for (Instance instance : instances) { if (instance.isMarked()) { continue; } if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance)); deleteIp(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); } } ~~~