@changedi
2016-01-11T13:32:27.000000Z
字数 10346
阅读 6111
Java
Diamond主要提供持久配置的发布和订阅服务,最大特点是结构简单,稳定可靠。主要的使用场景:TDDL使用Diamond动态切换数据库,动态扩容等;业务使用Diamond推送系统开关配置。Diamond产品专注于高可用性,基于此在架构、容灾机制、数据获取模型上有一些与同类产品的不同之处
——阿里巴巴Diamond介绍
Diamond是无单点架构,在做更新配置的时候只做三件事:
本地的设计就是为了缓存,减少对数据库的压力。作为一个配置中心,高可用是最主要的需求。如何保持高可用,Diamond持有多层的数据存储,数据被存储在:数据库,服务端磁盘,客户端缓存目录,以及可以手工干预
的容灾目录。 客户端通过API获取配置数据按照固定的顺序去不同的数据源获取数据:容灾目录,服务端磁盘,客户端缓存。
Diamond除了在容灾上做了很多方案,在数据读取方面也有很多特点。客户端采用推拉结合的策略在长连接和短连接之间取得一个平衡,让服务端不用太关注连接的管理,又可以获得长连接的及时性。
使用Diamond的流程:
具体客户端在做什么事情呢?如何发布配置,如何读取配置,这都是客户端要做的事情。我们具体先看看客户端的细节。
客户端实现的细节都是static方法,也就是说DiamondClient启动时是在类加载阶段就完成了。在Diamond.java里有这样的代码,这个接口表名要发布一个配置:
static public boolean publishSingle(String dataId, String group, String content) {
return defaultEnv.publishSingle(dataId, group, content);
}
其中defaultEnv是一个DiamondEnv类,在类加载时初始化。
static public final DiamondEnv defaultEnv = new DiamondEnv(new ServerListManager());
ServerListManager是一个启动时和运行时定期获取地址列表的类。启动时拿不到地址列表,进程退出。获取列表主要是通过启动了一个GetServerListTask,这个Task会每隔一段时间去访问diamond服务器,获取配置信息。具体调度是通过ScheduledExecutorService来实现。
class GetServerListTask implements Runnable {
final String url;
GetServerListTask(String url) {
this.url = url;
}
@Override
public void run() {
try {
updateIfChanged(getApacheServerList(url));
} catch (Exception e) {
log.error("[serverlist] failed to get serverlist, " + e.toString(), e);
}
}
}
其中getApacheServerList负责去http get服务器http://xxx/server/diamond,来获取服务地址。updateIfChanged负责把地址存到serverUrls里并更新到本地文件。除了启动GetServerListTask,DiamondEnv还初始化了一个ClientWorker,这个worker会启动一个调度线程不断的check配置信息,具体见下:
ClientWorker(final DiamondEnv env) {
this.env = env;
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.taobao.diamond.client.Worker."+ env.serverMgr.name);
t.setDaemon(true);
return t;
}
});
executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
checkLocalConfigInfo();
checkServerConfigInfo();
} catch (Throwable e) {
log.error("[sub-error-rotate] rotate check error", e);
}
}
}, 1L, 1L, TimeUnit.MILLISECONDS);
}
check的过程分两步,check本地和check服务端。本地check检查本地容灾
目录是否有对应的配置内容文件,如果有,把本地容灾读取开关打开(这个开关会控制diamond客户端优先读哪个地方);如果没有,就把开关关掉;如果有但是和内存中持有的文件lastModified时间戳不一致,这时候把内存变量的内容更新。check服务端,会访问服务端,获取后,有一段更新本地缓存
的逻辑,这个后面会讲到。check本身是有前提的,如果有配置过CacheData,那么check才会进行,否则直接结束。而CacheData会在后面讲到。至此,diamond客户端的初始化工作完成。
如上面提到,发布配置就是publishSingle方法,内容很简单,就是发一个http post请求到diamond server。代码如下:
String url = "/diamond_publish.do?method=syncUpdateAll";
List<String> params = Arrays.asList("dataId", dataId, "group", group, "content", content);
HttpResult result = null;
try {
result = agent.httpPost(url, null, params, Constants.ENCODE, POST_TIMEOUT);
} catch (IOException ioe) {
log.warn("[publish-single] error, " + dataId + ", " + group + ", msg: "
+ ioe.toString());
return false;
}
这里有一些优化,agent是一个ServerHttpAgent,post方法是定制的,会从ServerListManager将server的地址排序,依次post,做到同机房优先。
获取配置通过Diamond.getConfig接口来完成,这个接口支持默认获取顺序,也支持自定义其他顺序,读取顺序是这样的,优先读本地容灾
->读server
->读本地缓存
,代码如下:
// 优先使用本地配置
String content = LocalConfigInfoProcessor.getFailover(this, dataId, group);
if (content != null) {
log.warn("[get-config] get failover ok, dataId=" + dataId + ", group=" + group
+ ", config=" + ContentUtils.truncateContent(content));
return content;
}
try {
return ClientWorker.getServerConfig(this, dataId, group, timeoutMs);
} catch (IOException ioe) {
log.warn("[get-config] get server error, dataId:" + dataId + ", group:" + group
+ ", " + ioe.toString());
}
log.warn("[get-config] get snapshot, dataId:" + dataId + ", group:" + group);
return LocalConfigInfoProcessor.getSnapshot(this, dataId, group);
在ClientWorker获取服务端配置时,有一个注意点,在发送/diamond_config调用的时候,有这样一段逻辑:
switch (result.code) {
case HttpURLConnection.HTTP_OK:
// if (env == defaultEnv) {
LocalConfigInfoProcessor.saveSnapshot(env, dataId, group, result.content);
// }
return result.content;
case HttpURLConnection.HTTP_NOT_FOUND:
// if (env == defaultEnv) {
LocalConfigInfoProcessor.saveSnapshot(env, dataId, group, null);
// }
return null;
case HttpURLConnection.HTTP_CONFLICT: {
log.warn("[sub-server-error] data being modified");
throw new IOException("data being modified");
}
这意味着,在服务端获取后,会更新本地缓存
,同理的是如果获取不到也会清除本地缓存
。这个就和初始化阶段的定时调度check任务联系起来。
Diamond还支持的一个功能是监听变更,可以注册一个配置变更监听器,当服务端配置变更后,可以实现回调。具体接口:
static public void addListener(String dataId, String group, ManagerListener listener)
static public void removeListener(String dataId, String group, ManagerListener listener)
添加时会用到CacheData:
/**
* 查询CacheData,不存在时新增。
*/
public CacheData addCacheDataIfAbsent(String dataId, String group) {
CacheData cache = getCache(dataId, group);
if (null != cache) {
return cache;
}
synchronized (cacheMap) {
String key = GroupKey.getKey(dataId, group);
cache = new CacheData(dataId, group);
Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
copy.put(key, cache);
cacheMap.set(copy);
log.info("[subscribe] " + key);
}
String content = LocalConfigInfoProcessor.getFailover(this, dataId, group);
content = (null != content) ? content //
: LocalConfigInfoProcessor.getSnapshot(this, dataId, group);
cache.setContent(content);
return cache;
}
CacheData很重要,因为listener就是加在CacheData上的。listener是实现ManagerListener接口的,接口信息:
public interface ManagerListener {
public Executor getExecutor();
/**
* 接收配置信息
*
* @param configInfo
*/
public void receiveConfigInfo(final String configInfo);
}
listener的执行是在初始化步骤提到的check阶段做的,在check本地和服务端都会调用checkListenerMd5(env)
方法,而这个方法就是检查变更后通知listener执行receiveConfigInfo方法。
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
static void safeNotifyListener(final String dataId, final String group, final String content,
final String md5, ManagerListenerWrap listenerWrap) {
final ManagerListener listener = listenerWrap.listener;
listenerWrap.lastCallMd5 = md5;
Runnable job = new Runnable() {
public void run() {
try {
listener.receiveConfigInfo(content);
log.info("[notify-ok] " + dataId + ", " + group + ", md5=" + md5
+ ", listener=" + listener);
} catch (Throwable t) {
log.error("[notify-error] " + dataId + ", " + group + ", md5=" + md5
+ ", listener=" + listener.toString(), t);
}
}
};
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();
}
} catch (Throwable t) {
log.error("[notify-error] " + dataId + ", " + group + ", md5=" + md5 + ", listener="
+ listener.toString(), t);
}
}
可以看到检查变更就是检查md5签名是否一致,不一致就说明配置有变更,这时就通知listener执行。
最后汇总一下,client端涉及到的几个连接server的地方。客户端要做的事情就讲完了。值得一提的是每次获取服务的http调用,在header里都是要set一个appkey和appsecret作为token校验的。当然这应该是http服务的标配。
连接 | 用途 |
---|---|
xxx/server/diamond | 获取服务地址 |
ip/server/diamond_publish?method=syncUpdateAll | 发布配置 |
ip/server/diamond_config | 拉取配置 |
ip就是通过获取服务地址获取到的服务器地址,因为diamond本身也是无单点的架构,部署在不同机房,获取服务地址就是为了方便发布和拉取时能够优选机房。
Diamond的server端是一个java web项目,部署了一个java开发的web系统。对于diamond请求,我没有找到对应的实现代码。我猜想这个服务部署在另一个系统里,提供diamond server的服务地址查询服务,我本地访问了一下,会返回一个server的ip列表。而diamond_pulish的请求,被一个servlet处理,参数是dataId、group和配置的内容content。处理servlet做的事情——把数据插入数据库。
如果是获取请求,首先有个加读锁的动作,diamond servlet会给这个读请求申请加读锁,读写锁的实现是SimpleReadWriteLock类,里面通过定义一个int型的status来判断读写锁。拿到锁后,从本地文件中把配置读出来,返回响应请求。
等下,这里好像出现问题了,发布的时候把配置写到数据库,怎么读的时候反而读文件了呢?原来在服务器启动时,也有很多service带着守护线程启动了,它们随时监控着系统的情况。比如PersistService,启动时定时的去检查数据库的读取情况,不间断的运行一个SelectMasterTask,去选择master数据库。选择的过程就是从服务器配置的若干个数据源里选择主库(方法是执行一条delete,能做就是master,因为备库是readonly)。
class SelectMasterTask implements Runnable {
public void run() {
defaultLog.info("check master db.");
boolean isFound = false;
for (BasicDataSource ds : dataSourceList) {
testMasterJT.setDataSource(ds);
testMasterJT.setQueryTimeout(QUERY_TIMEOUT);
try {
testMasterJT
.update("delete from config_info where data_id='com.taobao.diamond.testMasterDB'");
if (jt.getDataSource() != ds) {
fatalLog.warn("[master-db] {}", ds.getUrl());
}
jt.setDataSource(ds);
isFound = true;
break;
} catch (DataAccessException e) { // read only
e.printStackTrace(); // TODO remove
}
}
if (!isFound) {
fatalLog.error("[master-db] master db not found.");
}
}
}
比如DumpService,启动时初始化了一个TaskManager,运行ProcessRunnable任务,这个任务就是让TaskManager.process()。
protected void process() {
for (Map.Entry<String, Task> entry : this.tasks.entrySet()) {
Task task = null;
this.lock.lock();
try {
// 获取任务
task = entry.getValue();
if (null != task) {
if (!task.shouldProcess()) {
// 任务当前不需要被执行,直接跳过
continue;
}
// 先将任务从任务Map中删除
this.tasks.remove(entry.getKey());
}
}
finally {
this.lock.unlock();
}
if (null != task) {
// 获取任务处理器
TaskProcessor processor = this.taskProcessors.get(entry.getKey());
if (null == processor) {
// 如果没有根据任务类型设置的处理器,使用默认处理器
processor = this.getDefaultTaskProcessor();
}
if (null != processor) {
boolean result = false;
try {
// 处理任务
result = processor.process(entry.getKey(), task);
}
catch (Throwable t) {
log.error("处理task失败", t);
}
if (!result) {
// 任务处理失败,设置最后处理时间
task.setLastProcessTime(System.currentTimeMillis());
// 将任务重新加入到任务Map中
this.addTask(entry.getKey(), task);
}
}
}
}
if (tasks.isEmpty()) {
this.lock.lock();
try {
this.notEmpty.signalAll();
}
finally {
this.lock.unlock();
}
}
}
就是调度任务,去执行任务表里的所有任务。看看DumpService的构造函数:
@Autowired
public DumpService(PersistService persistService) {
DiskUtil.clearAll();
this.persistService = persistService;
DumpProcessor processor = new DumpProcessor(this);
DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
dumpTaskMgr = new TaskManager("com.taobao.diamond.server.DumpTaskManager");
dumpTaskMgr.setDefaultTaskProcessor(processor);
dumpTaskMgr.addProcessor(DumpAllTask.taskId, dumpAllProcessor);
Runnable dumpAll = new Runnable() {
@Override
public void run() {
dumpTaskMgr.addTask(DumpAllTask.taskId, new DumpAllTask());
}
};
TimerTaskService.scheduleWithFixedDelay(dumpAll, dumpAllIntervalInHour,
dumpAllIntervalInHour, TimeUnit.HOURS);
// initial dump all
dumpAllProcessor.process(DumpAllTask.taskId, new DumpAllTask());
}
Task表里的任务都是dumpAll加进去的,就是一个任务——dumpAll——把数据库里所有的配置记录都dump到磁盘。
还有形如MergeDatumService负责把所有同类可聚合的配置聚合起来。值得一提的是NotifyService,看下它的NotifyTaskProcessor的process方法:
@Override
public boolean process(String taskType, Task task) {
NotifyTask notifyTask = (NotifyTask) task;
String dataId = notifyTask.getDataId();
String group = notifyTask.getGroup();
long lastModified = notifyTask.getLastModified();
boolean isok = true;
for (String ip : serverListService.getServerList()) {
isok = notifyToDump(dataId, group,lastModified, ip) && isok;
}
return isok;
}
里面的notifyToDump方法就是像其他的diamond server发形如diamond_notify.do?method=xxx&dataId=xxx&group=xxx的请求。而server对于这类请求的servlet处理是启动DumpService的dump方法,该方法就是将DumpTask加到dumpTaskMgr里,刚才上面的代码已经看到了,dumpTaskMgr添加了两个processor,一个负责dumpAll,一个负责dump。dump做的事情就是从数据库里查询dataId对应的content,然后写到本地。
这里一个设计我觉得可以抽出来,就是守护线程和task的设计:
Diamond这样的场景应该是所有业务开发过程中都会遇到的需求场景。和Zookeeper有很大程度的功能重合。但是实现上可以看到Diamond有很多限制,读写一致性无法保证,因为写server写的是DB,而读server却读的是文件,而文件的dump依赖notify task和dump task。这都是有守护线程完成,有一定的时延。