[关闭]
@changedi 2016-01-11T05:32:27.000000Z 字数 10346 阅读 4207

一个配置发布订阅中心——读Diamond源码

Java



1. 发布配置中心

Diamond主要提供持久配置的发布和订阅服务,最大特点是结构简单,稳定可靠。主要的使用场景:TDDL使用Diamond动态切换数据库,动态扩容等;业务使用Diamond推送系统开关配置。Diamond产品专注于高可用性,基于此在架构、容灾机制、数据获取模型上有一些与同类产品的不同之处
                       ——阿里巴巴Diamond介绍

Diamond是无单点架构,在做更新配置的时候只做三件事:

本地的设计就是为了缓存,减少对数据库的压力。作为一个配置中心,高可用是最主要的需求。如何保持高可用,Diamond持有多层的数据存储,数据被存储在:数据库,服务端磁盘,客户端缓存目录,以及可以手工干预的容灾目录。 客户端通过API获取配置数据按照固定的顺序去不同的数据源获取数据:容灾目录,服务端磁盘,客户端缓存。

Diamond除了在容灾上做了很多方案,在数据读取方面也有很多特点。客户端采用推拉结合的策略在长连接和短连接之间取得一个平衡,让服务端不用太关注连接的管理,又可以获得长连接的及时性。

使用Diamond的流程:

发布配置:

Created with Raphaël 2.1.2发布配置ClientClientServerServerDBDB发布配置成功!写DB定时程序notify其他server,通知dumpdump配置到本地文件

读取配置:

Created with Raphaël 2.1.2读取配置ClientClientServerServer读容灾目录?读服务器?返回配置写本地缓存读本地缓存

2. 客户端

具体客户端在做什么事情呢?如何发布配置,如何读取配置,这都是客户端要做的事情。我们具体先看看客户端的细节。

2.1 初始化

客户端实现的细节都是static方法,也就是说DiamondClient启动时是在类加载阶段就完成了。在Diamond.java里有这样的代码,这个接口表名要发布一个配置:

  1. static public boolean publishSingle(String dataId, String group, String content) {
  2. return defaultEnv.publishSingle(dataId, group, content);
  3. }

其中defaultEnv是一个DiamondEnv类,在类加载时初始化。

  1. static public final DiamondEnv defaultEnv = new DiamondEnv(new ServerListManager());

ServerListManager是一个启动时和运行时定期获取地址列表的类。启动时拿不到地址列表,进程退出。获取列表主要是通过启动了一个GetServerListTask,这个Task会每隔一段时间去访问diamond服务器,获取配置信息。具体调度是通过ScheduledExecutorService来实现。

  1. class GetServerListTask implements Runnable {
  2. final String url;
  3. GetServerListTask(String url) {
  4. this.url = url;
  5. }
  6. @Override
  7. public void run() {
  8. try {
  9. updateIfChanged(getApacheServerList(url));
  10. } catch (Exception e) {
  11. log.error("[serverlist] failed to get serverlist, " + e.toString(), e);
  12. }
  13. }
  14. }

其中getApacheServerList负责去http get服务器http://xxx/server/diamond,来获取服务地址。updateIfChanged负责把地址存到serverUrls里并更新到本地文件。除了启动GetServerListTask,DiamondEnv还初始化了一个ClientWorker,这个worker会启动一个调度线程不断的check配置信息,具体见下:

  1. ClientWorker(final DiamondEnv env) {
  2. this.env = env;
  3. executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
  4. @Override
  5. public Thread newThread(Runnable r) {
  6. Thread t = new Thread(r);
  7. t.setName("com.taobao.diamond.client.Worker."+ env.serverMgr.name);
  8. t.setDaemon(true);
  9. return t;
  10. }
  11. });
  12. executor.scheduleWithFixedDelay(new Runnable() {
  13. public void run() {
  14. try {
  15. checkLocalConfigInfo();
  16. checkServerConfigInfo();
  17. } catch (Throwable e) {
  18. log.error("[sub-error-rotate] rotate check error", e);
  19. }
  20. }
  21. }, 1L, 1L, TimeUnit.MILLISECONDS);
  22. }

check的过程分两步,check本地和check服务端。本地check检查本地容灾目录是否有对应的配置内容文件,如果有,把本地容灾读取开关打开(这个开关会控制diamond客户端优先读哪个地方);如果没有,就把开关关掉;如果有但是和内存中持有的文件lastModified时间戳不一致,这时候把内存变量的内容更新。check服务端,会访问服务端,获取后,有一段更新本地缓存的逻辑,这个后面会讲到。check本身是有前提的,如果有配置过CacheData,那么check才会进行,否则直接结束。而CacheData会在后面讲到。至此,diamond客户端的初始化工作完成。

