Apollo 客户端启动过程分析

入口是在 Spring Boot 的启动类上添加 @EnableApolloConfig 注解:

1
2
@Import(ApolloConfigRegistrar.class)
public @interface EnableApolloConfig {

该注解通过 @Import 引入了 ApolloConfigRegistrar,它实现了 ImportBeanDefinitionRegistrar 接口:

1
2
3
4
5
6
7
8
9
/**
 * Registrar pulled in through {@code @Import(ApolloConfigRegistrar.class)}.
 *
 * <p>It does no registration work itself: every call is forwarded to an
 * {@link ApolloConfigRegistrarHelper} obtained from {@code ServiceBootstrap.loadPrimary}.
 */
public class ApolloConfigRegistrar implements ImportBeanDefinitionRegistrar {

  /** Delegate that carries out the bean-definition registration. */
  private ApolloConfigRegistrarHelper registrarHelper =
      ServiceBootstrap.loadPrimary(ApolloConfigRegistrarHelper.class);

  /**
   * Forwards the registration request to the loaded helper.
   *
   * @param importingClassMetadata annotation metadata of the importing class
   * @param registry the bean definition registry to register into
   */
  @Override
  public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
      BeanDefinitionRegistry registry) {
    this.registrarHelper.registerBeanDefinitions(importingClassMetadata, registry);
  }
}

然后就调用了

1
com.ctrip.framework.apollo.spring.spi.DefaultApolloConfigRegistrarHelper#registerBeanDefinitions

接着是注册了这个 bean,com.ctrip.framework.apollo.spring.config.PropertySourcesProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
AnnotationAttributes attributes = AnnotationAttributes
.fromMap(importingClassMetadata.getAnnotationAttributes(EnableApolloConfig.class.getName()));
String[] namespaces = attributes.getStringArray("value");
int order = attributes.getNumber("order");
PropertySourcesProcessor.addNamespaces(Lists.newArrayList(namespaces), order);

Map<String, Object> propertySourcesPlaceholderPropertyValues = new HashMap<>();
// to make sure the default PropertySourcesPlaceholderConfigurer's priority is higher than PropertyPlaceholderConfigurer
propertySourcesPlaceholderPropertyValues.put("order", 0);

BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesPlaceholderConfigurer.class.getName(),
PropertySourcesPlaceholderConfigurer.class, propertySourcesPlaceholderPropertyValues);
// 注册了这个 bean
BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesProcessor.class.getName(),
PropertySourcesProcessor.class);

而 com.ctrip.framework.apollo.spring.config.PropertySourcesProcessor 实现了 org.springframework.beans.factory.config.BeanFactoryPostProcessor,
因此它的 com.ctrip.framework.apollo.spring.config.PropertySourcesProcessor#postProcessBeanFactory 方法会被 Spring 回调,其中会调用下面的 initializePropertySources 来初始化配置源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void initializePropertySources() {
if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME)) {
//already initialized
return;
}
CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME);

//sort by order asc
ImmutableSortedSet<Integer> orders = ImmutableSortedSet.copyOf(NAMESPACE_NAMES.keySet());
Iterator<Integer> iterator = orders.iterator();

while (iterator.hasNext()) {
int order = iterator.next();
for (String namespace : NAMESPACE_NAMES.get(order)) {
// 这里获取每个 namespace 的配置
Config config = ConfigService.getConfig(namespace);

composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
}
}

然后是 com.ctrip.framework.apollo.ConfigService#getConfig
接着就是它
com.ctrip.framework.apollo.internals.DefaultConfigManager#getConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);

if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);

if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);

// 通过 factory 来创建配置获取
config = factory.create(namespace);
m_configs.put(namespace, config);
}
}
}

创建配置

1
2
3
4
5
6
7
8
9
10
com.ctrip.framework.apollo.spi.DefaultConfigFactory#create
/**
 * Builds the {@link Config} for {@code namespace}.
 *
 * <p>The file format is derived from the namespace. A properties-compatible format gets a
 * properties-compatible file repository; every other format gets the local config repository.
 *
 * @param namespace the namespace to create a config for
 * @return a new {@code DefaultConfig} wrapping the chosen repository
 */
@Override
public Config create(String namespace) {
  ConfigFileFormat fileFormat = determineFileFormat(namespace);
  boolean propertiesCompatible = ConfigFileFormat.isPropertiesCompatible(fileFormat);
  if (!propertiesCompatible) {
    // This is the branch the walkthrough follows.
    return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
  }
  return new DefaultConfig(namespace,
      createPropertiesCompatibleFileConfigRepository(namespace, fileFormat));
}

然后

1
2
3
4
5
6
7
8
9
10
/**
 * Creates the {@link LocalFileConfigRepository} for {@code namespace}.
 *
 * <p>Outside local mode the repository is constructed together with a remote repository for
 * the same namespace. In local mode a warning is logged and only the namespace is passed.
 *
 * @param namespace the namespace to create the repository for
 * @return the newly created local file repository
 */
