前言
服务注册与发现是Spring Cloud Eureka的核心功能,首先我们需要一个Eureka Server,然后再来一个Eureka Client,那么Client的服务是怎么自动注册到Server的呢?我们都知道SpringBoot是约定大于配置的一个框架,自动配置是在启动的时候扫描/META-INF/spring.factories文件中EnableAutoConfiguration下的所有的*AutoConfiguration类,看一下eureka-client包下的spring.factories文件内容

我们主要关注两个类EurekaClientAutoConfiguration和EurekaDiscoveryClientConfiguration
在类EurekaClientAutoConfiguration的定义上我们可以看到这个类是在EurekaDiscoveryClientConfiguration初始化完成之后再进行初始化的,这不是重点,重点来看EurekaClientAutoConfiguration
1 2 3 4 5 6
   | @AutoConfigureAfter(name = {   "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",   "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",   "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" }) public class EurekaClientAutoConfiguration { }
  | 
 
 注册服务
EurekaClientAutoConfiguration类的主要功能是配置EurekaClient。其中有个关键的内部类RefreshableEurekaClientConfiguration:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
   | @Configuration(proxyBeanMethods = false) @ConditionalOnRefreshScope protected static class RefreshableEurekaClientConfiguration {
    @Autowired   private ApplicationContext context;
    @Autowired   private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
    @Bean(destroyMethod = "shutdown")   @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)   @org.springframework.cloud.context.config.annotation.RefreshScope   @Lazy   public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {     ApplicationInfoManager appManager;     if (AopUtils.isAopProxy(manager)) {       appManager = ProxyUtils.getTargetObject(manager);     } else {       appManager = manager;     }     CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);     cloudEurekaClient.registerHealthCheck(healthCheckHandler);     return cloudEurekaClient;   }
  }
   | 
 
