[关闭]
@lambeta 2016-09-20T12:10:48.000000Z 字数 19164 阅读 311

第6章

translation


第6章

同步器

Java提供了synchronized关键字对临界区进行线程同步访问。由于基于synchronized很难正确地编写同步代码,所以并发工具类提供了高级的同步器(控制通用同步方法的类)。本章中,我会介绍倒计时门闩(countdown latch)、同步屏障(cyclic barrier)、交换器(exchanger)、信号量(semaphore)以及phaser同步器。

倒计时门闩

倒计时门闩会导致一条或多条线程在“门口”一直等待直到另一条线程打开这扇门,线程才得以继续运行。它是由一个计数变量和两个操作组成的,这两个操作分别是“导致一条线程等待直到计数变为0”以及“递减计数变量”。

类java.util.concurrent.CountDownLatch实现了倒计时门闩(countdown latch)同步器。你可以通过调用这个类的构造器CountDownLatch(int count)指定计数个数来初始化一个CountDownLatch的实例。当count的值是负数,该方法会抛出java.lang.IllegalArgumentException。

类CountDownLatch也提供了下列方法:

你会经常使用一个倒计时门闩来保证多条线程几乎同时开始工作。举个例子,请看清单6-1:

  1. import java.util.concurrent.CountDownLatch;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. public class CountDownLatchDemo
  5. {
  6. final static int NTHREADS = 3;
  7. public static void main(String[] args)
  8. {
  9. final CountDownLatch startSignal = new CountDownLatch(1);
  10. final CountDownLatch doneSignal = new CountDownLatch(NTHREADS);
  11. Runnable r = new Runnable()
  12. {
  13. @Override
  14. public void run()
  15. {
  16. try {
  17. report("entered run()");
  18. startSignal.await(); // wait until told to ...
  19. report("doing work"); // ... proceed
  20. Thread.sleep((int) (Math.random() * 1000));
  21. doneSignal.countDown(); // reduce count on which
  22. // main thread is ...
  23. }
  24. catch (InterruptedException ie)
  25. {
  26. System.err.println(ie);
  27. }
  28. }
  29. void report(String s)
  30. {
  31. System.out.println(System.currentTimeMillis() +
  32. ": " + Thread.currentThread() +
  33. ": " + s);
  34. }
  35. };
  36. ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
  37. for (int i = 0; i < NTHREADS; i++)
  38. executor.execute(r);
  39. try
  40. {
  41. System.out.println("main thread doing something");
  42. Thread.sleep(1000); // sleep for 1 second
  43. startSignal.countDown(); // let all threads proceed
  44. System.out.println("main thread doing something else");
  45. doneSignal.await(); // wait for all threads to finish
  46. executor.shutdownNow();
  47. }
  48. catch (InterruptedException ie)
  49. {
  50. System.err.println(ie);
  51. }
  52. }
  53. }

清单6-1中的默认主线程首先创建了一对倒计时门闩。这个startSignal门闩会在默认主线程就绪之前禁止任何工作线程执行,而doneSignal门闩会使得默认主线程等待所有的工作线程全部结束执行。

默认主线程接下来创建了一个带有run()方法的runnable,它会被后续创建的工作线程执行。

run()方法首先输出一条消息,然后调用startSignal的await()方法在继续执行之前会一直等待直到门闩的计数变为0。继续执行时,run()方法输出一条表示工作正在进行的消息并且挑一个随机的时间(从0到999毫秒)睡眠用于模拟正在做的工作。

到这里,run()调用了doneSignal的countDown()方法递减了门闩的计数。一旦计数降到0,等着这个信号的默认主线程就会继续执行,关闭executor和中断应用程序。

创建好runnable之后,从包含NTHREADS条线程的线程池获取一个executor,然后在这个executor上连续调用NTHREADS次execute()方法,依次把runnable传入线程池中。这一动作会启动线程运行run()方法。

接下来,默认主线程输出一条消息并且通过睡眠1秒钟的方式模拟执行其它的工作(得以让所有工作线程依次执行run()方法,进而调用startSignal.await()),然后调用startSingal的countDown()方法使工作线程开始运行,再输出一条消息表示主线程正在做一些别的事情,接着调用doneSignal的await()方法在它可以继续执行之前等待倒计时门闩的计数降至0。

