[关闭]
@eric1989 2017-05-30T15:25:01.000000Z 字数 33773 阅读 1434

ForkJoinPool源码阅读

并发源码阅读


ForkJoinPool内容

externalPush

  1. final void externalPush(ForkJoinTask<?> task)
  2. {
  3. WorkQueue[] ws;
  4. WorkQueue q;
  5. int m;
  6. int r = ThreadLocalRandom.getProbe();
  7. int rs = runState;
  8. if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1))
  9. {
  10. ForkJoinTask<?>[] a;
  11. int am, n, s;
  12. if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base))
  13. {
  14. int j = ((am & s) << ASHIFT) + ABASE;
  15. U.putOrderedObject(a, j, task);
  16. U.putOrderedInt(q, QTOP, s + 1);
  17. U.putIntVolatile(q, QLOCK, 0);
  18. if (n <= 1)
  19. signalWork(ws, q);
  20. return;
  21. }
  22. U.compareAndSwapInt(q, QLOCK, 1, 0);
  23. }
  24. //全量的提交执行
  25. externalSubmit(task);
  26. }

externalSubmit

主要完成几个工作

  1. 如果整个Pool尚未初始化,则尝试cas竞争来初始化Pool。初始化内容是创建WQ数组
  2. 通过随机下标在共享WQ区域找到一个WQ,抢占其控制权,放入一个任务对象
  3. 如果随机下标的槽位为空,则创建一个共享的WQ,放入其中。
  1. private void externalSubmit(ForkJoinTask<?> task)
  2. {
  3. //获得线程内的随机数,这个没什么好讲的。
  4. int r; // initialize caller's probe
  5. if ((r = ThreadLocalRandom.getProbe()) == 0)
  6. {
  7. ThreadLocalRandom.localInit();
  8. r = ThreadLocalRandom.getProbe();
  9. }
  10. for (;;)
  11. {
  12. WorkQueue[] ws;
  13. WorkQueue q;
  14. int rs, m, k;
  15. boolean move = false;
  16. if ((rs = runState) < 0)
  17. {
  18. tryTerminate(false, false); // help terminate
  19. throw new RejectedExecutionException();
  20. }
  21. //如果发现
  22. //1.线程池尚未启动
  23. //2.工作队列尚未初始化
  24. //3.工作队列长度为0
  25. //三个条件任意存在,则尝试执行初始化工作
  26. else if ((rs & STARTED) == 0 || // initialize
  27. ((ws = workQueues) == null || (m = ws.length - 1) < 0))
  28. {
  29. int ns = 0;
  30. rs = lockRunState();
  31. try
  32. {
  33. if ((rs & STARTED) == 0)
  34. {
  35. U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong());
  36. // create workQueues array with size a power of two
  37. //config包含了ForkJoinPool的配置信息。是一个int数字。其中低16位bit用于表达系统整体的并发数。第17位bit用于表达模式。先进先出还是后进先出。先进先出是队列,后进先出是栈。
  38. int p = config & SMASK; // ensure at least 2 slots
  39. int n = (p > 1) ? p - 1 : 1;
  40. n |= n >>> 1;
  41. n |= n >>> 2;
  42. n |= n >>> 4;
  43. n |= n >>> 8;
  44. n |= n >>> 16;
  45. n = (n + 1) << 1;
  46. //上面的算法是一个固定的快速算法,目的是找到比n大的同时是2的次方幂的一个整数
  47. workQueues = new WorkQueue[n];
  48. ns = STARTED;
  49. }
  50. }
  51. finally
  52. {
  53. //初始化完成后取消锁定并且更新初始化标识
  54. unlockRunState(rs, (rs & ~RSLOCK) | ns);
  55. }
  56. }
  57. //经过上面的赋值,r是随机数,m是数组长度-1也就是对应的mask,SQMASK是共享WQ的长度范围
  58. //如果在对应的位置存在工作队列,则进入工作队列的竞争流程
  59. else if ((q = ws[k = r & m & SQMASK]) != null)
  60. {
  61. //竞争workqueue锁成功
  62. if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1))
  63. {
  64. ForkJoinTask<?>[] a = q.array;
  65. int s = q.top;
  66. boolean submitted = false; // initial submission or resizing
  67. try
  68. {
  69. //如果task数组不为空且未满
  70. //或者task数组扩容成功
  71. if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null)
  72. {
  73. //计算得到task在数组要放入的偏移量
  74. int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
  75. //只放入storestore屏障,这是为了性能考虑。
  76. U.putOrderedObject(a, j, task);
  77. U.putOrderedInt(q, QTOP, s + 1);
  78. submitted = true;
  79. }
  80. }
  81. finally
  82. {
  83. U.compareAndSwapInt(q, QLOCK, 1, 0);
  84. }
  85. if (submitted)
  86. {
  87. signalWork(ws, q);
  88. return;
  89. }
  90. }
  91. move = true; // move on failure
  92. }
  93. //如果队列不存在,并且整个Pool的锁尚未被持有。则尝试初始化一个工作队列
  94. else if (((rs = runState) & RSLOCK) == 0)
  95. { // create new queue
  96. //创建了一个用于共享的Workqueue
  97. q = new WorkQueue(this, null);
  98. q.hint = r;
  99. //k是上面流程发现的在workqueues中为null的slot下标,workQueue内的config意味着当前的workQueue的下标是工作模式。是独占,还是共享等。
  100. q.config = k | SHARED_QUEUE;
  101. q.scanState = INACTIVE;
  102. rs = lockRunState(); // publish index
  103. //rs大于0意味着线程池么有停止,剩余的条件均是判断对应的slot位置仍然是null
  104. if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null)
  105. ws[k] = q; // else terminated
  106. unlockRunState(rs, rs & ~RSLOCK);
  107. }
  108. else
  109. move = true; // move if busy
  110. //这个写法看起来容易和上面混在一起。其实已经是一个新的if开头了。如果move为true,就意味着需要切换更新线程的随机值了
  111. if (move)
  112. r = ThreadLocalRandom.advanceProbe(r);
  113. }
  114. }

signalWork(WorkQueue[] ws, WorkQueue q)

尝试唤醒处于等待堆栈栈顶的工作线程来执行任务。如果没有空闲线程且整体线程数不足,则增加工作线程。

  1. final void signalWork(WorkQueue[] ws, WorkQueue q) {
  2. long c; int sp, i; WorkQueue v; Thread p;
  3. //ctl小于0就是AC部分小于0,意味着没有足够的激活中的执行资源
  4. while ((c = ctl) < 0L) { // too few active
  5. //低32位为0,意味着当前没有正在等待执行的线程
  6. if ((sp = (int)c) == 0) { // no idle workers
  7. //(c & ADD_WORKER)!=0 推断出TC小于0,意味着整体执行资源不足。
  8. if ((c & ADD_WORKER) != 0L) // too few workers
  9. tryAddWorker(c);
  10. break;
  11. }
  12. if (ws == null) // unstarted/terminated
  13. break;
  14. //如果wq数组的长度小于等待线程持有的wq的下标,意味着pool已经终止了
  15. if (ws.length <= (i = sp & SMASK)) // terminated
  16. break;
  17. //如果等待线程持有的wq的下标在Pool的WQ数组中已经为空,意味着Pool开始执行终止流程了。
  18. if ((v = ws[i]) == null) // terminating
  19. break;
  20. //生成等待线程的下一个scanState的值。增加SS_SEQ的值为是为了保证每一次scanState都在变化,用于版本控制。避免CAS中的ABA问题。
  21. int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
  22. //d==0,意味着到这一步,之前的处于等待的worker的状态没有变化
  23. int d = sp - v.scanState; // screen CAS
  24. //只有先成功更新ctl才能更新自身的scanState,每个地方都遵循这个顺序
  25. //增加AC的数值,同时将SP部分恢复到上一个等待线程的数据
  26. long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
  27. if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
  28. v.scanState = vs; // activate v
  29. //如果该worker线程处于休眠状态,唤醒。
  30. if ((p = v.parker) != null)
  31. U.unpark(p);
  32. break;
  33. }
  34. //任务是放在q这个WQ里面的。如果q还有任务,意味着还可以继续唤醒等待线程,因为存在可以偷取的任务。否则的话,可能没有任务可以偷取了,直接退出循环
  35. if (q != null && q.base == q.top) // no more work
  36. break;
  37. }
  38. }