2.2 发布配置

如上面提到,发布配置就是publishSingle方法,内容很简单,就是发一个http post请求到diamond server。代码如下:

  1. String url = "/diamond_publish.do?method=syncUpdateAll";
  2. List<String> params = Arrays.asList("dataId", dataId, "group", group, "content", content);
  3. HttpResult result = null;
  4. try {
  5. result = agent.httpPost(url, null, params, Constants.ENCODE, POST_TIMEOUT);
  6. } catch (IOException ioe) {
  7. log.warn("[publish-single] error, " + dataId + ", " + group + ", msg: "
  8. + ioe.toString());
  9. return false;
  10. }

这里有一些优化,agent是一个ServerHttpAgent,post方法是定制的,会从ServerListManager将server的地址排序,依次post,做到同机房优先。

2.3 获取配置

获取配置通过Diamond.getConfig接口来完成,这个接口支持默认获取顺序,也支持自定义其他顺序,读取顺序是这样的,优先读本地容灾->读server->读本地缓存,代码如下:

  1. // 优先使用本地配置
  2. String content = LocalConfigInfoProcessor.getFailover(this, dataId, group);
  3. if (content != null) {
  4. log.warn("[get-config] get failover ok, dataId=" + dataId + ", group=" + group
  5. + ", config=" + ContentUtils.truncateContent(content));
  6. return content;
  7. }
  8. try {
  9. return ClientWorker.getServerConfig(this, dataId, group, timeoutMs);
  10. } catch (IOException ioe) {
  11. log.warn("[get-config] get server error, dataId:" + dataId + ", group:" + group
  12. + ", " + ioe.toString());
  13. }
  14. log.warn("[get-config] get snapshot, dataId:" + dataId + ", group:" + group);
  15. return LocalConfigInfoProcessor.getSnapshot(this, dataId, group);

