[关闭]
@linwbai 2018-08-03T08:09:21.000000Z 字数 3716 阅读 853

Eureka 批量复制

eureka


发起请求

EurekaBootStrap#contextInitialized >> initEurekaServerContext >> DefaultEurekaServerContext#initialize >> PeerEurekaNodes#start >> updatePeerEurekaNodes >> createPeerEurekaNode >> new PeerEurekaNode >> TaskDispatchers#createBatchingTaskDispatcher >> TaskExecutors#batchExecutors >> new BatchWorkerRunnable >> run >> getWork、ReplicationTaskProcessor#process、AcceptorExecutor#reprocess

  1. //resolvePeerUrls方法为获取集群中所有serviceUrl,不包括自身
  2. updatePeerEurekaNodes(resolvePeerUrls());
  3. Runnable peersUpdateTask = () -> {
  4. updatePeerEurekaNodes(resolvePeerUrls());
  5. };
  6. taskExecutor.scheduleWithFixedDelay(peersUpdateTask,..,..,..);

updatePeerEurekaNodes方法中和原来或者上次更新的peerEurekaNodeUrls比较,移除异常的节点PeerEurekaNode#shutDown。添加新增的节点this#createPeerEurekaNode 创建一个新的PeerEurekaNode。

创建PeerEurekaNode节点的过程中会创建一个AcceptorRunner守护线程

  1. TaskHolder(ID id, T task, long expiryTime) {
  2. this.id = id;
  3. //延迟执行时间
  4. this.expiryTime = expiryTime;
  5. this.task = task;
  6. //对象创建时间
  7. this.submitTimestamp = System.currentTimeMillis();
  8. }
  9. /**
  10. * reprocessQueue 再次处理队列
  11. * acceptorQueue 存放接收到任务的队列
  12. * pendingTasks 未执行任务map
  13. * batchWorkQueue 批量执行任务队列
  14. * processingOrder处理任务的顺序队列
  15. */
  16. long scheduleTime = 0;
  17. while (!isShutdown.get()) {
  18. try {
  19. //reprocessQueue和acceptorQueue中的TaskHolder放入任务队列processingOrder中。 其中如果是reprocessQueue中的如果满足expiryTime大于当前时间并且不在pendingTasks中则放在processingOrder的头部
  20. drainInputQueues();
  21. int totalItems = processingOrder.size();
  22. long now = System.currentTimeMillis();
  23. if (scheduleTime < now) {
  24. scheduleTime = now + trafficShaper.transmissionDelay();
  25. }
  26. if (scheduleTime <= now) {
  27. //把processingOrder队列中的任务转到batchWorkQueue队列中去
  28. assignBatchWork();
  29. assignSingleItemWork();
  30. }
  31. // If no worker is requesting data or there is a delay injected by the traffic shaper,
  32. // sleep for some time to avoid tight loop.
  33. if (totalItems == processingOrder.size()) {
  34. Thread.sleep(10);
  35. }
  36. } catch (InterruptedException ex) {
  37. } catch (Throwable e) {
  38. }
  39. }
  1. //是否满足批量任务处理
  2. if (hasEnoughTasksForNextBatch()) {
  3. ...
  4. }
  5. //正在处理的任务队列为空
  6. if (processingOrder.isEmpty()) {
  7. return false;
  8. }
  9. //未执行的任务大小 >= 最大缓存大小 则返回true 默认10000参数为maxElementsInPeerReplicationPool
  10. if (pendingTasks.size() >= maxBufferSize) {
  11. return true;
  12. }
  13. //顺序执行队列中第一个TaskHolder存在的时间如果大于maxBatchingDelay 则满足批量任务处理的条件 maxBatchingDelay固定为 500 PeerEurekaNode.MAX_BATCHING_DELAY_MS
  14. TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
  15. long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
  16. return delay >= maxBatchingDelay;
  1. while (!isShutdown.get()) {
  2. //从batchWorkQueue中获取List<TaskHolder>
  3. List<TaskHolder<ID, T>> holders = getWork();
  4. metrics.registerExpiryTimes(holders);
  5. List<T> tasks = getTasksOf(holders);
  6. //执行复制逻辑 http请求其他节点的 peerreplication/batch/ 接口
  7. ProcessingResult result = processor.process(tasks);
  8. switch (result) {
  9. case Success:
  10. break;
  11. case Congestion:
  12. case TransientError:
  13. //失败的话放入reprocessQueue队列
  14. taskDispatcher.reprocess(holders, result);
  15. break;
  16. }
  17. ...
  18. }
  1. //getWork方法
  2. //workQueue就是batchWorkQueue
  3. do {
  4. result = workQueue.poll(1, TimeUnit.SECONDS);
  5. } while (!isShutdown.get() && result == null);

接受请求

  1. //ReplicationInstance.java
  2. private String appName;
  3. private String providerId;
  4. private String id;
  5. private Long lastDirtyTimestamp;
  6. private String overriddenStatus;
  7. private String status;
  8. private InstanceInfo instanceInfo;
  9. private Action action;
  1. //dispatch方法根据ReplicationInstance的操作类型进行不同的处理
  2. switch (instanceInfo.getAction()) {
  3. case Register:
  4. handleRegister(instanceInfo, applicationResource);
  5. break;
  6. case Heartbeat:
  7. handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
  8. break;
  9. case Cancel:
  10. handleCancel(resource);
  11. break;
  12. case StatusUpdate:
  13. handleStatusUpdate(instanceInfo, resource);
  14. break;
  15. case DeleteStatusOverride:
  16. handleDeleteStatusOverride(instanceInfo, resource);
  17. break;
  18. }

我们看一下比较复杂Heartbeat
java

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注