lockRunState

获得运行状态的锁。

  1. private int lockRunState()
  2. {
  3. int rs;
  4. return (
  5. (
  6. //如果当前运行状态的锁被其他线程得到
  7. ((rs = runState) & RSLOCK) != 0
  8. //或者运行状态没有锁但是竞争锁失败
  9. || !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)//
  10. ) ?
  11. //等待直到获取锁
  12. awaitRunStateLock()
  13. //之前的步骤成功得到锁,直接返回结果
  14. : rs
  15. );
  16. }

awaitRunStateLock

尝试竞争运行锁,直到成功

  1. private int awaitRunStateLock()
  2. {
  3. Object lock;
  4. boolean wasInterrupted = false;
  5. for (int spins = SPINS, r = 0, rs, ns;;)
  6. {
  7. //在运行锁没有被其他线程得到的情况尝试cas竞争锁
  8. if (((rs = runState) & RSLOCK) == 0)
  9. {
  10. if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK))
  11. {
  12. if (wasInterrupted)
  13. {
  14. try
  15. {
  16. Thread.currentThread().interrupt();
  17. }
  18. catch (SecurityException ignore)
  19. {
  20. }
  21. }
  22. return ns;
  23. }
  24. }
  25. //将r初始化为随机数种子
  26. else if (r == 0)
  27. r = ThreadLocalRandom.nextSecondarySeed();
  28. //如果spin大于0对r进行随机数演算。
  29. else if (spins > 0)
  30. {
  31. r ^= r << 6;
  32. r ^= r >>> 21;
  33. r ^= r << 7; // xorshift
  34. //根据随机数结果对spin进行递减操作
  35. if (r >= 0)
  36. --spins;
  37. }
  38. //如果在上面的步骤都完成的情况下,并且forkjoinpool还没有初始化意味着较高的竞争,主动让出cpu
  39. else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
  40. Thread.yield(); // initialization race
  41. //如果走到这里意味着线程发现线程池已经初始化了,但是本线程竞争运行锁失败,此时在运行字段上cas放入需要通知的标识。
  42. else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL))
  43. {
  44. //如果成功,就以stealCounter为对象锁执行等待。
  45. synchronized (lock)
  46. {
  47. //如果通知标识存在,则线程阻塞等待其他线程完成工作
  48. if ((runState & RSIGNAL) != 0)
  49. {
  50. try
  51. {
  52. lock.wait();
  53. }
  54. catch (InterruptedException ie)
  55. {
  56. if (!(Thread.currentThread() instanceof ForkJoinWorkerThread))
  57. wasInterrupted = true;
  58. }
  59. }
  60. //否则唤醒所有在这个对象锁上执行等待的线程
  61. else
  62. lock.notifyAll();
  63. }
  64. }
  65. }
  66. }

unlockRunState

解锁运行锁。如果发现通知标识,则复位该标识并且进行线程唤醒

  1. private void unlockRunState(int oldRunState, int newRunState)
  2. {
  3. //如果cas失败意味着在该cas之前或者同时,运行锁被设置了通知标识
  4. if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState))
  5. {
  6. Object lock = stealCounter;
  7. //直接清除运行锁上的通知标识
  8. //这里不需要通过cas。因为当runState被更新后,其他线程想要放上通知标识是通过cas完成,而由于其他线程的预期值都是锁定的状态,与新值不符,都会失败。
  9. //所以这里直接更新是ok的
  10. runState = newRunState; // clears RSIGNAL bit
  11. //更新完成之后,由于需要获得lock的sync锁,因此无论何种时序,都可以处于等待状态的线程恢复过来。
  12. if (lock != null)
  13. synchronized (lock)
  14. {
  15. lock.notifyAll();
  16. }
  17. }
  18. }

tryAddWorker(long c)

不断尝试增加一个worker线程直到以下情况的发生
+ 当前Pool正在尝试终止
+ 增加成功
+ 当前激活worker数量超过配置的并发数并且没有处于等待的worker线程

  1. private void tryAddWorker(long c) {
  2. boolean add = false;
  3. do {
  4. //增加一个执行资源数。得到的计算结果就是nc。
  5. long nc = ((AC_MASK & (c + AC_UNIT)) |
  6. (TC_MASK & (c + TC_UNIT)));
  7. if (ctl == c) {
  8. int rs, stop; // check if terminating
  9. if ((stop = (rs = lockRunState()) & STOP) == 0)
  10. add = U.compareAndSwapLong(this, CTL, c, nc);
  11. unlockRunState(rs, rs & ~RSLOCK);
  12. if (stop != 0)
  13. break;
  14. //如果ctl更新成功,则确实的新增一个worker线程
  15. if (add) {
  16. //如果增加worker的意图成功,则进入创建环节。
  17. createWorker();
  18. break;
  19. }
  20. }
  21. //如果激活线程数到达或者超过配置的并发数,并且当前不存在处于等待的worker线程。则退出循环。
  22. } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
  23. }

createWorker()

创建一个worker线程。创建成功则返回true。创建失败则在pool中取消注册该worker,并且返回false

  1. private boolean createWorker() {
  2. ForkJoinWorkerThreadFactory fac = factory;
  3. Throwable ex = null;
  4. ForkJoinWorkerThread wt = null;
  5. try {
  6. if (fac != null && (wt = fac.newThread(this)) != null) {
  7. wt.start();
  8. return true;
  9. }
  10. } catch (Throwable rex) {
  11. ex = rex;
  12. }
  13. deregisterWorker(wt, ex);
  14. return false;
  15. }

registerWorker(ForkJoinWorkerThread wt)

为新生成的ForkJoinWorkerThread创建一个私有的非共享的WorkQueue。并且将该WQ放入到Pool的WQ数组中。该WQ的模式即为Pool的工作模式。

  1. final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
  2. UncaughtExceptionHandler handler;
  3. wt.setDaemon(true); // configure thread
  4. if ((handler = ueh) != null)
  5. wt.setUncaughtExceptionHandler(handler);
  6. WorkQueue w = new WorkQueue(this, wt);
  7. int i = 0; // assign a pool index
  8. int mode = config & MODE_MASK;
  9. int rs = lockRunState();
  10. try {
  11. WorkQueue[] ws; int n; // skip if no array
  12. if ((ws = workQueues) != null && (n = ws.length) > 0) {
  13. //得到一个初始化的数值。该数值的含义在尽可能避免碰撞
  14. int s = indexSeed += SEED_INCREMENT; // unlikely to collide
  15. int m = n - 1;
  16. //得到一个奇数下标
  17. i = ((s << 1) | 1) & m; // odd-numbered indices
  18. //如果发生了碰撞,则不断移动下标。下标的移动步长为数组长度的一半。
  19. if (ws[i] != null) { // collision
  20. int probes = 0;
  21. //step是大于或等于WQ数组长度的一半的最小偶数。
  22. int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
  23. while (ws[i = (i + step) & m] != null) {
  24. //不断尝试移动下标,如果接近2次完整的遍历都未曾发现空的槽位,则2倍扩容WQ数组
  25. if (++probes >= n) {
  26. workQueues = ws = Arrays.copyOf(ws, n <<= 1);
  27. m = n - 1;
  28. probes = 0;
  29. }
  30. }
  31. }
  32. //workQueue数组的大小受限于TC。也就是受限于并发数限制。并发数则限制在0x7fff内。
  33. w.hint = s; // use as random seed
  34. //当前workqueue的模式是workpool的模式。且非共享。
  35. w.config = i | mode;
  36. w.scanState = i; // publication fence
  37. ws[i] = w;
  38. }
  39. } finally {
  40. unlockRunState(rs, rs & ~RSLOCK);
  41. }
  42. wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
  43. return w;
  44. }