照着下面编译清单6-1:

  1. javac CountDownLatchDemo.java

运行程序:

  1. java CountDownLatchDemo

你应该能观测到类似下列的输出(消息顺序可能略有出入):

  1. main thread doing something
  2. 1445802274931: Thread[pool-1-thread-2,5,main]: entered run()
  3. 1445802274931: Thread[pool-1-thread-3,5,main]: entered run()
  4. 1445802274931: Thread[pool-1-thread-1,5,main]: entered run()
  5. main thread doing something else
  6. 1445802275931: Thread[pool-1-thread-2,5,main]: doing work
  7. 1445802275931: Thread[pool-1-thread-3,5,main]: doing work
  8. 1445802275933: Thread[pool-1-thread-1,5,main]: doing work

同步屏障

同步屏障允许一组线程彼此互相等待,直到抵达到某个公共的屏障点。因为该屏障在等待线程被释放之后可以重用,所以称它为可循环使用的屏障。该同步器对于这类数量固定,并且互相之间必须不时等待彼此的多线程应用很有用。

类java.util.concurrent.CyclicBarrier实现了同步屏障。通过调用这个类的CyclicBarrier(int parties)构造函数,你可以初始化一个包含指定parties(拥有共同执行目标的多条线程)数目的CyclicBarrier实例。如果parties的值小于1,构造函数就会抛出IllegalArgumentException。

另外,你可以调用CyclicBarrier(int parties, Runnable barrierAction)构造函数来初始化一个同步屏障,包含指定parties数目的线程以及一旦跨越屏障就会执行的barrierAction。换句话说,当parties - 1条线程处于等待当中并且又有一条线程到达,这条到达的线程会执行barrierAction,然后所有的线程继续执行。这个runnable适用于在任意线程继续执行之前更新共享状态。当传递给parties的值小于1,该构造函数抛出IllegalArgumentException。(前面的构造函数会调用这个构造函数,若把barrierAction设为null,那么当跨越屏障时,就没有runnable可供执行)。

CyclicBarrier也提供了下列方法:

同步屏障在并行分解的场合下很有用。在这里长时间的任务被分割成多个子任务,它们单独的结果之后会被合并到整个任务的结果当中。同步屏障的Javadoc给出了清单6-2的示例代码。

清单6-2 使用同步屏障把一个任务分解成多个子任务

  1. import java.util.concurrent.BrokenBarrierException;
  2. import java.util.concurrent.CyclicBarrier;
  3. public class CyclicBarrierDemo
  4. {
  5. public static void main(String[] args)
  6. {
  7. float[][] matrix = new float[3][3];
  8. int counter = 0;
  9. for (int row = 0; row < matrix.length; row++)
  10. for (int col = 0; col < matrix[0].length; col++)
  11. matrix[row][col] = counter++;
  12. dump(matrix);
  13. System.out.println();
  14. Solver solver = new Solver(matrix);
  15. System.out.println();
  16. dump(matrix);
  17. }
  18. static void dump(float[][] matrix)
  19. {
  20. for (int row = 0; row < matrix.length; row++)
  21. {
  22. for (int col = 0; col < matrix[0].length; col++)
  23. System.out.print(matrix[row][col] + " ");
  24. System.out.println();
  25. }
  26. }
  27. }
  28. class Solver
  29. {
  30. final int N;
  31. final float[][] data;
  32. final CyclicBarrier barrier;
  33. class Worker implements Runnable
  34. {
  35. int myRow;
  36. boolean done = false;
  37. Worker(int row)
  38. {
  39. myRow = row;
  40. }
  41. boolean done()
  42. {
  43. return done;
  44. }
  45. void processRow(int myRow)
  46. {
  47. System.out.println("Processing row: " + myRow);
  48. for (int i = 0; i < N; i++)
  49. data[myRow][i] *= 10;
  50. done = true;
  51. }
  52. @Override
  53. public void run()
  54. {
  55. while (!done())
  56. {
  57. processRow(myRow);
  58. try {
  59. barrier.await();
  60. }
  61. catch (InterruptedException ie)
  62. {
  63. return;
  64. }
  65. catch (BrokenBarrierException bbe)
  66. {
  67. return;
  68. }
  69. }
  70. }
  71. }
  72. public Solver(float[][] matrix)
  73. {
  74. data = matrix;
  75. N = matrix.length;
  76. barrier = new CyclicBarrier(N,
  77. new Runnable()
  78. {
  79. @Override
  80. public void run()
  81. {
  82. mergeRows();
  83. }
  84. });
  85. for (int i = 0; i < N; ++i)
  86. new Thread(new Worker(i)).start();
  87. waitUntilDone();
  88. }
  89. void mergeRows()
  90. {
  91. System.out.println("merging");
  92. synchronized("abc")
  93. {
  94. "abc".notify();
  95. }
  96. }
  97. void waitUntilDone()
  98. {
  99. synchronized("abc")
  100. {
  101. try {
  102. System.out.println("main thread waiting");
  103. "abc".wait();
  104. System.out.println("main thread notified");
  105. }
  106. catch (InterruptedException ie)
  107. {
  108. System.out.println("main thread interrupted");
  109. }
  110. }
  111. }
  112. }