这个类被@ConditionalOnRefreshScope标注了,因为在spring-cloud-context包的spring.factories中配置了RefreshAutoConfiguration,且@ConditionalOnRefreshScope的实例化取决于RefreshAutoConfiguration
1 2 3 4 5 6 7 8 9
   | @Target({ ElementType.TYPE, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented @ConditionalOnClass(RefreshScope.class) @ConditionalOnBean(RefreshAutoConfiguration.class) @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "true",                        matchIfMissing = true) @interface ConditionalOnRefreshScope { }
  | 
 
既然RefreshableEurekaClientConfiguration类被实例化了,那么里面的EurekaClient也同样被实例化了,在eurekaClient()方法中返回的是CloudEurekaClient类的实例,那么关键就是这个类了。
CloudEurekaClient继承自DiscoveryClient,并且在构造器中是直接调了父类的构造器去处理具体逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
   | DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,                     Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {   try {          scheduler = Executors.newScheduledThreadPool(2,                                                  new ThreadFactoryBuilder()                                                  .setNameFormat("DiscoveryClient-%d")                                                  .setDaemon(true)                                                  .build());
      heartbeatExecutor = new ThreadPoolExecutor(       1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,       new SynchronousQueue<Runnable>(),       new ThreadFactoryBuilder()       .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")       .setDaemon(true)       .build()     );
      cacheRefreshExecutor = new ThreadPoolExecutor(       1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,       new SynchronousQueue<Runnable>(),       new ThreadFactoryBuilder()       .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")       .setDaemon(true)       .build()     );   } catch (Throwable e) {     throw new RuntimeException("Failed to initialize DiscoveryClient!", e);   }
       if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {     fetchRegistryFromBackup();   }
       if (this.preRegistrationHandler != null) {     this.preRegistrationHandler.beforeRegistration();   }
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {     try {       if (!register() ) {         throw new IllegalStateException("Registration error at startup. Invalid server response.");       }     } catch (Throwable th) {       logger.error("Registration error at startup: {}", th.getMessage());       throw new IllegalStateException(th);     }   }
       initScheduledTasks(); }
   | 
 
DiscoveryClient构造器中调用的最核心的两个方法是fetchRegistry()和initScheduledTasks(),fetchRegistry()方法中调用了getAndStoreFullRegistry(),最终在此方法中向Eureka Server发送了获取所有实例的请求
1 2 3 4 5 6 7 8 9 10 11
   | private void getAndStoreFullRegistry() throws Throwable {   long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications apps = null;   EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null     ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())     : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());   if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {     apps = httpResponse.getEntity();   } }
  | 
 
操作完成之后,调用DiscoveryClient的initScheduledTasks()方法,在这个方法中,注册两个定时任务,一个是以指定的时间间隔获取注册表信息的任务,另一个是在给定的时间间隔内更新租约的heartbeat任务,并且在任务都初始化完成之后调用InstanceInfoReplicator#start方法初始化一个注册远程服务的定时任务。类InstanceInfoReplicator实际上是一个线程,实现自Runnable接口,在他的run方法里调用了DiscoveryClient的register()方法通过REST调用向eureka服务注册。
- 
Eureka接口:
1 2 3 4 5 6 7 8 9 10 11 12
   | POST    /eureka/apps/{appName}                                            注册新的实例  DELETE  /eureka/apps/{appName}/{instanceId}                               注销应用实例  PUT     /eureka/apps/{appName}/{instanceId}                               应用实例发送心跳  GET     /eureka/apps                                                    查询所有的实例  GET     /eureka/apps/{appName}                                            查询指定appId的实例  GET     /eureka/apps/{appName}/{instanceId}                               查询指定appId和instanceId的实例  GET     /eureka/instances/{instanceId}                                  查询指定的instanceId的实例  PUT     /eureka/apps/{appName}/{instanceId}/status?value=OUT_OF_SERVICE   暂停应用实例  PUT     /eureka/apps/{appName}/{instanceId}/status?value=UP               恢复应用实例  PUT     /eureka/apps/{appName}/{instanceId}/metadata?key=value            更新元数据信息  GET     /eureka/vips/{vipAddress}                                       根据vip地址查询  GET     /eureka/svips/{svipAddress}                                     根据svip地址查询
  | 
 
 
这些接口被定义在eureka-core.jar的com.netflix.eureka.resources包中
- 
Eureka核心类
1 2 3 4 5
   | InstanceInfo :              注册的服务实例,里面包含服务实例的各项属性 LeaseInfo :                 Eureka用这个类来标识应用实例的租约信息 ServiceInstance :           发现的实例信息的抽象接口,约定了服务发现的实例应用有哪些通用信息 InstanceStatus :            用于标识服务实例的状态,是一个枚举类,主要有状态UP,DOWN,STARTING,OUT_OF_SERVICE,UNKNOWN EurekaServerConfigBean :    Eureka Server的核心配置类,里面包含了Eureka Server的各项核心属性信息
   | 
 
 
 renew续约心跳
Eureka的续约需要每隔一段时间执行一次,目的是要告诉Eureka Server客户端还活着,以免Eureka Server将其当作是宕机的服务而剔除掉。
Client默认是每隔30秒发送一次renew请求,可以通过配置信息eureka.instance.lease-renewal-interval-in-seconds修改。
Server收到renew请求后,根据接收到的参数找到对应的实例,更新实例的续约时间,再将最新的续约时间同步到集群中的其他Server节点,最终完成续约。

Client端的续约定时任务是在实例化之后在initScheduledTasks()方法中被定义的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
   | private void initScheduledTasks() {      if (clientConfig.shouldRegisterWithEureka()) {     int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();     int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();     logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
           heartbeatTask = new TimedSupervisorTask(       "heartbeat",       scheduler,       heartbeatExecutor,       renewalIntervalInSecs,       TimeUnit.SECONDS,       expBackOffBound,       new HeartbeatThread()     );     scheduler.schedule(       heartbeatTask,       renewalIntervalInSecs, TimeUnit.SECONDS);   } else {     logger.info("Not registering with Eureka server per configuration");   } }
  | 
 
从代码中可以看到心跳最终使用的是类HeartbeatThread,这个类实际上就是一个线程类,通过ScheduledExecutorService来执行:
1 2 3 4 5 6 7 8
   | private class HeartbeatThread implements Runnable {
    public void run() {     if (renew()) {       lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();     }   } }
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
   | boolean renew() {   EurekaHttpResponse<InstanceInfo> httpResponse;   try {          httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);          if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {       REREGISTER_COUNTER.increment();       long timestamp = instanceInfo.setIsDirtyWithTime();       boolean success = register();       if (success) {         instanceInfo.unsetIsDirty(timestamp);       }       return success;     }     return httpResponse.getStatusCode() == Status.OK.getStatusCode();   } catch (Throwable e) {     return false;   } }
  | 
 
Eureka Server根据Jersey框架实现HTTP请求,续约请求最终会被com.netflix.eureka.resources.InstanceResource#renewLease接口接收到,然后通过InstanceRegistry递交给PeerAwareInstanceRegistryImpl,最终递交给AbstractInstanceRegistry#renew处理具体的操作,经过一系列rule操作之后,最终调用Lease#renew完成对lastUpdateTimestamp的更新。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
   | public boolean renew(String appName, String id, boolean isReplication) {   RENEW.increment(isReplication);   Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);   Lease<InstanceInfo> leaseToRenew = null;   if (gMap != null) {     leaseToRenew = gMap.get(id);   }   if (leaseToRenew == null) {     RENEW_NOT_FOUND.increment(isReplication);     return false;   } else {     InstanceInfo instanceInfo = leaseToRenew.getHolder();     if (instanceInfo != null) {                     InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);       if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {         RENEW_NOT_FOUND.increment(isReplication);         return false;       }       if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {         instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);       }     }     renewsLastMin.increment();     leaseToRenew.renew();     return true;   } }
  | 
 
续约操作成功完成后,会调用PeerAwareInstanceRegistryImpl#replicateToPeers方法通知其他Eureka节点
renew控制:
eureka.instance.lease-renewal-interval-in-seconds=10	#10秒renew一次,默认30秒
eureka.instance.lease-expiration-duration-in-senconds=80	#如果80秒内未发送续约请求,则关闭该客户端,默认为90秒
lease-expiration-duration-in-senconds不宜过大,否则可能出现客户端已down,但还是会有流量转发给它;但是也不宜过小,不然客户端可能会因为出现网络抖动而被移除。大于lease-renewal-interval-in-seconds两三倍以上。