runWorker(WorkQueue w)

一个ForkjoinWorkerThread的主要循环。扫描WQ数组,偷取任务进行执行(有可能偷取的任务就是自身的任务)。没有任务时则进行等待。

  1. runWorker(WorkQueue w) {
  2. //对WQ执行初始化和适当的扩容。
  3. w.growArray(); // allocate queue
  4. int seed = w.hint; // initially holds randomization hint
  5. int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
  6. for (ForkJoinTask<?> t;;) {
  7. if ((t = scan(w, r)) != null)
  8. w.runTask(t);
  9. else if (!awaitWork(w, r))
  10. break;
  11. r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
  12. }
  13. }

scan(WorkQueue w, int r)

扫描WQ数组并且尝试偷取一个任务。扫描起始于一个随机位置r,不断向前递增。如果一轮扫描中没有发现任何可以偷取的偷取,则尝试将自身设置为非激活状态,并且ctl的ac值-1。 如果再次扫描一轮,仍然处于非激活状态并且整个WQ数组状态稳定。则退出扫描并且返回null。
而如果扫描时发现了可以偷取的任务,自身处于激活状态则尝试偷取,否则尝试激活一个worker(可能是自己也可能是别人)。

  1. private ForkJoinTask<?> scan(WorkQueue w, int r) {
  2. WorkQueue[] ws; int m;
  3. if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
  4. int ss = w.scanState; // initially non-negative
  5. //计算初始随机位置origin
  6. for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
  7. WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
  8. int b, n; long c;
  9. //从Pool的WQ数组中挑选一个随机槽位上的WQ。该WQ不为空
  10. if ((q = ws[k]) != null) {
  11. //挑选的WQ对象,存在着可以偷取的任务(top-base大于0时意味着存在任务)
  12. if ((n = (b = q.base) - q.top) < 0 &&
  13. (a = q.array) != null) { // non-empty
  14. long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
  15. //如果发现一个可以偷取的任务
  16. if ((t = ((ForkJoinTask<?>)
  17. U.getObjectVolatile(a, i))) != null &&
  18. q.base == b) {
  19. //读取到该WQ队列头部的task并且此时该队列的base尚未变化
  20. //如果当前worker处于激活状态则尝试偷取
  21. if (ss >= 0) {
  22. //将要偷取的task所在槽位cas方式置空。成功后则增加该WQ的base值并且返回偷取的任务。
  23. if (U.compareAndSwapObject(a, i, t, null)) {
  24. q.base = b + 1;
  25. //n<-1,意味着当前WQ内的待提取任务超过1个。因此可以尝试唤醒该worker线程
  26. if (n < -1)
  27. signalWork(ws, q);
  28. return t;
  29. }
  30. }
  31. //这个条件意味着本次扫描是当前worker设置为非激活状态成功后的第一轮扫描。此时发现了可以偷取的任务,则尝试激活一个worker(可以是自己或者是别人)
  32. else if (oldSum == 0 && // try to activate
  33. w.scanState < 0)
  34. tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
  35. }
  36. //如果当前是非激活状态,则尝试刷新。有可能上面的操作中激活的是自身
  37. if (ss < 0) // refresh
  38. ss = w.scanState;
  39. //如果走到这里意味着发现了可以偷取的任务,但是没有偷取到。因此移动到一个新的随机初始值。并重新开始整个扫描流程
  40. r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
  41. origin = k = r & m; // move and rescan
  42. oldSum = checkSum = 0;
  43. continue;
  44. }
  45. //选中的WQ没有可以偷取任务时增加checkSum
  46. checkSum += b;
  47. }
  48. //让k+=1.如果循环完一个数组长度,则尝试执行一些操作
  49. if ((k = (k + 1) & m) == origin) {
  50. //ss>=0成立。意味着当前worker处于激活状态。
  51. //ss == (ss = w.scanState)意味着一轮扫描结束,worker的状态没有发生改变并且处于非激活状态
  52. //oldSum == (oldSum = checkSum) 意味着一轮扫描结束,整个WQ数组的内容没有发生改变
  53. //后两个条件同时为真,意味着连续的两轮扫描中,整个WQ数组状态稳定。
  54. if ((ss >= 0 || (ss == (ss = w.scanState))) &&
  55. oldSum == (oldSum = checkSum)) {
  56. //如果已经是非激活状态了,则直接退出
  57. if (ss < 0 || w.qlock < 0) // already inactive
  58. break;
  59. //将状态设置为负数,也就是非激活状态。
  60. int ns = ss | INACTIVE; // try to inactivate
  61. //设定新的ctl值。首先是AC部分减去1.然后是SP部分更新为当前worker的内容。
  62. long nc = ((SP_MASK & ns) |
  63. (UC_MASK & ((c = ctl) - AC_UNIT)));
  64. //当前的worker保留了上一个留存于SP部分的worker信息。
  65. w.stackPred = (int)c; // hold prev stack top
  66. //非volatile写的方式更新了scanState的值。因为下面还有cas操作,这里这样的更新操作可以节省性能。
  67. U.putInt(w, QSCANSTATE, ns);
  68. //如果cas方式更新ctl成功,则将ss设置为更新后的状态值,否则将scanState回退到更新前的状态值也就是ss的值。
  69. if (U.compareAndSwapLong(this, CTL, c, nc))
  70. ss = ns;
  71. else
  72. w.scanState = ss; // back out
  73. }
  74. checkSum = 0;
  75. }
  76. }
  77. }
  78. return null;
  79. }

awaitWork(WorkQueue w, int r)

进入该方法的WQ,本身已经是非激活状态,并且将自身的scanState更新到了ctl中。
如果当前worker不是最后一个等待worker,则进入休眠状态等待被其他线程唤醒。
如果当前worker是最后一个worker,则区分不同情况处理。