LocalFileConfigRepository createLocalConfigRepository(String namespace) {
  if (!m_configUtil.isInLocalMode()) {
    // Normal path: the config has to come from the config center.
    return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
  }
  logger.warn(
      "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
      namespace);
  return new LocalFileConfigRepository(namespace);
}

然后是创建远程配置仓库

1
2
3
4
com.ctrip.framework.apollo.spi.DefaultConfigFactory#createRemoteConfigRepository
/**
 * Creates a new {@link RemoteConfigRepository} for {@code namespace}.
 *
 * @param namespace the namespace the repository will serve
 * @return the freshly constructed remote repository
 */
RemoteConfigRepository createRemoteConfigRepository(String namespace) {
  RemoteConfigRepository remoteRepository = new RemoteConfigRepository(namespace);
  return remoteRepository;
}

继续对当前的 namespace 创建远程配置仓库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
com.ctrip.framework.apollo.internals.RemoteConfigRepository#RemoteConfigRepository
/**
 * Creates the remote repository for {@code namespace}.
 *
 * <p>The constructor resolves its collaborators through {@code ApolloInjector}, sets up the
 * internal state holders, attempts one synchronous sync and finally schedules the periodic
 * and long-polling refreshes.
 *
 * @param namespace the namespace this repository serves
 */
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
// Holds the most recently loaded config; stays empty until a sync stores one.
m_configCache = new AtomicReference<>();
// m_configUtil must be assigned before the rate limiter and retry policy below read it.
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
// Rate limiter sized from the configured load-config QPS.
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
// Force-refresh flag starts out set.
m_configNeedForceRefresh = new AtomicBoolean(true);
// Retry schedule built from the error-retry interval and 8x that interval
// (presumably the lower and upper bounds - confirm against ExponentialSchedulePolicy).
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
gson = new Gson();
// Attempt an initial sync before any refresh is scheduled.
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
}

然后是同步配置。下面这条告警日志经常可以看到,比如配置拉取地址不通的时候:

1
2
3
4
5
6
7
8
9
10
11
12
13
com.ctrip.framework.apollo.internals.AbstractConfigRepository#trySync
/**
 * Runs {@code sync()} and reports whether it finished without throwing.
 *
 * <p>Any {@link Throwable} raised by the sync is recorded as a tracer event and logged at
 * WARN level; it is not rethrown.
 *
 * @return {@code true} if {@code sync()} returned normally, {@code false} otherwise
 */
protected boolean trySync() {
  boolean synced = false;
  try {
    sync();
    synced = true;
  } catch (Throwable failure) {
    Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(failure));
    logger.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(),
        ExceptionUtil.getDetailMessage(failure));
  }
  return synced;
}

实际的同步方法,加了 synchronized 锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
com.ctrip.framework.apollo.internals.RemoteConfigRepository#sync
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

try {
// 获取本地配置
ApolloConfig previous = m_configCache.get();
// 获取配置
ApolloConfig current = loadApolloConfig();

//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}

if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}

transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}

然后走到这

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
com.ctrip.framework.apollo.internals.RemoteConfigRepository#loadApolloConfig
/**
 * Loads the config of this repository's namespace from one of the config services.
 *
 * <p>Flow:
 * <ol>
 *   <li>Tries to acquire a rate-limiter permit for up to 5 seconds; if that fails, sleeps a
 *       further 5 seconds before continuing.</li>
 *   <li>Runs up to {@code maxRetries} rounds (2 while the force-refresh flag is set, otherwise
 *       1). Each round shuffles a copy of the config-service list and, if a service is
 *       remembered in {@code m_longPollServiceDto}, moves it to the front (clearing it).</li>
 *   <li>Queries each service in turn. The first successful response ends the method: a 304
 *       returns the cached config instance, anything else returns the response body.</li>
 *   <li>After a failed attempt the sleep time for the next attempt is computed: the fixed
 *       retry interval while force refresh is set, otherwise the value returned by the
 *       failure schedule policy.</li>
 * </ol>
 *
 * <p>If the thread is interrupted while sleeping, the sleep ends early and the interrupt flag
 * is restored so that callers can still observe the interruption; the load itself continues.
 *
 * @return the loaded config, or the cached instance when the server answers 304
 * @throws ApolloConfigException if every attempt fails; the last recorded failure is the cause
 */
