配置中心 配置发布与客户端配置更新 实现方式:
Admin Service 在配置发布后会往 ReleaseMessage 表插入一条消息记录,消息内容就是配置发布的 AppId+Cluster+Namespace ,参见 DatabaseMessageSender 。
Config Service 有一个线程会每秒扫描一次 ReleaseMessage 表,看看是否有新的消息记录,参见 ReleaseMessageScanner 。
Config Service 如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如 NotificationControllerV2 ,消息监听器的注册过程参见 ConfigServiceAutoConfiguration 。
NotificationControllerV2 得到配置发布的 AppId+Cluster+Namespace 后,会通知对应的客户端。
ReleaseMessage消息内容通过generate方法将 appId + cluster + namespace 拼接,使用 ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR = "+" 作为间隔 ,例如:"test+default+application" 。
因此,对于同一个 Namespace ,生成的消息内容 是相同 的。通过这样的方式,我们可以使用最新的 ReleaseMessage 的 id 属性,作为 Namespace 是否发生变更的标识。而 Apollo 确实是通过这样的方式实现,Client 通过不断使用获得到 ReleaseMessage 的 id 属性 作为版本号 ,请求 Config Service判断是否配置 发生了变化。
正因为,ReleaseMessage 设计的意图是作为配置发生变化的通知,所以对于同一个 Namespace ,仅需要保留其最新的 ReleaseMessage 记录即可,会有后台任务不断清理旧的 ReleaseMessage 记录。
源码解析:
1、发布配置调用Portal的createRelease方法,具体逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 1 : @Autowired 2 : private ReleaseService releaseService; 3 : @Autowired 4 : private ApplicationEventPublisher publisher; 5 : @Autowired 6 : private PortalConfig portalConfig; 7 : 8 : @PreAuthorize(value = "@permissionValidator.hasReleaseNamespacePermission(#appId, #namespaceName)") 9 : @RequestMapping(value = "/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/releases", method = RequestMethod.POST) 10 : public ReleaseDTO createRelease (@PathVariable String appId, 11 : @PathVariable String env, @PathVariable String clusterName,12 : @PathVariable String namespaceName, @RequestBody NamespaceReleaseModel model) {13 : 14 : checkModel(Objects.nonNull(model));15 : 16 : model.setAppId(appId);17 : model.setEnv(env);18 : model.setClusterName(clusterName);19 : model.setNamespaceName(namespaceName);20 : 21 : if (model.isEmergencyPublish() && !portalConfig.isEmergencyPublishAllowed(Env.valueOf(env))) {22 : throw new BadRequestException(String.format("Env: %s is not supported emergency publish now" , env));23 : }24 : 25 : ReleaseDTO createdRelease = releaseService.publish(model);26 : 27 : 28 : ConfigPublishEvent event = ConfigPublishEvent.instance();29 : event.withAppId(appId)30 : .withCluster(clusterName)31 : .withNamespace(namespaceName)32 : .withReleaseId(createdRelease.getId())33 : .setNormalPublishEvent(true )34 : .setEnv(Env.valueOf(env));35 : 36 : publisher.publishEvent(event);37 : 38 : return createdRelease;39 : }
第 14 行:校验 NamespaceReleaseModel 非空。
第 15 至 19 行:设置 PathVariable 变量到 NamespaceReleaseModel 中。
第 20 至 23 行:校验 若是紧急发布,但是当前环境未允许该操作,抛出 BadRequestException 异常。
紧急发布 功能,可通过设置 PortalDB 的 ServerConfig 的"emergencyPublish.supported.envs" 配置开启对应的 Env 们 。例如,emergencyPublish.supported.envs = dev 。
第 25 行:调用 ReleaseService#publish(NamespaceReleaseModel) 方法,调用 Admin Service API ,发布配置。
第 27 至 36 行:创建 ConfigPublishEvent 对象,并调用 ApplicationEventPublisher#publishEvent(event) 方法,发布 ConfigPublishEvent 事件。这部分,我们在后续文章分享。
第 38 行:返回 ReleaseDTO 对象。
2、ReleaseService的publish方法,调用Admin Serivice API发布配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 1 : @Autowired 2 : private UserInfoHolder userInfoHolder; 3 : @Autowired 4 : private AdminServiceAPI.ReleaseAPI releaseAPI; 5 : 6 : public ReleaseDTO publish (NamespaceReleaseModel model) { 7 : Env env = model.getEnv(); 8 : boolean isEmergencyPublish = model.isEmergencyPublish(); 9 : String appId = model.getAppId(); 10 : String clusterName = model.getClusterName();11 : String namespaceName = model.getNamespaceName();12 : String releaseBy = StringUtils.isEmpty(model.getReleasedBy()) ? userInfoHolder.getUser().getUserId() : model.getReleasedBy();13 : 14 : 15 : ReleaseDTO releaseDTO = releaseAPI.createRelease(appId, env, clusterName, namespaceName,16 : model.getReleaseTitle(), model.getReleaseComment(),17 : releaseBy, isEmergencyPublish);18 : 19 : Tracer.logEvent(TracerEventType.RELEASE_NAMESPACE, String.format("%s+%s+%s+%s" , appId, env, clusterName, namespaceName));20 : return releaseDTO;21 : }
3、ReleaseAPI中封装了对Admin Service的Release模块的API调用:
4、Admin侧ReleaseController接收请求,发布Namespace 的配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 1 : @Autowired 2 : private ReleaseService releaseService; 3 : @Autowired 4 : private NamespaceService namespaceService; 5 : @Autowired 6 : private MessageSender messageSender; 7 : 8 : @Transactional 9 : @RequestMapping(path = "/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases", method = RequestMethod.POST) 10 : public ReleaseDTO publish (@PathVariable("appId") String appId, 11 : @PathVariable("clusterName") String clusterName,12 : @PathVariable("namespaceName") String namespaceName,13 : @RequestParam("name") String releaseName,14 : @RequestParam(name = "comment", required = false) String releaseComment,15 : @RequestParam("operator") String operator,16 : @RequestParam(name = "isEmergencyPublish", defaultValue = "false") boolean isEmergencyPublish) {17 : 18 : Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);19 : if (namespace == null ) {20 : throw new NotFoundException(String.format("Could not find namespace for %s %s %s" , appId, clusterName, namespaceName));21 : }22 : 23 : Release release = releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);24 : 25 : 26 : 27 : Namespace parentNamespace = namespaceService.findParentNamespace(namespace);28 : String messageCluster;29 : if (parentNamespace != null ) { 30 : messageCluster = parentNamespace.getClusterName();31 : } else {32 : messageCluster = clusterName; 33 : }34 : 35 : messageSender.sendMessage(ReleaseMessageKeyGenerator.generate(appId, messageCluster, namespaceName), Topics.APOLLO_RELEASE_TOPIC);36 : 37 : 38 : return BeanUtils.transfrom(ReleaseDTO.class, release);39 : }
POST /apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases 接口 ,Request Body 传递 JSON 对象。
第 17 至 21 行:校验 对应的 Namespace 对象是否存在。若不存在,抛出 NotFoundException 异常。
第 23 行:调用 ReleaseService#publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish) 方法,发布 Namespace 的配置,返回 Release 对象。
第 26 至 33 行:获得发布消息的Cluster名字。
第 35 行:调用 MessageSender#sendMessage(String message, String channel) 方法,发送发布消息。详细实现,下一篇文章详细解析。
第 38 行:调用 BeanUtils#transfrom(Class<T> clazz, Object src) 方法,将 Release 转换 成 ReleaseDTO 对象。
5、调用ReleaseService的publish方法,发布Namespace配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 1 : private Gson gson = new Gson(); 2 : 3 : @Autowired 4 : private ReleaseRepository releaseRepository; 5 : @Autowired 6 : private ItemService itemService; 7 : @Autowired 8 : private AuditService auditService; 9 : @Autowired 10 : private NamespaceLockService namespaceLockService;11 : @Autowired 12 : private NamespaceService namespaceService;13 : @Autowired 14 : private ReleaseHistoryService releaseHistoryService;15 : 16 : @Transactional 17 : public Release publish (Namespace namespace, String releaseName, String releaseComment, String operator, boolean isEmergencyPublish) {18 : 19 : checkLock(namespace, isEmergencyPublish, operator);20 : 21 : Map<String, String> operateNamespaceItems = getNamespaceItems(namespace);22 : 23 : Namespace parentNamespace = namespaceService.findParentNamespace(namespace);24 : 25 : 26 : if (parentNamespace != null ) {27 : return publishBranchNamespace(parentNamespace, namespace, operateNamespaceItems, releaseName, releaseComment, operator, isEmergencyPublish);28 : }29 : 30 : Namespace childNamespace = namespaceService.findChildNamespace(namespace);31 : 32 : Release previousRelease = null ;33 : if (childNamespace != null ) {34 : previousRelease = findLatestActiveRelease(namespace);35 : }36 : 37 : 38 : Map<String, Object> operationContext = Maps.newHashMap();39 : operationContext.put(ReleaseOperationContext.IS_EMERGENCY_PUBLISH, isEmergencyPublish);40 : 41 : Release release = masterRelease(namespace, releaseName, releaseComment, operateNamespaceItems, operator, ReleaseOperation.NORMAL_RELEASE, operationContext); 42 : 43 : 44 : if (childNamespace != null ) {45 : mergeFromMasterAndPublishBranch(namespace, childNamespace, operateNamespaceItems,46 : releaseName, releaseComment, operator, previousRelease,47 : release, isEmergencyPublish);48 : }49 : return release;50 : }
第 19 行:调用 #checkLock(namespace, isEmergencyPublish, operator) 方法,校验 NamespaceLock 锁定。
第 21 行:调用 #getNamespaceItems(namespace) 方法,获得 Namespace 的普通 配置 Map 。
第 23 行:调用 #findParentNamespace(namespace) 方法,获得父 Namespace 对象。
第 26 至 28 行:若有父 Namespace 对象,灰度发布 。
第 30 行:调用 NamespaceService#findChildNamespace(namespace) 方法,获得子 Namespace 对象。
第 31 至 35 行:调用 #findLatestActiveRelease(Namespace) 方法,获得上一次 ,并且有效的 Release 对象。
第 36 至 39 行:创建操作 Context 。
第 41 行:调用 #masterRelease(namespace, releaseName, releaseComment, operateNamespaceItems, operator, releaseOperation, operationContext) 方法,主干 发布配置。🙂 创建的 Namespace ,默认就是主干 ,而灰度 发布使用的是分支 。
第 42 至 48 行:调用 #mergeFromMasterAndPublishBranch(...) 方法,若有子 Namespace 时,自动将主干合并到子 Namespace ,并进行一次子 Namespace 的发布。
第 49 行:返回 Release 对象。
6、调用masterRelease方法,主干发布配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 1 : private Release masterRelease (Namespace namespace, String releaseName, String releaseComment, 2 : Map<String, String> configurations, String operator, 3 : int releaseOperation, Map<String, Object> operationContext) { 4 : 5 : Release lastActiveRelease = findLatestActiveRelease(namespace); 6 : long previousReleaseId = lastActiveRelease == null ? 0 : lastActiveRelease.getId(); 7 : 8 : Release release = createRelease(namespace, releaseName, releaseComment, configurations, operator); 9 : 10 : 11 : releaseHistoryService.createReleaseHistory(namespace.getAppId(), namespace.getClusterName(),12 : namespace.getNamespaceName(), namespace.getClusterName(),13 : release.getId(), previousReleaseId, releaseOperation,14 : operationContext, operator);15 : return release;16 : }
第 5 行:调用 #findLatestActiveRelease(namespace) 方法,获得最后 、有效 的 Release 对象。
第 8 行:调用 #createRelease(namespace, releaseName, releaseComment, configurations, operator) 方法,创建 Release 对象,并保存。
第10 至 14 行:调用 ReleaseHistoryService#createReleaseHistory(appId, clusterName, namespaceName, branchName, releaseId, previousReleaseId, operation, operationContext, operator) 方法,创建 ReleaseHistory 对象,并保存。
7、调用createRelease方法,创建Release对象并保存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 1 : private Release createRelease (Namespace namespace, String name, String comment, 2 : Map<String, String> configurations, String operator) { 3 : 4 : Release release = new Release(); 5 : release.setReleaseKey(ReleaseKeyGenerator.generateReleaseKey(namespace)); 6 : release.setDataChangeCreatedTime(new Date()); 7 : release.setDataChangeCreatedBy(operator); 8 : release.setDataChangeLastModifiedBy(operator); 9 : release.setName(name); 10 : release.setComment(comment);11 : release.setAppId(namespace.getAppId());12 : release.setClusterName(namespace.getClusterName());13 : release.setNamespaceName(namespace.getNamespaceName());14 : release.setConfigurations(gson.toJson(configurations)); 15 : 16 : release = releaseRepository.save(release);17 : 18 : namespaceLockService.unlock(namespace.getId());19 : 20 : auditService.audit(Release.class.getSimpleName(), release.getId(), Audit.OP.INSERT, release.getDataChangeCreatedBy());21 : return release;22 : }
8、调用ReleaseHistoryService的createReleaseHistory方法创建ReleaseHistory对象并保存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 1 : private Gson gson = new Gson(); 2 : 3 : @Autowired 4 : private ReleaseHistoryRepository releaseHistoryRepository; 5 : @Autowired 6 : private AuditService auditService; 7 : 8 : @Transactional 9 : public ReleaseHistory createReleaseHistory (String appId, String clusterName, String namespaceName, String branchName, 10 : long releaseId, long previousReleaseId, int operation,11 : Map<String, Object> operationContext, String operator) {12 : 13 : ReleaseHistory releaseHistory = new ReleaseHistory();14 : releaseHistory.setAppId(appId);15 : releaseHistory.setClusterName(clusterName);16 : releaseHistory.setNamespaceName(namespaceName);17 : releaseHistory.setBranchName(branchName);18 : releaseHistory.setReleaseId(releaseId); 19 : releaseHistory.setPreviousReleaseId(previousReleaseId); 20 : releaseHistory.setOperation(operation);21 : if (operationContext == null ) {22 : releaseHistory.setOperationContext("{}" ); 23 : } else {24 : releaseHistory.setOperationContext(gson.toJson(operationContext));25 : }26 : releaseHistory.setDataChangeCreatedTime(new Date());27 : releaseHistory.setDataChangeCreatedBy(operator);28 : releaseHistory.setDataChangeLastModifiedBy(operator);29 : 30 : releaseHistoryRepository.save(releaseHistory);31 : 32 : auditService.audit(ReleaseHistory.class.getSimpleName(), releaseHistory.getId(), Audit.OP.INSERT, releaseHistory.getDataChangeCreatedBy());33 : return releaseHistory;34 : }
第 12 至 28 行:创建 ReleaseHistory 对象,并设置对应的属性。
第 30 行:调用 ReleaseHistoryRepository#save(ReleaseHistory) 方法,保存 ReleaseHistory 对象。
第 32 行:记录 Audit 到数据库中。
9、Admin Service 发送 ReleaseMessage:
Admin Service 在配置发布后,需要通知所有的 Config Service 有配置发布,从而 Config Service 可以通知对应的客户端来拉取最新的配置。
从概念上来看,这是一个典型的消息使用场景 ,Admin Service 作为 producer 发出消息,各个Config Service 作为 consumer 消费消息。通过一个消息组件 (Message Queue)就能很好的实现 Admin Service 和 Config Service 的解耦。
在实现上,考虑到 Apollo 的实际使用场景,以及为了尽可能减少外部依赖 ,我们没有采用外部的消息中间件,而是通过数据库实现了一个简单的消息队列 。
实现方式:
Admin Service 在配置发布后会往 ReleaseMessage 表插入一条消息记录,消息内容就是配置发布的 AppId+Cluster+Namespace ,参见 DatabaseMessageSender 。
Config Service 有一个线程会每秒扫描一次 ReleaseMessage 表,看看是否有新的消息记录,参见 ReleaseMessageScanner 。
Config Service 如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如 NotificationControllerV2 ,消息监听器的注册过程参见 ConfigServiceAutoConfiguration 。
NotificationControllerV2 得到配置发布的 AppId+Cluster+Namespace 后,会通知对应的客户端。
在 ReleaseController 的 #publish(...) 方法中,会调用 MessageSender#sendMessage(message, channel) 方法,发送 Message 。
1 2 3 4 5 6 7 8 9 10 11 Namespace parentNamespace = namespaceService.findParentNamespace(namespace); String messageCluster; if (parentNamespace != null ) { messageCluster = parentNamespace.getClusterName(); } else { messageCluster = clusterName; } messageSender.sendMessage(ReleaseMessageKeyGenerator.generate(appId, messageCluster, namespaceName), Topics.APOLLO_RELEASE_TOPIC);
DatabaseMessageSender 实现 MessageSender 接口,Message 发送者实现类 ,基于数据库 实现。
sendMessage 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 1 : @Override 2 : @Transactional 3 : public void sendMessage (String message, String channel) { 4 : logger.info("Sending message {} to channel {}" , message, channel); 5 : 6 : if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) { 7 : logger.warn("Channel {} not supported by DatabaseMessageSender!" ); 8 : return ; 9 : } 10 : 11 : Tracer.logEvent("Apollo.AdminService.ReleaseMessage" , message);12 : 13 : Transaction transaction = Tracer.newTransaction("Apollo.AdminService" , "sendMessage" );14 : try {15 : 16 : ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message));17 : 18 : toClean.offer(newMessage.getId());19 : 20 : transaction.setStatus(Transaction.SUCCESS);21 : } catch (Throwable ex) {22 : 23 : logger.error("Sending message to database failed" , ex);24 : transaction.setStatus(ex);25 : throw ex;26 : } finally {27 : 28 : transaction.complete();29 : }30 : }
第 5 至 9 行:第 5 至 9 行:仅允许 发送 APOLLO_RELEASE_TOPIC 。
第 16 行:调用 ReleaseMessageRepository#save(ReleaseMessage) 方法,保存 ReleaseMessage 对象。
第 18 行:调用toClean#offer(Long id)方法,添加到清理 Message 队列。若队列已满,添加失败,不阻塞等待。
清理 ReleaseMessage 任务 #initialize() 方法,通知 Spring 调用,初始化清理 ReleaseMessage 任务 。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 1 : @PostConstruct 2 : private void initialize () { 3 : cleanExecutorService.submit(() -> { 4 : 5 : while (!cleanStopped.get() && !Thread.currentThread().isInterrupted()) { 6 : try { 7 : 8 : Long rm = toClean.poll(1 , TimeUnit.SECONDS); 9 : 10 : if (rm != null ) {11 : cleanMessage(rm);12 : 13 : } else {14 : TimeUnit.SECONDS.sleep(5 );15 : }16 : } catch (Throwable ex) {17 : 18 : Tracer.logError(ex);19 : }20 : }21 : });22 : }
第 3 至 21 行:调用ExecutorService#submit(Runnable)方法,提交清理 ReleaseMessage 任务
第 5 行:循环 ,直到停止。
第 8 行:调用BlockingQueue#poll(long timeout, TimeUnit unit)方法,拉取队头的消息编号。
第 10 至 11 行:若拉取到消息编号,调用 #cleanMessage(Long id) 方法,处理拉取到的消息,即清理老消息们 。
第 13 至 15 行:若未 拉取到消息编号,说明队列为空 ,sleep ,避免空跑,占用 CPU 。
cleanMessage(Long id) 方法,清理老消息们。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 1 : private void cleanMessage (Long id) { 2 : boolean hasMore = true ; 3 : 4 : 5 : ReleaseMessage releaseMessage = releaseMessageRepository.findOne(id); 6 : if (releaseMessage == null ) { 7 : return ; 8 : } 9 : 10 : while (hasMore && !Thread.currentThread().isInterrupted()) {11 : 12 : 13 : 14 : List<ReleaseMessage> messages = releaseMessageRepository.findFirst100ByMessageAndIdLessThanOrderByIdAsc(15 : releaseMessage.getMessage(), releaseMessage.getId());16 : 17 : releaseMessageRepository.delete(messages);18 : 19 : hasMore = messages.size() == 100 ;20 : 21 : messages.forEach(toRemove -> Tracer.logEvent(22 : String.format("ReleaseMessage.Clean.%s" , toRemove.getMessage()), String.valueOf(toRemove.getId())));23 : }24 : }
第 5 至 8 行:调用ReleaseMessageRepository#findOne(id)方法,查询对应的 ReleaseMessage 对象,避免已经删除。因为,DatabaseMessageSender 会在多进程中执行 。例如:1)Config Service + Admin Service ;2)N * Config Service ;3)N * Admin Service 。为什么 Config Service 和 Admin Service 都会启动清理任务呢?😈 因为 DatabaseMessageSender 添加了 @Component 注解,而 NamespaceService 注入了 DatabaseMessageSender 。而 NamespaceService 被 apollo-adminservice 和 apoll-configservice 项目都引用了,所以都会启动该任务。
第 10 至 23 行:循环删除相同消息内容,ReleaseMessage.message的老消息,即 Namespace 的老消息。
第 14 至 15 行:调用ReleaseMessageRepository#findFirst100ByMessageAndIdLessThanOrderByIdAsc(message, id)方法,拉取相同消息内容的100条的老消息,按照 id升序。
老消息的定义 :比当前消息编号小,即先发送的。
第 17 行:调用 ReleaseMessageRepository#delete(messages) 方法,删除 老消息。
第 19 行:若拉取不足 100 条,说明无老消息了。
第 21 至 22 行:【TODO 6001】Tracer 日志
ReleaseMessageListener:ReleaseMessage 监听器 接口
ReleaseMessageScanner接口:ReleaseMessage 扫描器,被 Config Service 使用 。
afterPropertiesSet方法:初始化Scan任务
fireMessageScanned方法:触发监听器,处理ReleaseMessage们
Config Service通知配置改变 NotificationControllerV2类:实现 ReleaseMessageListener 接口,通知 Controller ,仅 提供 notifications/v2 接口。
pollNotification方法:用于实现长轮训功能,用于监听客户端关注的配置变化。在处理监听结果时,如果有新的配置变化,则直接返回新的配置变化通知,如果没有则注册监听,等待下一次配置变化,在超时或完成监听时取消监听。
filterNotifications方法:根据传入的通知列表,过滤出有效的Apollo配置通知,并将其存储在一个HashMap中。
getApolloConfigNotifications方法:获得新的ApolloConfigNotification 通知数组,通过遍历命名空间和监听键,如果每个命名空间的最新通知ID,并与客户端的通知ID进行比较,如果大于则创建一个新的Apollo配置通知,将命名空间和最新通知ID以及相关监听键和通知ID添加到通知中,最后返回所有新的Apollo配置通知。
handleMessage方法:处理Apollo配置中心的发布消息,并通知相关客户端。在处理过程中,首先记录接收到的消息,然后解析消息内容,校验消息内容,获取待通知的客户端列表,最后根据客户端数量(默认100个)选择同步或异步通知客户端。
ApolloConfigNotification类:Apollo 配置通知 DTO 。 ApolloNotificationMessages类:Apollo 配置通知消息集合 DTO 。DeferredResultWrapper类:DeferredResult 包装器,封装 DeferredResult 的公用 方法。
onTimeout:设置超时回调函数
onCompletion:设置完成回调函数
setResult:设置结果
AppNamespaceServiceWithCache类:实现应用命名空间的缓存服务。通过定时扫描新的应用命名空间并更新缓存。缓存实现方式如下:
启动时,全量初始化 AppNamespace 到缓存
考虑 AppNamespace 新增,后台定时任务,定时增量初始化 AppNamespace 到缓存
考虑 AppNamespace 更新与删除,后台定时任务,定时全量重建 AppNamespace 到缓存
ReleaseMessageServiceWithCache类:实现基于缓存的发布消息服务。
启动时,初始化 ReleaseMessage 到缓存。
新增时,基于 ReleaseMessageListener ,通知有新的 ReleaseMessage ,根据是否有消息间隙,直接使用该 ReleaseMessage 或从数据库读取。
WatchKeysUtil类:生成监听配置变化的key集合Config Service配置读取接口 ConfigController提供配置读取功能。
queryConfig方法:
从传入的参数提取所需的信息,并对namespace参数进行处理,去掉.properties后缀,并将其转换为正确的命名空间格式
检查clientIp和clentVersion参数是否为空,如果为空则从HttpServletRequest对象中获取相应的信息
将messageAsString参数转换为ApolloNotificationMessages对象
创建一个空的Release对象,用于存储查询到的发布Realse
根据传入的参数查询配置,首先查询与当前应用程序相关的配置,然后查询公共配置,如果找到相关的配置则添加进Release列表中
如果列表为空,则返回一个HTTP 404错误,表示未找到配置,同时记录一个审计事件
方法审计查询到的发布,包括记录客户端的IP地址、版本和集群名称等信息
方法合并查询到的所有发布的配置,并将它们与应用程序ID、集群名称、原始命名空间和数据中心一起组合成一个ApolloConfig对象
记录一个审计事件表示找到了配置并将ApolloConfig对象返回给调用方