以下几种情况该方法返回false表达该worker需要终止

  1. /**
  2. * 返回false意味着该worker线程需要终止了。也就是需要执行deregisterWorker方法
  3. */
  4. private boolean awaitWork(WorkQueue w, int r) {
  5. if (w == null || w.qlock < 0) // w is terminating
  6. return false;
  7. for (int pred = w.stackPred, spins = SPINS, ss;;) {
  8. //能够进入该方法的worker都是处于非激活状态。如果发现再次被激活,则需要退出循环。
  9. if ((ss = w.scanState) >= 0)
  10. break;
  11. else if (spins > 0) {
  12. //类xorshift运算,得到下一个随机数
  13. r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
  14. if (r >= 0 && --spins == 0) { // randomize spins
  15. WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
  16. //检查worker堆栈中的上一个worker是否已经进入休眠状态。如果未曾进入休眠状态的话,意味着该worker也有可能不进入休眠。因此这里会通过设置spins的值继续进行自旋。
  17. if (pred != 0 && (ws = workQueues) != null &&
  18. (j = pred & SMASK) < ws.length &&
  19. (v = ws[j]) != null && // see if pred parking
  20. (v.parker == null || v.scanState >= 0))
  21. spins = SPINS; // continue spinning
  22. }
  23. }
  24. //worker线程被终止,则直接返回
  25. else if (w.qlock < 0) // recheck after spins
  26. return false;
  27. else if (!Thread.interrupted()) {
  28. long c, prevctl, parkTime, deadline;
  29. int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
  30. //计算结果ac的值含义是当前激活的worker数量
  31. if ((ac <= 0 && tryTerminate(false, false)) ||
  32. (runState & STOP) != 0) // pool terminating
  33. return false;
  34. //如果总的资源数已经超过了并发需求并且该worker是最后一个等待worker,则尝试终止该worker。如果通过ctl替换的方式无法终止,则通过超时的方式进行终止
  35. if (ac <= 0 && ss == (int)c) { // is last waiter
  36. //计算得到前一个ctl的值
  37. prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
  38. //得到tc部分的值
  39. int t = (short)(c >>> TC_SHIFT); // shrink excess spares
  40. //如果总资源数超过并发数+2,则意味着worker数量太多,终止本worker。
  41. //如果ctl更新成功,则直接返回false
  42. if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
  43. return false;
  44. //如果ctl更新失败则休眠一段时间后再次尝试
  45. parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
  46. deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
  47. }
  48. else
  49. prevctl = parkTime = deadline = 0L;
  50. Thread wt = Thread.currentThread();
  51. U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
  52. w.parker = wt;
  53. if (w.scanState < 0 && ctl == c) // recheck before park
  54. U.park(false, parkTime);
  55. U.putOrderedObject(w, QPARKER, null);
  56. U.putObject(wt, PARKBLOCKER, null);
  57. //被其他线程唤醒了
  58. if (w.scanState >= 0)
  59. break;
  60. //休眠时间到达,并且ctl更新成功,则返回false。需要停止该worker
  61. if (parkTime != 0L && ctl == c &&
  62. deadline - System.nanoTime() <= 0L &&
  63. U.compareAndSwapLong(this, CTL, c, prevctl))
  64. return false; // shrink pool
  65. }
  66. }
  67. return true;
  68. }

awaitJoin(WorkQueue w, ForkJoinTask task, long deadline)

等待给定的任务直到该任务完成。在等待的过程中,等待的worker并非直接进入阻塞状态。而是采取几个策略推进任务的整体执行

  1. 如果给定任务是本地任务,则尝试直接获取该任务,并且立刻执行
  2. 如果给定任务是其他worker的任务,则尝试偷取该worker的任务执行,以加快该worker完成给定任务的可能性。
  3. 没有任务可以执行则尝试创建一个代替worker或者恢复一个处于休眠状态的worker。如果创建或者恢复成功,则自身陷入阻塞。
  1. final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline)
  2. {
  3. int s = 0;
  4. if (task != null && w != null)
  5. {
  6. ForkJoinTask<?> prevJoin = w.currentJoin;
  7. U.putOrderedObject(w, QCURRENTJOIN, task);
  8. CountedCompleter<?> cc = (task instanceof CountedCompleter) ? (CountedCompleter<?>) task : null;
  9. for (;;)
  10. {
  11. if ((s = task.status) < 0)
  12. break;
  13. if (cc != null)
  14. helpComplete(w, cc, 0);
  15. //如果没有本地任务,或者无法执行给定任务。则尝试将偷取了给定任务的worker中的本地任务偷取来自己执行。
  16. else if (w.base == w.top || w.tryRemoveAndExec(task))
  17. helpStealer(w, task);
  18. if ((s = task.status) < 0)
  19. break;
  20. long ms, ns;
  21. if (deadline == 0L)
  22. ms = 0L;
  23. //如果最后期限已过,则直接返回
  24. else if ((ns = deadline - System.nanoTime()) <= 0L)
  25. break;
  26. //等待时间最少是1ms。
  27. else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
  28. ms = 1L;
  29. //如果补偿成功,则本worker阻塞一段时间。
  30. if (tryCompensate(w))
  31. {
  32. task.internalWait(ms);
  33. U.getAndAddLong(this, CTL, AC_UNIT);
  34. }
  35. }
  36. U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
  37. }
  38. return s;
  39. }

helpStealer(WorkQueue w, ForkJoinTask task)

只有当worker w没有本地任务的时候才会执行这个方法。
worker v 尝试从偷取了task的work stealer的本地任务中偷取一个任务来执行。

  1. private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
  2. WorkQueue[] ws = workQueues;
  3. int oldSum = 0, checkSum, m;
  4. if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
  5. task != null) {
  6. do { // restart point
  7. checkSum = 0; // for stability check
  8. ForkJoinTask<?> subtask;
  9. WorkQueue j = w, v; // v is subtask stealer
  10. descent: for (subtask = task; subtask.status >= 0; ) {
  11. //使用随机数初始化一个奇数值。然后遍历WQ数组。只遍历其中的worker部分也就是奇数部分。
  12. for (int h = j.hint | 1, k = 0, i; ; k += 2) {
  13. //没有可以偷取任务的worker,退出整个大的循环
  14. if (k > m)
  15. break descent;
  16. if ((v = ws[i = (h + k) & m]) != null) {
  17. //发现了一个worker v,该worker偷取的任务就是给定的任务task。
  18. if (v.currentSteal == subtask) {
  19. j.hint = i;
  20. break;
  21. }
  22. checkSum += v.base;
  23. }
  24. }
  25. for (;;) {
  26. ForkJoinTask<?>[] a; int b;
  27. checkSum += (b = v.base);
  28. ForkJoinTask<?> next = v.currentJoin;
  29. //1. 如果给定的任务已经完成
  30. //2. 如果当前的worker v没有在等待给定任务
  31. //3. 如果偷取给定任务的worker又偷取了别的任务
  32. //意味着之前的数据已经过期,放弃
  33. if (subtask.status < 0 || j.currentJoin != subtask ||
  34. v.currentSteal != subtask) // stale
  35. break descent;
  36. //如果偷取任务的worker没有本地任务
  37. if (b - v.top >= 0 || (a = v.array) == null) {
  38. //偷取任务的worker没有在join的任务要完成
  39. if ((subtask = next) == null)
  40. break descent;
  41. j = v;
  42. break;
  43. }
  44. int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
  45. ForkJoinTask<?> t = ((ForkJoinTask<?>)
  46. U.getObjectVolatile(a, i));
  47. if (v.base == b) {
  48. if (t == null) // stale
  49. break descent;
  50. //从偷取给定任务的worker处偷取了一个任务来执行
  51. if (U.compareAndSwapObject(a, i, t, null)) {
  52. v.base = b + 1;
  53. ForkJoinTask<?> ps = w.currentSteal;
  54. int top = w.top;
  55. do {
  56. U.putOrderedObject(w, QCURRENTSTEAL, t);
  57. t.doExec();
  58. //如果本worker偷取的任务执行的过程中,分裂了。那么继续获取分裂后的本地任务执行,直到本地任务都被完成
  59. } while (task.status >= 0 &&
  60. w.top != top &&
  61. (t = w.pop()) != null);
  62. U.putOrderedObject(w, QCURRENTSTEAL, ps);
  63. //不相等意味着本地任务在分裂的时候,被其他的线程也偷取了一部分
  64. if (w.base != w.top)
  65. return; // can't further help
  66. }
  67. }
  68. }
  69. }
  70. //循环两次,整个Pool的状态都是稳定的,没有新增任务。
  71. } while (task.status >= 0 && oldSum != (oldSum = checkSum));
  72. }
  73. }