private ApolloConfig loadApolloConfig() {
  if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
    //wait at most 5 seconds
    try {
      TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
      // Do not swallow the interrupt: keep it visible to the calling code.
      Thread.currentThread().interrupt();
    }
  }
  String appId = m_configUtil.getAppId();
  String cluster = m_configUtil.getCluster();
  String dataCenter = m_configUtil.getDataCenter();
  Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
  int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
  long onErrorSleepTime = 0; // 0 means no sleep
  Throwable exception = null;

  // Resolve the candidate config services
  List<ServiceDTO> configServices = getConfigServices();
  String url = null;
  for (int i = 0; i < maxRetries; i++) {
    List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
    Collections.shuffle(randomConfigServices);
    //Access the server which notifies the client first
    if (m_longPollServiceDto.get() != null) {
      randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
    }

    for (ServiceDTO configService : randomConfigServices) {
      if (onErrorSleepTime > 0) {
        logger.warn(
            "Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
            onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);

        try {
          m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
        } catch (InterruptedException e) {
          // Cut the back-off short but preserve the interrupt status for the caller.
          Thread.currentThread().interrupt();
        }
      }

      url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
          dataCenter, m_remoteMessages.get(), m_configCache.get());

      logger.debug("Loading config from {}", url);
      HttpRequest request = new HttpRequest(url);

      Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
      transaction.addData("Url", url);
      try {

        HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
        // A response arrived: clear the force-refresh flag and reset the failure policy.
        m_configNeedForceRefresh.set(false);
        m_loadConfigFailSchedulePolicy.success();

        transaction.addData("StatusCode", response.getStatusCode());
        transaction.setStatus(Transaction.SUCCESS);

        if (response.getStatusCode() == 304) {
          logger.debug("Config server responds with 304 HTTP status code.");
          // Return the very same cached instance so the caller can detect "unchanged".
          return m_configCache.get();
        }

        ApolloConfig result = response.getBody();

        logger.debug("Loaded config for {}: {}", m_namespace, result);

        return result;
      } catch (ApolloConfigStatusCodeException ex) {
        ApolloConfigStatusCodeException statusCodeException = ex;
        //config not found
        if (ex.getStatusCode() == 404) {
          String message = String.format(
              "Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
                  "please check whether the configs are released in Apollo!",
              appId, cluster, m_namespace);
          statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
              message);
        }
        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
        transaction.setStatus(statusCodeException);
        exception = statusCodeException;
      } catch (Throwable ex) {
        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
        transaction.setStatus(ex);
        exception = ex;
      } finally {
        transaction.complete();
      }

      // if force refresh, do normal sleep, if normal config load, do exponential sleep
      onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
          m_loadConfigFailSchedulePolicy.fail();
    }

  }
  String message = String.format(
      "Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
      appId, cluster, m_namespace, url);
  throw new ApolloConfigException(message, exception);
}
1
2
3
4
5
6
7
8
9
com.ctrip.framework.apollo.internals.RemoteConfigRepository#getConfigServices
/**
 * Returns the config services reported by the service locator.
 *
 * @return the non-empty list of config services
 * @throws ApolloConfigException if the locator returns an empty list
 */
private List<ServiceDTO> getConfigServices() {
  List<ServiceDTO> available = m_serviceLocator.getConfigServices();
  if (available.isEmpty()) {
    throw new ApolloConfigException("No available config service");
  }
  return available;
}
1
2
3
4
5
6
7
8
com.ctrip.framework.apollo.internals.ConfigServiceLocator#getConfigServices
/**
 * Returns the currently known config services, triggering {@code updateConfigServices()}
 * first when the held list is empty.
 *
 * @return the list held in {@code m_configServices} after the optional update
 */
public List<ServiceDTO> getConfigServices() {
  boolean needsRefresh = m_configServices.get().isEmpty();
  if (needsRefresh) {
    updateConfigServices();
  }
  // Read the reference again so a list installed by the update is the one returned.
  return m_configServices.get();
}

更新配置服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
com.ctrip.framework.apollo.internals.ConfigServiceLocator#updateConfigServices
/**
 * Fetches the list of config services from the meta service and stores it.
 *
 * <p>Up to two attempts are made against the URL built by {@code assembleMetaServiceUrl()}.
 * A non-empty response is passed to {@code setConfigServices} and ends the method. An empty
 * or {@code null} body is logged and the next attempt starts immediately. After a failed
 * request the method sleeps for the configured error-retry interval before trying again.
 *
 * <p>If the thread is interrupted during that sleep, the sleep ends early and the interrupt
 * flag is restored so callers can still observe it.
 *
 * @throws ApolloConfigException if no attempt produced a non-empty service list; the cause is
 *     the last recorded failure, or {@code null} when every response was simply empty
 */
private synchronized void updateConfigServices() {
  String url = assembleMetaServiceUrl();

  HttpRequest request = new HttpRequest(url);
  int maxRetries = 2;
  Throwable exception = null;

  for (int i = 0; i < maxRetries; i++) {
    Transaction transaction = Tracer.newTransaction("Apollo.MetaService", "getConfigService");
    transaction.addData("Url", url);
    try {
      // Ask the meta service for the list of config services
      HttpResponse<List<ServiceDTO>> response = m_httpUtil.doGet(request, m_responseType);
      transaction.setStatus(Transaction.SUCCESS);
      List<ServiceDTO> services = response.getBody();
      if (services == null || services.isEmpty()) {
        logConfigService("Empty response!");
        continue;
      }
      setConfigServices(services);
      return;
    } catch (Throwable ex) {
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      transaction.setStatus(ex);
      exception = ex;
    } finally {
      transaction.complete();
    }

    try {
      m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(m_configUtil.getOnErrorRetryInterval());
    } catch (InterruptedException ex) {
      // Cut the back-off short but preserve the interrupt status for the caller.
      Thread.currentThread().interrupt();
    }
  }

  throw new ApolloConfigException(
      String.format("Get config services failed from %s", url), exception);
}