@linwbai
2018-08-03T08:09:21.000000Z
字数 3716
阅读 1065
eureka
EurekaBootStrap#contextInitialized >> initEurekaServerContext >> DefaultEurekaServerContext#initialize >> PeerEurekaNodes#start >> updatePeerEurekaNodes >> createPeerEurekaNode >> new PeerEurekaNode >> TaskDispatchers#createBatchingTaskDispatcher >> TaskExecutors#batchExecutors >> new BatchWorkerRunnable >> run >> getWork、ReplicationTaskProcessor#process、AcceptorExecutor#reprocess
//resolvePeerUrls方法为获取集群中所有serviceUrl,不包括自身updatePeerEurekaNodes(resolvePeerUrls());Runnable peersUpdateTask = () -> {updatePeerEurekaNodes(resolvePeerUrls());};taskExecutor.scheduleWithFixedDelay(peersUpdateTask,..,..,..);
updatePeerEurekaNodes方法会将新的节点地址与原来的(即上次更新后的)peerEurekaNodeUrls进行比较:对异常的节点调用PeerEurekaNode#shutDown将其移除;对新增的节点调用this#createPeerEurekaNode,创建一个新的PeerEurekaNode。
创建PeerEurekaNode节点的过程中会创建一个AcceptorRunner守护线程
TaskHolder(ID id, T task, long expiryTime) {this.id = id;//延迟执行时间this.expiryTime = expiryTime;this.task = task;//对象创建时间this.submitTimestamp = System.currentTimeMillis();}/*** reprocessQueue 再次处理队列* acceptorQueue 存放接收到任务的队列* pendingTasks 未执行任务map* batchWorkQueue 批量执行任务队列* processingOrder处理任务的顺序队列*/long scheduleTime = 0;while (!isShutdown.get()) {try {//reprocessQueue和acceptorQueue中的TaskHolder放入任务队列processingOrder中。 其中如果是reprocessQueue中的如果满足expiryTime大于当前时间并且不在pendingTasks中则放在processingOrder的头部drainInputQueues();int totalItems = processingOrder.size();long now = System.currentTimeMillis();if (scheduleTime < now) {scheduleTime = now + trafficShaper.transmissionDelay();}if (scheduleTime <= now) {//把processingOrder队列中的任务转到batchWorkQueue队列中去assignBatchWork();assignSingleItemWork();}// If no worker is requesting data or there is a delay injected by the traffic shaper,// sleep for some time to avoid tight loop.if (totalItems == processingOrder.size()) {Thread.sleep(10);}} catch (InterruptedException ex) {} catch (Throwable e) {}}
//是否满足批量任务处理if (hasEnoughTasksForNextBatch()) {...}//正在处理的任务队列为空if (processingOrder.isEmpty()) {return false;}//未执行的任务大小 >= 最大缓存大小 则返回true 默认10000参数为maxElementsInPeerReplicationPoolif (pendingTasks.size() >= maxBufferSize) {return true;}//顺序执行队列中第一个TaskHolder存在的时间如果大于maxBatchingDelay 则满足批量任务处理的条件 maxBatchingDelay固定为 500 PeerEurekaNode.MAX_BATCHING_DELAY_MSTaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();return delay >= maxBatchingDelay;
List<TaskHolder>
while (!isShutdown.get()) {//从batchWorkQueue中获取List<TaskHolder>List<TaskHolder<ID, T>> holders = getWork();metrics.registerExpiryTimes(holders);List<T> tasks = getTasksOf(holders);//执行复制逻辑 http请求其他节点的 peerreplication/batch/ 接口ProcessingResult result = processor.process(tasks);switch (result) {case Success:break;case Congestion:case TransientError://失败的话放入reprocessQueue队列taskDispatcher.reprocess(holders, result);break;}...}
//getWork方法//workQueue就是batchWorkQueuedo {result = workQueue.poll(1, TimeUnit.SECONDS);} while (!isShutdown.get() && result == null);
//ReplicationInstance.javaprivate String appName;private String providerId;private String id;private Long lastDirtyTimestamp;private String overriddenStatus;private String status;private InstanceInfo instanceInfo;private Action action;
//dispatch方法根据ReplicationInstance的操作类型进行不同的处理switch (instanceInfo.getAction()) {case Register:handleRegister(instanceInfo, applicationResource);break;case Heartbeat:handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);break;case Cancel:handleCancel(resource);break;case StatusUpdate:handleStatusUpdate(instanceInfo, resource);break;case DeleteStatusOverride:handleDeleteStatusOverride(instanceInfo, resource);break;}
我们看一下比较复杂的Heartbeat。
java