helpComplete(WorkQueue w, CountedCompleter task,int maxTasks)

  1. final int helpComplete(WorkQueue w, CountedCompleter<?> task,
  2. int maxTasks) {
  3. WorkQueue[] ws; int s = 0, m;
  4. if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
  5. task != null && w != null) {
  6. int mode = w.config; // for popCC
  7. int r = w.hint ^ w.top; // arbitrary seed for origin
  8. int origin = r & m; // first queue to scan
  9. int h = 1; // 1:ran, >1:contended, <0:hash
  10. for (int k = origin, oldSum = 0, checkSum = 0;;) {
  11. CountedCompleter<?> p; WorkQueue q;
  12. if ((s = task.status) < 0)
  13. break;
  14. if (h == 1 && (p = w.popCC(task, mode)) != null) {
  15. p.doExec(); // run local task
  16. if (maxTasks != 0 && --maxTasks == 0)
  17. break;
  18. origin = k; // reset
  19. oldSum = checkSum = 0;
  20. }
  21. else { // poll other queues
  22. if ((q = ws[k]) == null)
  23. h = 0;
  24. else if ((h = q.pollAndExecCC(task)) < 0)
  25. checkSum += h;
  26. if (h > 0) {
  27. if (h == 1 && maxTasks != 0 && --maxTasks == 0)
  28. break;
  29. r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
  30. origin = k = r & m; // move and restart
  31. oldSum = checkSum = 0;
  32. }
  33. else if ((k = (k + 1) & m) == origin) {
  34. if (oldSum == (oldSum = checkSum))
  35. break;
  36. checkSum = 0;
  37. }
  38. }
  39. }
  40. }
  41. return s;
  42. }

tryCompensate(WorkQueue w)

有意向阻塞worker v一段时间。因此这里尝试做出某种补偿策略。返回true意味着本worker可以阻塞

  1. private boolean tryCompensate(WorkQueue w)
  2. {
  3. boolean canBlock;
  4. WorkQueue[] ws;
  5. long c;
  6. int m, pc, sp;
  7. if (w == null || w.qlock < 0 || // caller terminating
  8. (ws = workQueues) == null || (m = ws.length - 1) <= 0 || (pc = config & SMASK) == 0)
  9. canBlock = false;
  10. //如果存在休眠的worker则尝试唤醒。如果尝试唤醒成功,则本worker可以阻塞。
  11. //外部代码中,当前worker如果从休眠中恢复,会增加AC的值。而在这里,因为唤醒成功的情况下本worker会阻塞。所以这里就不增加AC的值了。一增一减就归零了。
  12. else if ((sp = (int) (c = ctl)) != 0)
  13. canBlock = tryRelease(c, ws[sp & m], 0L);
  14. //没有休眠中的worker
  15. else
  16. {
  17. int ac = (int) (c >> AC_SHIFT) + pc;
  18. int tc = (short) (c >> TC_SHIFT) + pc;
  19. int nbusy = 0; // validate saturation
  20. //检查所有的奇数位置,也就是检查所有的worker的状态。
  21. //如果worker处于扫描任务的状态,标记为nbusy。遍历数组直到发现一个处于工作状态的线程
  22. //如果都没有过于工作状态的worker,那么遍历结束时nbusy应该是总资源数的2倍。(2倍是因为其选择奇数的算法造成)
  23. for (int i = 0; i <= m; ++i)
  24. {
  25. WorkQueue v;
  26. if ((v = ws[((i << 1) | 1) & m]) != null)
  27. {
  28. if ((v.scanState & SCANNING) != 0)
  29. break;
  30. ++nbusy;
  31. }
  32. }
  33. //有worker在处理任务或者ctl发生了变化。
  34. if (nbusy != (tc << 1) || ctl != c)
  35. canBlock = false; // unstable or stale
  36. //如果总资源大于配置的并发,激活worker数量大于1,本worker也没有任务
  37. //则尝试在不唤醒休眠线程的情况下,单纯让自身陷入等待状态。
  38. //所以这里让AC-1
  39. else if (tc >= pc && ac > 1 && w.isEmpty())
  40. {
  41. long nc = ((AC_MASK & (c - AC_UNIT)) | (~AC_MASK & c)); // uncompensated
  42. canBlock = U.compareAndSwapLong(this, CTL, c, nc);
  43. }
  44. //如果总worker数量超过了最大的限定worker数量,抛出异常
  45. else if (tc >= MAX_CAP || (this == common && tc >= pc + commonMaxSpares))
  46. throw new RejectedExecutionException("Thread limit exceeded replacing blocked worker");
  47. //尝试增加一个worker线程。这里只是增加了TC部分的值。
  48. //AC部分的值在外部方法中增加
  49. else
  50. { // similar to tryAddWorker
  51. boolean add = false;
  52. int rs; // CAS within lock
  53. long nc = ((AC_MASK & c) | (TC_MASK & (c + TC_UNIT)));
  54. if (((rs = lockRunState()) & STOP) == 0)
  55. add = U.compareAndSwapLong(this, CTL, c, nc);
  56. unlockRunState(rs, rs & ~RSLOCK);
  57. canBlock = add && createWorker(); // throws on exception
  58. }
  59. }
  60. return canBlock;
  61. }

deregisterWorker(ForkJoinWorkerThread wt, Throwable ex)

在Pool中取消该WQ的注册。主要内容是

worker会退出有两种情况。

  1. final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
  2. WorkQueue w = null;
  3. //将当前的workqueue从WQ数组中删除
  4. if (wt != null && (w = wt.workQueue) != null) {
  5. WorkQueue[] ws; // remove index from array
  6. int idx = w.config & SMASK;
  7. int rs = lockRunState();
  8. if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
  9. ws[idx] = null;
  10. unlockRunState(rs, rs & ~RSLOCK);
  11. }
  12. //更新ctl。删除一个资源数。主要就是在AC和TC部分-1
  13. long c;
  14. do {} while (!U.compareAndSwapLong
  15. (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
  16. (TC_MASK & (c - TC_UNIT)) |
  17. (SP_MASK & c))));
  18. if (w != null) {
  19. //设置qlock为-1,表明该worker已经终止。
  20. w.qlock = -1;
  21. w.transferStealCount(this);
  22. w.cancelAll(); // cancel remaining tasks
  23. }
  24. //本worker线程退出,因此确认下是否需要唤醒休眠线程,或者是在资源不足的情况下增加一个worker线程。
  25. for (;;) { // possibly replace
  26. WorkQueue[] ws; int m, sp;
  27. //如果当前pool已经处于终止过程中,说明不需要新增worker资源,退出。
  28. if (tryTerminate(false, false) || w == null || w.array == null ||
  29. (runState & STOP) != 0 || (ws = workQueues) == null ||
  30. (m = ws.length - 1) < 0)
  31. break;
  32. //如果存在处于休眠等待的worker线程,则直接唤醒
  33. if ((sp = (int)(c = ctl)) != 0) {
  34. if (tryRelease(c, ws[sp & m], AC_UNIT))
  35. break;
  36. }
  37. //如果是发生了异常导致了worker线程的终止。并且当前激活的worker资源少于预定的并发数,则尝试增加一个worker线程用于弥补本worker线程退出造成的影响。
  38. else if (ex != null && (c & ADD_WORKER) != 0L) {
  39. tryAddWorker(c);
  40. break;
  41. }
  42. else
  43. break;
  44. }
  45. if (ex == null) // help clean on way out
  46. ForkJoinTask.helpExpungeStaleExceptions();
  47. else // rethrow
  48. ForkJoinTask.rethrow(ex);
  49. }