在ClientWorker获取服务端配置时,有一个注意点,在发送/diamond_config调用的时候,有这样一段逻辑:

  1. switch (result.code) {
  2. case HttpURLConnection.HTTP_OK:
  3. // if (env == defaultEnv) {
  4. LocalConfigInfoProcessor.saveSnapshot(env, dataId, group, result.content);
  5. // }
  6. return result.content;
  7. case HttpURLConnection.HTTP_NOT_FOUND:
  8. // if (env == defaultEnv) {
  9. LocalConfigInfoProcessor.saveSnapshot(env, dataId, group, null);
  10. // }
  11. return null;
  12. case HttpURLConnection.HTTP_CONFLICT: {
  13. log.warn("[sub-server-error] data being modified");
  14. throw new IOException("data being modified");
  15. }

这意味着,在服务端获取后,会更新本地缓存,同理的是如果获取不到也会清除本地缓存。这个就和初始化阶段的定时调度check任务联系起来。

2.4 监听回调

Diamond还支持的一个功能是监听变更,可以注册一个配置变更监听器,当服务端配置变更后,可以实现回调。具体接口:

  1. static public void addListener(String dataId, String group, ManagerListener listener)
  2. static public void removeListener(String dataId, String group, ManagerListener listener)

添加时会用到CacheData:

  1. /**
  2. * 查询CacheData,不存在时新增。
  3. */
  4. public CacheData addCacheDataIfAbsent(String dataId, String group) {
  5. CacheData cache = getCache(dataId, group);
  6. if (null != cache) {
  7. return cache;
  8. }
  9. synchronized (cacheMap) {
  10. String key = GroupKey.getKey(dataId, group);
  11. cache = new CacheData(dataId, group);
  12. Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
  13. copy.put(key, cache);
  14. cacheMap.set(copy);
  15. log.info("[subscribe] " + key);
  16. }
  17. String content = LocalConfigInfoProcessor.getFailover(this, dataId, group);
  18. content = (null != content) ? content //
  19. : LocalConfigInfoProcessor.getSnapshot(this, dataId, group);
  20. cache.setContent(content);
  21. return cache;
  22. }

CacheData很重要,因为listener就是加在CacheData上的。listener是实现ManagerListener接口的,接口信息:

  1. public interface ManagerListener {
  2. public Executor getExecutor();
  3. /**
  4. * 接收配置信息
  5. *
  6. * @param configInfo
  7. */
  8. public void receiveConfigInfo(final String configInfo);
  9. }

listener的执行是在初始化步骤提到的check阶段做的,在check本地和服务端都会调用checkListenerMd5(env)方法,而这个方法就是检查变更后通知listener执行receiveConfigInfo方法。

  1. void checkListenerMd5() {
  2. for (ManagerListenerWrap wrap : listeners) {
  3. if (!md5.equals(wrap.lastCallMd5)) {
  4. safeNotifyListener(dataId, group, content, md5, wrap);
  5. }
  6. }
  7. }
  8. static void safeNotifyListener(final String dataId, final String group, final String content,
  9. final String md5, ManagerListenerWrap listenerWrap) {
  10. final ManagerListener listener = listenerWrap.listener;
  11. listenerWrap.lastCallMd5 = md5;
  12. Runnable job = new Runnable() {
  13. public void run() {
  14. try {
  15. listener.receiveConfigInfo(content);
  16. log.info("[notify-ok] " + dataId + ", " + group + ", md5=" + md5
  17. + ", listener=" + listener);
  18. } catch (Throwable t) {
  19. log.error("[notify-error] " + dataId + ", " + group + ", md5=" + md5
  20. + ", listener=" + listener.toString(), t);
  21. }
  22. }
  23. };
  24. try {
  25. if (null != listener.getExecutor()) {
  26. listener.getExecutor().execute(job);
  27. } else {
  28. job.run();
  29. }
  30. } catch (Throwable t) {
  31. log.error("[notify-error] " + dataId + ", " + group + ", md5=" + md5 + ", listener="
  32. + listener.toString(), t);
  33. }
  34. }

可以看到检查变更就是检查md5签名是否一致,不一致就说明配置有变更,这时就通知listener执行。

最后汇总一下,client端涉及到的几个连接server的地方。客户端要做的事情就讲完了。值得一提的是每次获取服务的http调用,在header里都是要set一个appkey和appsecret作为token校验的。当然这应该是http服务的标配。

连接 用途
xxx/server/diamond 获取服务地址
ip/server/diamond_publish?method=syncUpdateAll 发布配置
ip/server/diamond_config 拉取配置

ip就是通过获取服务地址获取到的服务器地址,因为diamond本身也是无单点的架构,部署在不同机房,获取服务地址就是为了方便发布和拉取时能够优选机房。

3. 服务端

Diamond的server端是一个java web项目,部署了一个java开发的web系统。对于diamond请求,我没有找到对应的实现代码。我猜想这个服务部署在另一个系统里,提供diamond server的服务地址查询服务,我本地访问了一下,会返回一个server的ip列表。而diamond_pulish的请求,被一个servlet处理,参数是dataId、group和配置的内容content。处理servlet做的事情——把数据插入数据库。

如果是获取请求,首先有个加读锁的动作,diamond servlet会给这个读请求申请加读锁,读写锁的实现是SimpleReadWriteLock类,里面通过定义一个int型的status来判断读写锁。拿到锁后,从本地文件中把配置读出来,返回响应请求。

等下,这里好像出现问题了,发布的时候把配置写到数据库,怎么读的时候反而读文件了呢?原来在服务器启动时,也有很多service带着守护线程启动了,它们随时监控着系统的情况。比如PersistService,启动时定时的去检查数据库的读取情况,不间断的运行一个SelectMasterTask,去选择master数据库。选择的过程就是从服务器配置的若干个数据源里选择主库(方法是执行一条delete,能做就是master,因为备库是readonly)。

  1. class SelectMasterTask implements Runnable {
  2. public void run() {
  3. defaultLog.info("check master db.");
  4. boolean isFound = false;
  5. for (BasicDataSource ds : dataSourceList) {
  6. testMasterJT.setDataSource(ds);
  7. testMasterJT.setQueryTimeout(QUERY_TIMEOUT);
  8. try {
  9. testMasterJT
  10. .update("delete from config_info where data_id='com.taobao.diamond.testMasterDB'");
  11. if (jt.getDataSource() != ds) {
  12. fatalLog.warn("[master-db] {}", ds.getUrl());
  13. }
  14. jt.setDataSource(ds);
  15. isFound = true;
  16. break;
  17. } catch (DataAccessException e) { // read only
  18. e.printStackTrace(); // TODO remove
  19. }
  20. }
  21. if (!isFound) {
  22. fatalLog.error("[master-db] master db not found.");
  23. }
  24. }
  25. }

比如DumpService,启动时初始化了一个TaskManager,运行ProcessRunnable任务,这个任务就是让TaskManager.process()。

  1. protected void process() {
  2. for (Map.Entry<String, Task> entry : this.tasks.entrySet()) {
  3. Task task = null;
  4. this.lock.lock();
  5. try {
  6. // 获取任务
  7. task = entry.getValue();
  8. if (null != task) {
  9. if (!task.shouldProcess()) {
  10. // 任务当前不需要被执行,直接跳过
  11. continue;
  12. }
  13. // 先将任务从任务Map中删除
  14. this.tasks.remove(entry.getKey());
  15. }
  16. }
  17. finally {
  18. this.lock.unlock();
  19. }
  20. if (null != task) {
  21. // 获取任务处理器
  22. TaskProcessor processor = this.taskProcessors.get(entry.getKey());
  23. if (null == processor) {
  24. // 如果没有根据任务类型设置的处理器,使用默认处理器
  25. processor = this.getDefaultTaskProcessor();
  26. }
  27. if (null != processor) {
  28. boolean result = false;
  29. try {
  30. // 处理任务
  31. result = processor.process(entry.getKey(), task);
  32. }
  33. catch (Throwable t) {
  34. log.error("处理task失败", t);
  35. }
  36. if (!result) {
  37. // 任务处理失败,设置最后处理时间
  38. task.setLastProcessTime(System.currentTimeMillis());
  39. // 将任务重新加入到任务Map中
  40. this.addTask(entry.getKey(), task);
  41. }
  42. }
  43. }
  44. }
  45. if (tasks.isEmpty()) {
  46. this.lock.lock();
  47. try {
  48. this.notEmpty.signalAll();
  49. }
  50. finally {
  51. this.lock.unlock();
  52. }
  53. }
  54. }

就是调度任务,去执行任务表里的所有任务。看看DumpService的构造函数:

  1. @Autowired
  2. public DumpService(PersistService persistService) {
  3. DiskUtil.clearAll();
  4. this.persistService = persistService;
  5. DumpProcessor processor = new DumpProcessor(this);
  6. DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
  7. dumpTaskMgr = new TaskManager("com.taobao.diamond.server.DumpTaskManager");
  8. dumpTaskMgr.setDefaultTaskProcessor(processor);
  9. dumpTaskMgr.addProcessor(DumpAllTask.taskId, dumpAllProcessor);
  10. Runnable dumpAll = new Runnable() {
  11. @Override
  12. public void run() {
  13. dumpTaskMgr.addTask(DumpAllTask.taskId, new DumpAllTask());
  14. }
  15. };
  16. TimerTaskService.scheduleWithFixedDelay(dumpAll, dumpAllIntervalInHour,
  17. dumpAllIntervalInHour, TimeUnit.HOURS);
  18. // initial dump all
  19. dumpAllProcessor.process(DumpAllTask.taskId, new DumpAllTask());
  20. }

Task表里的任务都是dumpAll加进去的,就是一个任务——dumpAll——把数据库里所有的配置记录都dump到磁盘。

还有形如MergeDatumService负责把所有同类可聚合的配置聚合起来。值得一提的是NotifyService,看下它的NotifyTaskProcessor的process方法:

  1. @Override
  2. public boolean process(String taskType, Task task) {
  3. NotifyTask notifyTask = (NotifyTask) task;
  4. String dataId = notifyTask.getDataId();
  5. String group = notifyTask.getGroup();
  6. long lastModified = notifyTask.getLastModified();
  7. boolean isok = true;
  8. for (String ip : serverListService.getServerList()) {
  9. isok = notifyToDump(dataId, group,lastModified, ip) && isok;
  10. }
  11. return isok;
  12. }

里面的notifyToDump方法就是像其他的diamond server发形如diamond_notify.do?method=xxx&dataId=xxx&group=xxx的请求。而server对于这类请求的servlet处理是启动DumpService的dump方法,该方法就是将DumpTask加到dumpTaskMgr里,刚才上面的代码已经看到了,dumpTaskMgr添加了两个processor,一个负责dumpAll,一个负责dump。dump做的事情就是从数据库里查询dataId对应的content,然后写到本地。

这里一个设计我觉得可以抽出来,就是守护线程和task的设计:

4. 总结

Diamond这样的场景应该是所有业务开发过程中都会遇到的需求场景。和Zookeeper有很大程度的功能重合。但是实现上可以看到Diamond有很多限制,读写一致性无法保证,因为写server写的是DB,而读server却读的是文件,而文件的dump依赖notify task和dump task。这都是有守护线程完成,有一定的时延。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注