清单6-2中的默认主线程首先创建了一个浮点数方阵并把这个方阵导出到标准输出流当中。主线程然后初始化了类Solver,这个类为每一行分别创建一条线程进行计算。更改之后的矩阵随后也被导出。

Solver提供了一个构造函数,它接收矩阵参数并用属性data指向这个矩阵同时用属性N指向矩阵行的数目。这个构造函数之后创建了一个拥有N条线程的同步屏障,并且又负责把所有的行合并到矩阵。最后,该构造函数分别创建工作线程,负责处理矩阵中单一的行。之后构造函数等待所有工作线程结束。

工作线程的run()方法反复在指定的行上调用processRow()方法直到done()方法返回true。(在这个例子当中)在processRow()执行过一次之后done()就会返回true。在processRow()返回之后,也就意味着矩阵行已经被处理了,当前的工作线程就会于同步屏障之上掉所有await()方法,它也就无法执行下去了。

某些时候,所有的工作线程都已经调用await()方法了。当处理矩阵最后一行的这条最后的线程调用了await()方法,它就会触发屏障动作来将所有处理过的矩阵行合并到最终的矩阵当中。在这个例子当中,合并操作不是必须的,但在更复杂的例子里或许是必要的。

最后mergeRow()方法进行的任务是去通知调用Solver构造函数的主线程。主线程等待在String对象"abc"所关联的监听器上,而调用notify()足以唤醒这唯一一条在此监听器上等待的线程。

照下面编译清单 6-2:

  1. javac CyclicBarrierDemo.java

运行程序:

  1. java CyclicBarrierDemo

你应该能观测到类似下列的输出(消息的顺序可能略有出入):

  1. 0.0 1.0 2.0
  2. 3.0 4.0 5.0
  3. 6.0 7.0 8.0
  4. main thread waiting
  5. Processing row: 0
  6. Processing row: 1
  7. Processing row: 2
  8. merging
  9. main thread notified
  10. 0.0 10.0 20.0
  11. 30.0 40.0 50.0
  12. 60.0 70.0 80.0

交换器