tryTerminate(boolean now, boolean enable)

尝试执行Pool的终止流程。大致流程如下

  1. /**
  2. * enable为true,意味着开启Pool的SHUTDOWN流程。
  3. * now为true,意味着立刻进入STOP流程,否则就需要Pool当中不存在激活的worker以及未执行的任务时才能进入STOP阶段
  4. * 返回true,意味着整个池子已经终止或者正在终止中
  5. */
  6. private boolean tryTerminate(boolean now, boolean enable) {
  7. int rs;
  8. if (this == common) // cannot shut down
  9. return false;
  10. if ((rs = runState) >= 0) {
  11. if (!enable)
  12. return false;
  13. rs = lockRunState(); // enter SHUTDOWN phase
  14. unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
  15. }
  16. //尚未进入STOP阶段的话,则尝试进入
  17. if ((rs & STOP) == 0) {
  18. if (!now) { // check quiescence
  19. for (long oldSum = 0L;;) { // repeat until stable
  20. WorkQueue[] ws; WorkQueue w; int m, b; long c;
  21. long checkSum = ctl;
  22. //(int)(checkSum >> AC_SHIFT) + (config & SMASK)就是当前的激活的worker数量
  23. if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
  24. return false; // still active workers
  25. if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
  26. break; // check queues
  27. for (int i = 0; i <= m; ++i) {
  28. if ((w = ws[i]) != null) {
  29. //发现worker仍然有任务可以执行,则尝试唤醒worker线程w
  30. if ((b = w.base) != w.top || w.scanState >= 0 ||
  31. w.currentSteal != null) {
  32. tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
  33. return false; // arrange for recheck
  34. }
  35. checkSum += b;
  36. //首先关闭共享的workqueue
  37. if ((i & 1) == 0)
  38. w.qlock = -1; // try to disable external
  39. }
  40. }
  41. //两次循环checkSum未变化,意味着Pool已经稳定,退出循环
  42. if (oldSum == (oldSum = checkSum))
  43. break;
  44. }
  45. }
  46. //STOP阶段完成
  47. if ((runState & STOP) == 0) {
  48. rs = lockRunState();
  49. unlockRunState(rs, (rs & ~RSLOCK) | STOP);
  50. }
  51. }
  52. int pass = 0; // 3 passes to help terminate
  53. for (long oldSum = 0L;;) { // or until done or stable
  54. WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
  55. long checkSum = ctl;
  56. //如果总资源数小于等于0,或者workQueues已经为空,意味着Pool已经完成TERMINATED阶段的工作。将runState设定为TERMINATED状态
  57. if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
  58. (ws = workQueues) == null || (m = ws.length - 1) <= 0) {
  59. if ((runState & TERMINATED) == 0) {
  60. rs = lockRunState(); // done
  61. //设定当前Pool为TERMINATED状态
  62. unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
  63. synchronized (this) { notifyAll(); } // for awaitTermination
  64. }
  65. break;
  66. }
  67. for (int i = 0; i <= m; ++i) {
  68. if ((w = ws[i]) != null) {
  69. checkSum += w.base;
  70. //设置qlock,这样其他的worker线程会发现,并且尝试自我终止
  71. w.qlock = -1;
  72. //非首轮扫描,则首先清除队列中的剩余任务。然后终止该worker线程
  73. if (pass > 0) {
  74. w.cancelAll(); // clear queue
  75. if (pass > 1 && (wt = w.owner) != null) {
  76. //中断该worker线程
  77. if (!wt.isInterrupted()) {
  78. try { // unblock join
  79. wt.interrupt();
  80. } catch (Throwable ignore) {
  81. }
  82. }
  83. //如果worker线程处于非激活状态,则唤醒该线程。
  84. //这里需要考虑下,因为如果worker线程准备进入休眠时上面的if发生了,那么worker线程仍然是会进入休眠的。所以这里需要再次执行唤醒。
  85. //如果worker线程在上面的if之前就已经是休眠状态,则中断操作会立刻唤醒该worker线程
  86. if (w.scanState < 0)
  87. U.unpark(wt); // wake up
  88. }
  89. }
  90. }
  91. }
  92. if (checkSum != oldSum) { // unstable
  93. oldSum = checkSum;
  94. pass = 0;
  95. }
  96. //终止流程走了太多次,放弃。
  97. else if (pass > 3 && pass > m) // can't further help
  98. break;
  99. //至少扫描3次,并且Pool进入了稳定状态,则尝试释放所有的休眠等待worker线程
  100. else if (++pass > 1) { // try to dequeue
  101. long c; int j = 0, sp; // bound attempts
  102. while (j++ <= m && (sp = (int)(c = ctl)) != 0)
  103. tryRelease(c, ws[sp & m], AC_UNIT);
  104. }
  105. }
  106. return true;
  107. }

tryRelease(long c, WorkQueue v, long inc)

如果worker线程v是处于栈顶的等待线程,则增加ctl中的AC的值。如果增加成功,则唤醒worker线程v。
返回true意味着成功唤醒了一个worker线程。

  1. /**
  2. * c 当前的ctl值
  3. * v 尝试唤醒的worker
  4. * inc 需要增加的ac的值。
  5. */
  6. private boolean tryRelease(long c, WorkQueue v, long inc) {
  7. int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
  8. //确定v确实是在等待线程栈的栈顶才能尝试唤醒
  9. if (v != null && v.scanState == sp) { // v is at top of stack
  10. //计算唤醒之后的ctl的值,nc(next ctl)
  11. long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
  12. if (U.compareAndSwapLong(this, CTL, c, nc)) {
  13. v.scanState = vs;
  14. if ((p = v.parker) != null)
  15. U.unpark(p);
  16. return true;
  17. }
  18. }
  19. return false;
  20. }

cancelAll()

取消当前正在等待的join任务,取消当前偷取的任务,将任务队列中的任务逐一获取并且取消

  1. final void cancelAll() {
  2. ForkJoinTask<?> t;
  3. if ((t = currentJoin) != null) {
  4. currentJoin = null;
  5. ForkJoinTask.cancelIgnoringExceptions(t);
  6. }
  7. if ((t = currentSteal) != null) {
  8. currentSteal = null;
  9. ForkJoinTask.cancelIgnoringExceptions(t);
  10. }
  11. while ((t = poll()) != null)
  12. ForkJoinTask.cancelIgnoringExceptions(t);
  13. }

WorkQueue内容

WQ可以执行的操作有push,pop,和pull(就是其他线程的steal)。push和pop都是只能由本线程来完成,而poll则有可能是其他线程执行的。

growArray

