@changedi
2016-01-11T05:32:27.000000Z
字数 10346
阅读 6743
Java
Diamond主要提供持久配置的发布和订阅服务,最大特点是结构简单,稳定可靠。主要的使用场景:TDDL使用Diamond动态切换数据库,动态扩容等;业务使用Diamond推送系统开关配置。Diamond产品专注于高可用性,基于此在架构、容灾机制、数据获取模型上有一些与同类产品的不同之处
——阿里巴巴Diamond介绍
Diamond是无单点架构,在做更新配置的时候只做三件事:
本地的设计就是为了缓存,减少对数据库的压力。作为一个配置中心,高可用是最主要的需求。如何保持高可用,Diamond持有多层的数据存储,数据被存储在:数据库,服务端磁盘,客户端缓存目录,以及可以手工干预的容灾目录。 客户端通过API获取配置数据按照固定的顺序去不同的数据源获取数据:容灾目录,服务端磁盘,客户端缓存。
Diamond除了在容灾上做了很多方案,在数据读取方面也有很多特点。客户端采用推拉结合的策略在长连接和短连接之间取得一个平衡,让服务端不用太关注连接的管理,又可以获得长连接的及时性。
使用Diamond的流程:
具体客户端在做什么事情呢?如何发布配置,如何读取配置,这都是客户端要做的事情。我们具体先看看客户端的细节。
客户端实现的细节都是static方法,也就是说DiamondClient启动时是在类加载阶段就完成了。在Diamond.java里有这样的代码,这个接口表名要发布一个配置:
static public boolean publishSingle(String dataId, String group, String content) {return defaultEnv.publishSingle(dataId, group, content);}
其中defaultEnv是一个DiamondEnv类,在类加载时初始化。
static public final DiamondEnv defaultEnv = new DiamondEnv(new ServerListManager());
ServerListManager是一个启动时和运行时定期获取地址列表的类。启动时拿不到地址列表,进程退出。获取列表主要是通过启动了一个GetServerListTask,这个Task会每隔一段时间去访问diamond服务器,获取配置信息。具体调度是通过ScheduledExecutorService来实现。
class GetServerListTask implements Runnable {final String url;GetServerListTask(String url) {this.url = url;}@Overridepublic void run() {try {updateIfChanged(getApacheServerList(url));} catch (Exception e) {log.error("[serverlist] failed to get serverlist, " + e.toString(), e);}}}
其中getApacheServerList负责去http get服务器http://xxx/server/diamond,来获取服务地址。updateIfChanged负责把地址存到serverUrls里并更新到本地文件。除了启动GetServerListTask,DiamondEnv还初始化了一个ClientWorker,这个worker会启动一个调度线程不断的check配置信息,具体见下:
ClientWorker(final DiamondEnv env) {this.env = env;executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.taobao.diamond.client.Worker."+ env.serverMgr.name);t.setDaemon(true);return t;}});executor.scheduleWithFixedDelay(new Runnable() {public void run() {try {checkLocalConfigInfo();checkServerConfigInfo();} catch (Throwable e) {log.error("[sub-error-rotate] rotate check error", e);}}}, 1L, 1L, TimeUnit.MILLISECONDS);}
check的过程分两步,check本地和check服务端。本地check检查本地容灾目录是否有对应的配置内容文件,如果有,把本地容灾读取开关打开(这个开关会控制diamond客户端优先读哪个地方);如果没有,就把开关关掉;如果有但是和内存中持有的文件lastModified时间戳不一致,这时候把内存变量的内容更新。check服务端,会访问服务端,获取后,有一段更新本地缓存的逻辑,这个后面会讲到。check本身是有前提的,如果有配置过CacheData,那么check才会进行,否则直接结束。而CacheData会在后面讲到。至此,diamond客户端的初始化工作完成。
如上面提到,发布配置就是publishSingle方法,内容很简单,就是发一个http post请求到diamond server。代码如下:
String url = "/diamond_publish.do?method=syncUpdateAll";List<String> params = Arrays.asList("dataId", dataId, "group", group, "content", content);HttpResult result = null;try {result = agent.httpPost(url, null, params, Constants.ENCODE, POST_TIMEOUT);} catch (IOException ioe) {log.warn("[publish-single] error, " + dataId + ", " + group + ", msg: "+ ioe.toString());return false;}
这里有一些优化,agent是一个ServerHttpAgent,post方法是定制的,会从ServerListManager将server的地址排序,依次post,做到同机房优先。
获取配置通过Diamond.getConfig接口来完成,这个接口支持默认获取顺序,也支持自定义其他顺序,读取顺序是这样的,优先读本地容灾->读server->读本地缓存,代码如下:
// 优先使用本地配置String content = LocalConfigInfoProcessor.getFailover(this, dataId, group);if (content != null) {log.warn("[get-config] get failover ok, dataId=" + dataId + ", group=" + group+ ", config=" + ContentUtils.truncateContent(content));return content;}try {return ClientWorker.getServerConfig(this, dataId, group, timeoutMs);} catch (IOException ioe) {log.warn("[get-config] get server error, dataId:" + dataId + ", group:" + group+ ", " + ioe.toString());}log.warn("[get-config] get snapshot, dataId:" + dataId + ", group:" + group);return LocalConfigInfoProcessor.getSnapshot(this, dataId, group);
在ClientWorker获取服务端配置时,有一个注意点,在发送/diamond_config调用的时候,有这样一段逻辑:
switch (result.code) {case HttpURLConnection.HTTP_OK:// if (env == defaultEnv) {LocalConfigInfoProcessor.saveSnapshot(env, dataId, group, result.content);// }return result.content;case HttpURLConnection.HTTP_NOT_FOUND:// if (env == defaultEnv) {LocalConfigInfoProcessor.saveSnapshot(env, dataId, group, null);// }return null;case HttpURLConnection.HTTP_CONFLICT: {log.warn("[sub-server-error] data being modified");throw new IOException("data being modified");}
这意味着,在服务端获取后,会更新本地缓存,同理的是如果获取不到也会清除本地缓存。这个就和初始化阶段的定时调度check任务联系起来。
Diamond还支持的一个功能是监听变更,可以注册一个配置变更监听器,当服务端配置变更后,可以实现回调。具体接口:
static public void addListener(String dataId, String group, ManagerListener listener)static public void removeListener(String dataId, String group, ManagerListener listener)
添加时会用到CacheData:
/*** 查询CacheData,不存在时新增。*/public CacheData addCacheDataIfAbsent(String dataId, String group) {CacheData cache = getCache(dataId, group);if (null != cache) {return cache;}synchronized (cacheMap) {String key = GroupKey.getKey(dataId, group);cache = new CacheData(dataId, group);Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());copy.put(key, cache);cacheMap.set(copy);log.info("[subscribe] " + key);}String content = LocalConfigInfoProcessor.getFailover(this, dataId, group);content = (null != content) ? content //: LocalConfigInfoProcessor.getSnapshot(this, dataId, group);cache.setContent(content);return cache;}
CacheData很重要,因为listener就是加在CacheData上的。listener是实现ManagerListener接口的,接口信息:
public interface ManagerListener {public Executor getExecutor();/*** 接收配置信息** @param configInfo*/public void receiveConfigInfo(final String configInfo);}
listener的执行是在初始化步骤提到的check阶段做的,在check本地和服务端都会调用checkListenerMd5(env)方法,而这个方法就是检查变更后通知listener执行receiveConfigInfo方法。
void checkListenerMd5() {for (ManagerListenerWrap wrap : listeners) {if (!md5.equals(wrap.lastCallMd5)) {safeNotifyListener(dataId, group, content, md5, wrap);}}}static void safeNotifyListener(final String dataId, final String group, final String content,final String md5, ManagerListenerWrap listenerWrap) {final ManagerListener listener = listenerWrap.listener;listenerWrap.lastCallMd5 = md5;Runnable job = new Runnable() {public void run() {try {listener.receiveConfigInfo(content);log.info("[notify-ok] " + dataId + ", " + group + ", md5=" + md5+ ", listener=" + listener);} catch (Throwable t) {log.error("[notify-error] " + dataId + ", " + group + ", md5=" + md5+ ", listener=" + listener.toString(), t);}}};try {if (null != listener.getExecutor()) {listener.getExecutor().execute(job);} else {job.run();}} catch (Throwable t) {log.error("[notify-error] " + dataId + ", " + group + ", md5=" + md5 + ", listener="+ listener.toString(), t);}}
可以看到检查变更就是检查md5签名是否一致,不一致就说明配置有变更,这时就通知listener执行。
最后汇总一下,client端涉及到的几个连接server的地方。客户端要做的事情就讲完了。值得一提的是每次获取服务的http调用,在header里都是要set一个appkey和appsecret作为token校验的。当然这应该是http服务的标配。
| 连接 | 用途 |
|---|---|
| xxx/server/diamond | 获取服务地址 |
| ip/server/diamond_publish?method=syncUpdateAll | 发布配置 |
| ip/server/diamond_config | 拉取配置 |
ip就是通过获取服务地址获取到的服务器地址,因为diamond本身也是无单点的架构,部署在不同机房,获取服务地址就是为了方便发布和拉取时能够优选机房。
Diamond的server端是一个java web项目,部署了一个java开发的web系统。对于diamond请求,我没有找到对应的实现代码。我猜想这个服务部署在另一个系统里,提供diamond server的服务地址查询服务,我本地访问了一下,会返回一个server的ip列表。而diamond_pulish的请求,被一个servlet处理,参数是dataId、group和配置的内容content。处理servlet做的事情——把数据插入数据库。
如果是获取请求,首先有个加读锁的动作,diamond servlet会给这个读请求申请加读锁,读写锁的实现是SimpleReadWriteLock类,里面通过定义一个int型的status来判断读写锁。拿到锁后,从本地文件中把配置读出来,返回响应请求。
等下,这里好像出现问题了,发布的时候把配置写到数据库,怎么读的时候反而读文件了呢?原来在服务器启动时,也有很多service带着守护线程启动了,它们随时监控着系统的情况。比如PersistService,启动时定时的去检查数据库的读取情况,不间断的运行一个SelectMasterTask,去选择master数据库。选择的过程就是从服务器配置的若干个数据源里选择主库(方法是执行一条delete,能做就是master,因为备库是readonly)。
class SelectMasterTask implements Runnable {public void run() {defaultLog.info("check master db.");boolean isFound = false;for (BasicDataSource ds : dataSourceList) {testMasterJT.setDataSource(ds);testMasterJT.setQueryTimeout(QUERY_TIMEOUT);try {testMasterJT.update("delete from config_info where data_id='com.taobao.diamond.testMasterDB'");if (jt.getDataSource() != ds) {fatalLog.warn("[master-db] {}", ds.getUrl());}jt.setDataSource(ds);isFound = true;break;} catch (DataAccessException e) { // read onlye.printStackTrace(); // TODO remove}}if (!isFound) {fatalLog.error("[master-db] master db not found.");}}}
比如DumpService,启动时初始化了一个TaskManager,运行ProcessRunnable任务,这个任务就是让TaskManager.process()。
protected void process() {for (Map.Entry<String, Task> entry : this.tasks.entrySet()) {Task task = null;this.lock.lock();try {// 获取任务task = entry.getValue();if (null != task) {if (!task.shouldProcess()) {// 任务当前不需要被执行,直接跳过continue;}// 先将任务从任务Map中删除this.tasks.remove(entry.getKey());}}finally {this.lock.unlock();}if (null != task) {// 获取任务处理器TaskProcessor processor = this.taskProcessors.get(entry.getKey());if (null == processor) {// 如果没有根据任务类型设置的处理器,使用默认处理器processor = this.getDefaultTaskProcessor();}if (null != processor) {boolean result = false;try {// 处理任务result = processor.process(entry.getKey(), task);}catch (Throwable t) {log.error("处理task失败", t);}if (!result) {// 任务处理失败,设置最后处理时间task.setLastProcessTime(System.currentTimeMillis());// 将任务重新加入到任务Map中this.addTask(entry.getKey(), task);}}}}if (tasks.isEmpty()) {this.lock.lock();try {this.notEmpty.signalAll();}finally {this.lock.unlock();}}}
就是调度任务,去执行任务表里的所有任务。看看DumpService的构造函数:
@Autowiredpublic DumpService(PersistService persistService) {DiskUtil.clearAll();this.persistService = persistService;DumpProcessor processor = new DumpProcessor(this);DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);dumpTaskMgr = new TaskManager("com.taobao.diamond.server.DumpTaskManager");dumpTaskMgr.setDefaultTaskProcessor(processor);dumpTaskMgr.addProcessor(DumpAllTask.taskId, dumpAllProcessor);Runnable dumpAll = new Runnable() {@Overridepublic void run() {dumpTaskMgr.addTask(DumpAllTask.taskId, new DumpAllTask());}};TimerTaskService.scheduleWithFixedDelay(dumpAll, dumpAllIntervalInHour,dumpAllIntervalInHour, TimeUnit.HOURS);// initial dump alldumpAllProcessor.process(DumpAllTask.taskId, new DumpAllTask());}
Task表里的任务都是dumpAll加进去的,就是一个任务——dumpAll——把数据库里所有的配置记录都dump到磁盘。
还有形如MergeDatumService负责把所有同类可聚合的配置聚合起来。值得一提的是NotifyService,看下它的NotifyTaskProcessor的process方法:
@Overridepublic boolean process(String taskType, Task task) {NotifyTask notifyTask = (NotifyTask) task;String dataId = notifyTask.getDataId();String group = notifyTask.getGroup();long lastModified = notifyTask.getLastModified();boolean isok = true;for (String ip : serverListService.getServerList()) {isok = notifyToDump(dataId, group,lastModified, ip) && isok;}return isok;}
里面的notifyToDump方法就是像其他的diamond server发形如diamond_notify.do?method=xxx&dataId=xxx&group=xxx的请求。而server对于这类请求的servlet处理是启动DumpService的dump方法,该方法就是将DumpTask加到dumpTaskMgr里,刚才上面的代码已经看到了,dumpTaskMgr添加了两个processor,一个负责dumpAll,一个负责dump。dump做的事情就是从数据库里查询dataId对应的content,然后写到本地。
这里一个设计我觉得可以抽出来,就是守护线程和task的设计:
Diamond这样的场景应该是所有业务开发过程中都会遇到的需求场景。和Zookeeper有很大程度的功能重合。但是实现上可以看到Diamond有很多限制,读写一致性无法保证,因为写server写的是DB,而读server却读的是文件,而文件的dump依赖notify task和dump task。这都是有守护线程完成,有一定的时延。