交换器提供了一个线程彼此之间能够交换对象的同步点。每条线程都会往这个交换器的exchange()方法传入一些对象,匹配伙伴线程,同时接收伙伴线程中的对象作为返回值。交换器在诸如遗传算法( http://en.wikipedia.org/wiki/Genetic_algorithm )和管道设计的应用程序中会很有用。

泛型类java.util.concurrent.Exchanger< V >实现了交换器。你可以通过调用Exchanger()构造函数来初始化一个交换器,然后调用下列方法之一进行一次交换:

清单6-3扩展了这个Exchanger的Javadoc中重复地填充和清空Exchanger缓冲区的例子。

清单6-3 使用一个交换器来交换缓冲区

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.Exchanger;
  4. public class ExchangerDemo
  5. {
  6. final static Exchanger<DataBuffer> exchanger =
  7. new Exchanger<DataBuffer>();
  8. final static DataBuffer initialEmptyBuffer = new DataBuffer();
  9. final static DataBuffer initialFullBuffer = new DataBuffer("I");
  10. public static void main(String[] args)
  11. {
  12. class FillingLoop implements Runnable
  13. {
  14. int count = 0;
  15. @Override
  16. public void run()
  17. {
  18. DataBuffer currentBuffer = initialEmptyBuffer;
  19. try
  20. {
  21. while (true)
  22. {
  23. addToBuffer(currentBuffer);
  24. if (currentBuffer.isFull())
  25. {
  26. System.out.println("filling thread wants to exchange");
  27. currentBuffer = exchanger.exchange(currentBuffer);
  28. System.out.println("filling thread receives exchange");
  29. }
  30. }
  31. }
  32. catch (InterruptedException ie)
  33. {
  34. System.out.println("filling thread interrupted");
  35. }
  36. }
  37. void addToBuffer(DataBuffer buffer)
  38. {
  39. String item = "NI" + count++;
  40. System.out.println("Adding: " + item);
  41. buffer.add(item);
  42. }
  43. }
  44. class EmptyingLoop implements Runnable
  45. {
  46. @Override
  47. public void run()
  48. {
  49. DataBuffer currentBuffer = initialFullBuffer;
  50. try
  51. {
  52. while (true)
  53. {
  54. takeFromBuffer(currentBuffer);
  55. if (currentBuffer.isEmpty())
  56. {
  57. System.out.println("emptying thread wants to " +
  58. "exchange");
  59. currentBuffer = exchanger.exchange(currentBuffer);
  60. System.out.println("emptying thread receives " + "exchange");
  61. }
  62. }
  63. }
  64. catch (InterruptedException ie)
  65. {
  66. System.out.println("emptying thread interrupted");
  67. }
  68. }
  69. void takeFromBuffer(DataBuffer buffer)
  70. {
  71. System.out.println("taking: " + buffer.remove());
  72. }
  73. }
  74. new Thread(new EmptyingLoop()).start();
  75. new Thread(new FillingLoop()).start();
  76. }
  77. }
  78. class DataBuffer
  79. {
  80. private final static int MAXITEMS = 10;
  81. private final List<String> items = new ArrayList<>();
  82. DataBuffer()
  83. {
  84. }
  85. DataBuffer(String prefix)
  86. {
  87. for (int i = 0; i < MAXITEMS; i++)
  88. {
  89. String item = prefix + i;
  90. System.out.printf("Adding %s%n", item);
  91. items.add(item);
  92. }
  93. }
  94. synchronized void add(String s)
  95. {
  96. if (!isFull())
  97. items.add(s);
  98. }
  99. synchronized boolean isEmpty()
  100. {
  101. return items.size() == 0;
  102. }
  103. synchronized boolean isFull()
  104. {
  105. return items.size() == MAXITEMS;
  106. }
  107. synchronized String remove()
  108. {
  109. if (!isEmpty())
  110. return items.remove(0);
  111. return null;
  112. }
  113. }

清单6-3的默认主线程通过静态属性初始化创建了一个交换器以及一对缓冲区。之后,它初始化本地的类EmptyingLoop和FillingLoop并且将这些runnable传递到新的线程实例当中,这些实例随后会被启动。(也可以使用executors)每个runnables的run()方法进入一个无限循环,反复地往它的缓冲中添加或者删除。当缓冲区满了或者空了,这个交换器会用来交换这些缓冲,持续地添加或清空。

照下面编译清单6-3:

  1. javac ExchangerDemo.java

运行程序:

  1. java ExchangerDemo

你应该能观测到类似下列输出的开始部分(消息的顺序可能略有出入)

  1. Adding I0
  2. Adding I1
  3. Adding I2
  4. Adding I3
  5. Adding I4
  6. Adding I5
  7. Adding I6
  8. Adding I7
  9. Adding I8
  10. Adding I9
  11. taking: I0
  12. taking: I1
  13. taking: I2
  14. taking: I3
  15. taking: I4
  16. taking: I5
  17. taking: I6
  18. taking: I7
  19. taking: I8
  20. taking: I9
  21. emptying thread wants to exchange
  22. Adding: NI0
  23. Adding: NI1
  24. Adding: NI2
  25. Adding: NI3
  26. Adding: NI4
  27. Adding: NI5
  28. Adding: NI6
  29. Adding: NI7
  30. Adding: NI8
  31. Adding: NI9
  32. filling thread wants to exchange
  33. filling thread receives exchange
  34. emptying thread receives exchange
  35. Adding: NI10
  36. taking: NI0
  37. Adding: NI11
  38. taking: NI1
  39. Adding: NI12

信号量

信号量维护了一组许可证(permit)来约束访问被限制资源的线程数。当没有可用的许可证时,线程的获取尝试会一直阻塞直到其它的线程释放一个许可证。


注意
当前的值可以被递增加1的信号量被称作计数信号量。而当前的值只能取0或1的信号量则被称为二进制信号量或者互斥信号量。在这两种场景中,当前的值都不能为负。


类java.util.concurrent.Semaphore实现了这一同步器,同时将信号量概念化成一个维护一组许可证的对象。你可以调用Semaphore(int permits)构造函数初始化一个信号量,其中permits指定了可用许可证的数量。这个信号量的公平策略被设置成false(不公平)。或者,你也可以调用Semaphore(int permits, boolean fair)构造函数把信号量的公平策略设置成true(公平)。

信号量和公平策略

当公平策略设置为false,信号量不会保证线程获取许可证的顺序。特别地,barging是允许的。也就是说,即便线程已经在等待,调用了acquire()方法的新线程还是能先于这条线程被分配许可证。逻辑上,新线程把自己放到了等待线程队列的队首了。当公平策略被设置为true,信号量就能保证调用acquire()方法的任意线程能以方法被调用处理的顺序获取许可证(先进先出,FIFO)。因为FIFO顺序需要应用到这些方法中指定的内部执行点上,很可能一条线程先于另一条线程调用acquire()方法但是却后于那条线程抵达顺序点,从方法中返回也类似。当然,不限时的tryAcquire()方法不会遵循公平策略的设定——它们会获取任意可用的许可证。

一般来讲,信号量通常被用来控制资源访问,它应当被初始化成公平的,从而保证不会有任何线程在访问资源时饿死。当针对其它同步控制使用信号量的时候,不公平策略带来的吞吐量好处是超过公平策略的。

信号量也提供下列方法:

清单6-4基于Semaphore Javadoc展现的“控制对于一组条目的访问”例子进行了扩展。

清单6-4 使用计数信号量去控制对一组条目的访问

  1. import java.util.concurrent.Executors;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Semaphore;
  4. public class SemaphoreDemo
  5. {
  6. public static void main(String[] args)
  7. {
  8. final Pool pool = new Pool();
  9. Runnable r = new Runnable()
  10. {
  11. @Override
  12. public void run()
  13. {
  14. String name = Thread.currentThread().getName();
  15. try
  16. {
  17. while (true)
  18. {
  19. String item;
  20. System.out.println(name + " acquiring " +
  21. (item = pool.getItem()));
  22. Thread.sleep(200 +
  23. (int) (Math.random() * 100));
  24. System.out.println(name + " putting back " +
  25. item);
  26. pool.putItem(item);
  27. }
  28. }
  29. catch (InterruptedException ie)
  30. {
  31. System.out.println(name + "interrupted");
  32. }
  33. }};
  34. ExecutorService[] executors =
  35. new ExecutorService[Pool.MAX_AVAILABLE + 1];
  36. for (int i = 0; i < executors.length; i++)
  37. {
  38. executors[i] = Executors.newSingleThreadExecutor();
  39. executors[i].execute(r);
  40. }
  41. }
  42. }
  43. final class Pool
  44. {
  45. public static final int MAX_AVAILABLE = 10;
  46. private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
  47. private final String[] items;
  48. private final boolean[] used = new boolean[MAX_AVAILABLE];
  49. Pool() {
  50. items = new String[MAX_AVAILABLE];
  51. for (int i = 0; i < items.length; i++)
  52. items[i] = "I" + i;
  53. }
  54. String getItem() throws InterruptedException
  55. {
  56. available.acquire();
  57. return getNextAvailableItem();
  58. }
  59. void putItem(String item)
  60. {
  61. if (markAsUnused(item))
  62. available.release();
  63. }
  64. private synchronized String getNextAvailableItem()
  65. {
  66. for (int i = 0; i < MAX_AVAILABLE; ++i)
  67. {
  68. if (!used[i])
  69. {
  70. used[i] = true;
  71. return items[i];
  72. }
  73. }
  74. return null; // not reached
  75. }
  76. private synchronized boolean markAsUnused(String item)
  77. {
  78. for (int i = 0; i < MAX_AVAILABLE; ++i)
  79. {
  80. if (item == items[i])
  81. {
  82. if (used[i])
  83. {
  84. used[i] = false;
  85. return true;
  86. }
  87. else
  88. return false;
  89. }
  90. }
  91. return false;
  92. }
  93. }

清单6-4中默认主线程创建了一个资源池、一个反复获取和归还资源的runnable和一组executors。每个执行者都会执行这一runnable。

类Pool的String getItem()以及void putItem(String item)方法获取和归还基于字符串的资源。在通过getItem()获取一个条目之前,调用线程必须从信号量中获取一个许可证,这样才能保证这个条目可用。当线程处理完这个条目,它会去调用putItem(String)方法,该方法会将此条目归还到池中,之后会释放一个针对这个信号量的许可。如此便可让其它的线程获取到那个条目。

当acquire()方法被调用的时候并不会持有同步锁,因为那样会阻止条目被归还到池中。不过,String getNextAvailableItem()和boolean markAsUnused(String item)方法会同步地去维持池的一致性。(这个信号量把对资源池限制访问和对需要维护资源池一致性的同步操作都独立封装起来了。)

照下面编译清单6-4:

  1. javac SemaphoreDemo.java

运行程序:

  1. java SemaphoreDemo

你应该能观测到类似下列输出的开始部分(消息的顺序可能略有出入):

  1. pool-1-thread-1 acquiring I0
  2. pool-2-thread-1 acquiring I1
  3. pool-3-thread-1 acquiring I2
  4. pool-5-thread-1 acquiring I3
  5. pool-7-thread-1 acquiring I4
  6. pool-4-thread-1 acquiring I5
  7. pool-6-thread-1 acquiring I6
  8. pool-9-thread-1 acquiring I7
  9. pool-8-thread-1 acquiring I8
  10. pool-10-thread-1 acquiring I9
  11. pool-9-thread-1 putting back I7
  12. pool-2-thread-1 putting back I1
  13. pool-11-thread-1 acquiring I7
  14. pool-9-thread-1 acquiring I1
  15. pool-8-thread-1 putting back I8
  16. pool-2-thread-1 acquiring I8
  17. pool-5-thread-1 putting back I3
  18. pool-8-thread-1 acquiring I3
  19. pool-4-thread-1 putting back I5
  20. pool-5-thread-1 acquiring I5
  21. pool-6-thread-1 putting back I6
  22. pool-4-thread-1 acquiring I6
  23. pool-1-thread-1 putting back I0
  24. pool-6-thread-1 acquiring I0
  25. pool-7-thread-1 putting back I4
  26. pool-1-thread-1 acquiring I4
  27. pool-10-thread-1 putting back I9
  28. pool-7-thread-1 acquiring I9
  29. pool-3-thread-1 putting back I2
  30. pool-10-thread-1 acquiring I2

Phaser

phaser是一个更加弹性的同步屏障。和同步屏障一样,一个phaser使得一组线程在屏障上等待,在最后一条线程到达之后,这些线程得以继续执行。phaser也提供barrier action的等价操作。和同步屏障协调固定数目的线程不同,一个phaser能够协调不定数目的线程,这些线程可以在任何时候注册。为了实现这一功能,phaser使用了phase和phase值。

phase是phaser当前的状态,同时这一状态被一个整型的phase值所确定。当最后一条注册的线程到达phaser屏障,phaser提前抵达phase并且给其加1。类java.util.concurrent实现了phaser。由于Javadoc中已经详细描述了这个类,我这里只会给出一些构造函数和方法的描述:

清单6-5基于Phaser Javadoc的第一个例子,提供了一个关于phaser同步器的示例。

清单6-5 使用一个Phaser来控制一个一次性的动作,该动作作用于可变数量的线程上。

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.Phaser;
  5. public class PhaserDemo
  6. {
  7. public static void main(String[] args)
  8. {
  9. List<Runnable> tasks = new ArrayList<>();
  10. tasks.add(() -> System.out.printf("%s running at %d%n",
  11. Thread.currentThread().getName(),
  12. System.currentTimeMillis()));
  13. tasks.add(() -> System.out.printf("%s running at %d%n",
  14. Thread.currentThread().getName(),
  15. System.currentTimeMillis()));
  16. runTasks(tasks);
  17. }
  18. static void runTasks(List<Runnable> tasks)
  19. {
  20. final Phaser phaser = new Phaser(1); // "1" (register self)
  21. // create and start threads
  22. for (final Runnable task: tasks)
  23. {
  24. phaser.register();
  25. Runnable r = () ->
  26. {
  27. try
  28. {
  29. Thread.sleep(50 + (int) (Math.random() * 300));
  30. }
  31. catch (InterruptedException ie)
  32. {
  33. System.out.println("interrupted thread");
  34. }
  35. phaser.arriveAndAwaitAdvance(); // await the ...
  36. // creation of ...
  37. // all tasks
  38. task.run();
  39. };
  40. Executors.newSingleThreadExecutor().execute(r);
  41. }
  42. // allow threads to start and deregister self
  43. phaser.arriveAndDeregister();
  44. }
  45. }

清单6-5中默认主线程创建了一对runnable任务,每个任务报告它们自己开始运行的时间(以毫秒记)。在创建一个Phaser的实例之后,运行这些任务并等待全部任务到达该屏障。

照下面这样编译清单6-5:

  1. javac PhaserDemo.java

运行程序:

  1. java PhaserDemo

你应该能观测到类似下列的输出(这个应用程序不会终止——按下Ctrl+C或者相应的触发键来终止程序):

  1. pool-1-thread-1 running at 1445806012709
  2. pool-2-thread-1 running at 1445806012712

就和你期待的倒计时门闩的行为一样,尽管由于Thread.sleep()方法的存在,一条线程可能延迟349毫秒之多,但是全部线程(在这个例子当中)都在同样的时间开始运行。

注释掉phaser.arriveAndAwaitAdvance(); // await the ...,你现在应该能观测到这些线程彻底不同时运行了,显示如下:

  1. pool-2-thread-1 running at 1445806212870
  2. pool-1-thread-1 running at 1445806213013

练习

  1. 定义同步器。
  2. 描述倒计时门闩(CountDownLatch)的行为。
  3. 当调用CountDownLatch的void countDown()方法且计数变为0时会发生什么呢?
  4. 描述同步屏障的行为。
  5. 判断对错:任意线程等待时屏障被重置或者在调用await()方法时屏障被打破,则CyclicBarrier的int await()方法返回-1。
  6. 描述一个交换器的行为。
  7. Exchanger的V exchange(V x)方法完成了什么操作?
  8. 描述一个信号量的行为。
  9. 识别两种类型的信号量。
  10. 描述一个phaser的行为。
  11. Phaser的int register()方法返回什么?
  12. 清单3-2(第三章中)展示了一个改良版的PC程序。重新使用类Semaphore处理同步来创建这一应用程序。

小结

Java提供了synchronized关键字对临界区进行线程同步访问。由于基于synchronized很难正确地编写同步代码,所以并发工具类提供了高级的同步器(控制通用同步方法的类)。

倒计时门闩会导致一条或多条线程在“门口”一直等待直到另一条线程打开这扇门,线程才得以继续运行。它是由一个计数变量和两个操作组成,这两个操作分别是“导致一条线程等待直到计数变为0”以及“递减计数变量”。

同步屏障使得一组线程一直等待彼此抵达一个公共的屏障点。由于在等待线程被释放之后可以被复用,所以它是可回收使用的。这个同步器在涉及固定规模的且不时等待彼此的线程中很有用。

同步屏障允许一组线程彼此互相等待,直到抵达到某个公共的屏障点。因为该屏障在等待线程被释放之后可以重用,所以称它为可循环使用的屏障。该同步器对于这类数量固定,并且互相之间必须不时等待彼此的多线程应用很有用。

信号量维护了一组许可证(permit)来约束访问被限制资源的线程数。当没有可用的许可证时,线程的获取尝试会一直阻塞直到其它的线程释放一个许可证。

phaser是一个更加弹性的同步屏障。和同步屏障一样,一个phaser使得一组线程在屏障上等待,在最后一条线程到达之后,这些线程得以继续执行。phaser也提供barrier action的等价操作。和同步屏障协调固定数目的线程不同,一个phaser能够协调不定数目的线程,这些线程可以在任何时候注册。为了实现这一功能,phaser使用了phase和phase值。

第7章会涉及锁框架。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注