workqueue内的task数组扩容

  1. final ForkJoinTask<?>[] growArray()
  2. {
  3. ForkJoinTask<?>[] oldA = array;
  4. int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
  5. if (size > MAXIMUM_QUEUE_CAPACITY)
  6. throw new RejectedExecutionException("Queue capacity exceeded");
  7. int oldMask, t, b;
  8. ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
  9. //如果不是初始化并且数组内有数据,就要尝试迁移
  10. if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0)
  11. {
  12. int mask = size - 1;
  13. do
  14. {
  15. //迁移不是简单通过拷贝完成。而是需要cas竞争,将之前数组的内容移动到新的数组中
  16. ForkJoinTask<?> x;
  17. int oldj = ((b & oldMask) << ASHIFT) + ABASE;
  18. int j = ((b & mask) << ASHIFT) + ABASE;
  19. x = (ForkJoinTask<?>) U.getObjectVolatile(oldA, oldj);
  20. if (x != null && U.compareAndSwapObject(oldA, oldj, x, null))
  21. U.putObjectVolatile(a, j, x);
  22. } while (++b != t);
  23. }
  24. return a;
  25. }

push(ForkJoinTask task)

  1. final void push(ForkJoinTask<?> task)
  2. {
  3. ForkJoinTask<?>[] a;
  4. ForkJoinPool p;
  5. int b = base, s = top, n;
  6. if ((a = array) != null)
  7. { // ignore if queue removed
  8. int m = a.length - 1; // fenced write for task visibility
  9. U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
  10. U.putOrderedInt(this, QTOP, s + 1);
  11. //如果n<=1,意味着该worker很可能拿走处理完自身的任务之后会大概率进入休眠。因此这里已经推送了一个任务进来可以尝试下唤醒等待堆栈栈顶的等待worker。
  12. if ((n = s - b) <= 1)
  13. {
  14. if ((p = pool) != null)
  15. p.signalWork(p.workQueues, this);
  16. }
  17. else if (n >= m)
  18. growArray();
  19. }
  20. }

runTask(ForkJoinTask task)

将给定的任务设置为currentSteal的值,并且执行。
执行完毕后执行本地任务。

  1. final void runTask(ForkJoinTask<?> task) {
  2. if (task != null) {
  3. //设置扫描状态为未扫描。也就是将最低位bit设置为0.因为初始状态下scanState必然是奇数,因此设置为偶数时就意味着处于任务执行中。
  4. scanState &= ~SCANNING;
  5. //执行偷取来的任务
  6. (currentSteal = task).doExec();
  7. //执行完毕设置为null用于帮助gc。这里没有使用直接赋值主要是提升写效率。
  8. U.putOrderedObject(this, QCURRENTSTEAL, null);
  9. //执行本地任务
  10. execLocalTasks();
  11. ForkJoinWorkerThread thread = owner;
  12. //如果当前偷取的任务数已经超过了int的上限,则将该值加入到Pool的偷取总数中。
  13. if (++nsteals < 0) // collect on overflow
  14. transferStealCount(pool);
  15. //任务执行完毕,恢复scanState到扫描任务的数值
  16. scanState |= SCANNING;
  17. if (thread != null)
  18. thread.afterTopLevelExec();
  19. }
  20. }

execLocalTasks()

  1. final void execLocalTasks()
  2. {
  3. int b = base, m, s;
  4. ForkJoinTask<?>[] a = array;
  5. // s=top-1是实际存在task的槽位下标。这三个条件说明本地任务数组中存在任务
  6. if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0)
  7. {
  8. // 如果不是先进先出,那就是后进先出也就是栈模式。则从s位置不断取出任务直到b位置。
  9. if ((config & FIFO_QUEUE) == 0)
  10. {
  11. for (ForkJoinTask<?> t;;)
  12. {
  13. // 先尝试获取任务,获取任务成功。才能真正的减少top。因为其他的线程可能是从base处往top处偷取任务。并且是先偷取任务,后移动base。所以可能造成base值未增加,但是base处的任务已经被拿走了。
  14. if ((t = (ForkJoinTask<?>) U.getAndSetObject(a, ((m & s) << ASHIFT) + ABASE, null)) == null)
  15. break;
  16. // 取出任务并且设置top减少1.
  17. U.putOrderedInt(this, QTOP, s);
  18. t.doExec();
  19. // 表达式成立,意味着base==top。说明本地任务已经清空
  20. if (base - (s = top - 1) > 0)
  21. break;
  22. }
  23. }
  24. // 如果先进先出,则执行下面的逻辑
  25. else
  26. pollAndExecAll();
  27. }
  28. }

pollAndExecAll()

不断的尝试从base槽位取得任务直到任务数组为空

  1. final void pollAndExecAll()
  2. {
  3. for (ForkJoinTask<?> t; (t = poll()) != null;)
  4. t.doExec();
  5. }

poll()

从WQ中的数组的base槽位通过cas竞争方式取得一个任务。重复该流程直到取得任务或者任务数组为空

  1. final ForkJoinTask<?> poll()
  2. {
  3. ForkJoinTask<?>[] a;
  4. int b;
  5. ForkJoinTask<?> t;
  6. //本地WQ的任务数组存在可以执行的任务
  7. //每一次循环都需要重新获取base的值。因为其他线程可能更新了该值
  8. while ((b = base) - top < 0 && (a = array) != null)
  9. {
  10. int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
  11. t = (ForkJoinTask<?>) U.getObjectVolatile(a, j);
  12. //获取数据的过程中base没有发生变化。因为其他线程也会从base处偷取任务,所以这里先判断一次
  13. if (base == b)
  14. {
  15. if (t != null)
  16. {
  17. //通过cas方式将b下标处设置为null。如果设置成功,意味着获取数据成功,设置成功后才能递增base
  18. if (U.compareAndSwapObject(a, j, t, null))
  19. {
  20. base = b + 1;
  21. return t;
  22. }
  23. }
  24. //t为null意味着b槽位被其他线程获取,因此base递增的动作留给其他线程完成。本处检测当前槽位b是否已经是数组中最后一个有任务的槽位。
  25. else if (b + 1 == top) // now empty
  26. break;
  27. }
  28. }
  29. return null;
  30. }

transferStealCount(ForkJoinPool p)

将本worker偷取的任务数总量增加到Pool的统计数字中

  1. final void transferStealCount(ForkJoinPool p) {
  2. AtomicLong sc;
  3. if (p != null && (sc = p.stealCounter) != null) {
  4. int s = nsteals;
  5. nsteals = 0; // if negative, correct for overflow
  6. sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s));
  7. }
  8. }

cancelAll()

取消当前的所有任务

  1. final void cancelAll() {
  2. ForkJoinTask<?> t;
  3. if ((t = currentJoin) != null) {
  4. currentJoin = null;
  5. ForkJoinTask.cancelIgnoringExceptions(t);
  6. }
  7. if ((t = currentSteal) != null) {
  8. currentSteal = null;
  9. ForkJoinTask.cancelIgnoringExceptions(t);
  10. }
  11. while ((t = poll()) != null)
  12. ForkJoinTask.cancelIgnoringExceptions(t);
  13. }

tryUnpush(ForkJoinTask t)

如果给定的任务处于当前wq的top位置,则cas方式尝试取出。如果取出成功返回true。取出失败或者给定任务不是在top位置,返回false

  1. final boolean tryUnpush(ForkJoinTask<?> t)
  2. {
  3. ForkJoinTask<?>[] a;
  4. int s;
  5. if ((a = array) != null && (s = top) != base && U.compareAndSwapObject(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null))
  6. {
  7. U.putOrderedInt(this, QTOP, s);
  8. return true;
  9. }
  10. return false;
  11. }

tryRemoveAndExec(ForkJoinTask task)

遍历本地任务,如果给定任务是本地任务,则尝试从本地任务堆栈中取出该任务(如果是栈顶任务直接取出,如果是中间任务则尝试用空任务替换),并且执行该任务。
如果没有本地任务并且无法确定给定任务的状态,返回true

  1. final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
  2. ForkJoinTask<?>[] a; int m, s, b, n;
  3. if ((a = array) != null && (m = a.length - 1) >= 0 &&
  4. task != null) {
  5. while ((n = (s = top) - (b = base)) > 0) {
  6. //遍历top到base位置的本地任务
  7. for (ForkJoinTask<?> t;;) { // traverse from s to b
  8. long j = ((--s & m) << ASHIFT) + ABASE;
  9. //如果获取的任务为空,意味着遍历已经结束
  10. if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
  11. //s+1==top意味着没有本地任务了。
  12. return s + 1 == top; // shorter than expected
  13. //给定任务是本地任务中的一个
  14. else if (t == task) {
  15. boolean removed = false;
  16. //如果给定任务是栈顶任务,则尝试获取
  17. if (s + 1 == top) { // pop
  18. if (U.compareAndSwapObject(a, j, task, null)) {
  19. U.putOrderedInt(this, QTOP, s);
  20. removed = true;
  21. }
  22. }
  23. //如果不是栈顶任务,那就是中间的任务。本地任务的执行要么是从base端开始,要么是从top端开始,不能从中间开始。因此这里尝试使用一个空任务替换掉这个任务本身。然后才能尝试执行该任务。
  24. //如果base尚未变化才能执行下面的流程。在base变化的情况下,就需要重新开始执行整个流程
  25. else if (base == b) // replace with proxy
  26. removed = U.compareAndSwapObject(
  27. a, j, task, new EmptyTask());
  28. if (removed)
  29. task.doExec();
  30. break;
  31. }
  32. //如果栈顶任务已经不是等待处理状态,尝试抛弃。
  33. else if (t.status < 0 && s + 1 == top) {
  34. if (U.compareAndSwapObject(a, j, t, null))
  35. U.putOrderedInt(this, QTOP, s);
  36. break; // was cancelled
  37. }
  38. //遍历完成
  39. //遍历完成意味着
  40. //1.本地任务不为空
  41. //2.给定任务不是本地任务
  42. if (--n == 0)
  43. return false;
  44. }
  45. if (task.status < 0)
  46. return false;
  47. }
  48. }
  49. return true;
  50. }

ForkjoiniWorkThreead

生成一个ForkJoinWorkThread。并且创建一个私有的WQ。该WQ是在Pool的WQ数组中。

  1. protected ForkJoinWorkerThread(ForkJoinPool pool) {
  2. // Use a placeholder until a useful name can be set in registerWorker
  3. super("aForkJoinWorkerThread");
  4. this.pool = pool;
  5. this.workQueue = pool.registerWorker(this);
  6. }

run

启动worker线程,最重要的操作就是调用了pool.runWorker(workQueue);

  1. public void run() {
  2. if (workQueue.array == null) { // only run once
  3. Throwable exception = null;
  4. try {
  5. onStart();
  6. pool.runWorker(workQueue);
  7. } catch (Throwable ex) {
  8. exception = ex;
  9. } finally {
  10. try {
  11. onTermination(exception);
  12. } catch (Throwable ex) {
  13. if (exception == null)
  14. exception = ex;
  15. } finally {
  16. pool.deregisterWorker(this, exception);
  17. }
  18. }
  19. }
  20. }

ForkJoinTask

cancelIgnoringExceptions(ForkJoinTask t)

尝试取消任务。所谓的取消任务,就是设置任务的状态属性为取消。

  1. static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
  2. if (t != null && t.status >= 0) {
  3. try {
  4. t.cancel(false);
  5. } catch (Throwable ignore) {
  6. }
  7. }
  8. }

cancel(boolean mayInterruptIfRunning)

设置任务的状态属性为取消

  1. public boolean cancel(boolean mayInterruptIfRunning) {
  2. return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
  3. }

doExec()

  1. final int doExec() {
  2. int s; boolean completed;
  3. //status大于0意味着任务尚未取消,可以执行
  4. if ((s = status) >= 0) {
  5. try {
  6. completed = exec();
  7. } catch (Throwable rex) {
  8. return setExceptionalCompletion(rex);
  9. }
  10. if (completed)
  11. s = setCompletion(NORMAL);
  12. }
  13. return s;
  14. }

fork()

将这个任务推送到当前worker的wq中。如果当前线程不是forkjoinworkerthread,则推送到统一的公共Pool中。

  1. public final ForkJoinTask<V> fork() {
  2. Thread t;
  3. if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
  4. ((ForkJoinWorkerThread)t).workQueue.push(this);
  5. else
  6. ForkJoinPool.common.externalPush(this);
  7. return this;
  8. }

join()

  1. public final V join() {
  2. int s;
  3. if ((s = doJoin() & DONE_MASK) != NORMAL)
  4. reportException(s);
  5. return getRawResult();
  6. }

doJoin()

  1. //这段代码对三元表达式用的,不管怎么格式化,可读性都不是很高,只能勉强看下
  2. private int doJoin()
  3. {
  4. int s;
  5. Thread t;
  6. ForkJoinWorkerThread wt;
  7. ForkJoinPool.WorkQueue w;
  8. return (s = status) < 0 ?
  9. s :
  10. //当前线程是forkjoinworkerthread
  11. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
  12. //判断当前任务是否处于WQ的top位置。如果是的话并且成功取得该任务,则直接执行
  13. (w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ?
  14. //否则就等待该任务执行完毕
  15. s : wt.pool.awaitJoin(w, this, 0L)
  16. :
  17. //如果当前线程不是forkjointhreadpool
  18. externalAwaitDone();
  19. }

internalWait(long timeout)

该方法是worker线程来调用。
设定任务的signal标志位为true。并且等待直到超时或者被其他线程唤醒。

  1. final void internalWait(long timeout) {
  2. int s;
  3. if ((s = status) >= 0 && // force completer to issue notify
  4. U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
  5. synchronized (this) {
  6. if (status >= 0)
  7. try { wait(timeout); } catch (InterruptedException ie) { }
  8. else
  9. notifyAll();
  10. }
  11. }
  12. }

setCompletion(int completion)

设置任务状态为完成。如果有线程在等待该任务的完成,则唤醒所有等待的线程。

  1. private int setCompletion(int completion) {
  2. for (int s;;) {
  3. if ((s = status) < 0)
  4. return s;
  5. if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
  6. if ((s >>> 16) != 0)
  7. synchronized (this) { notifyAll(); }
  8. return completion;
  9. }
  10. }
  11. }

主要字段解读

ForkJoinPool

ctl字段的解读

ctl是一个long整型。以16bit为单位被分成4个区间。

如果ctl的低32位为0,意味着当前没有处于等待状态的worker线程。也就是没有空闲资源。

workQueues

该数组长度为2的次方幂。该数组的槽位可能存在null值。数组具有如下特性
+ 奇数下标的槽位用于存储属于worker的私有workqueue
+ 偶数下标的槽位用于存储共享的workqueue。共享workqueue的槽位下标最大不超过64.

WorkQueue

scanState

该字段用于表达worker线程的状态。是一个int类型的字段。包含如下信息

qlock

用于完成对worker的锁定工作。值具备如下含义

ForkJoinTask

state

用于表达task当前的任务状态

  1. volatile int status; // accessed directly by pool and workers
  2. static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
  3. static final int NORMAL = 0xf0000000; // must be negative
  4. static final int CANCELLED = 0xc0000000; // must be < NORMAL
  5. static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
  6. static final int SIGNAL = 0x00010000; // must be >= 1 << 16
  7. static final int SMASK = 0x0000ffff; // short bits for tags

问题标记

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注