@lambeta
2016-09-20T12:10:48.000000Z
字数 19164
阅读 311
translation
Java提供了synchronized关键字对临界区进行线程同步访问。由于基于synchronized很难正确地编写同步代码,所以并发工具类提供了高级的同步器(控制通用同步方法的类)。本章中,我会介绍倒计时门闩(countdown latch)、同步屏障(cyclic barrier)、交换器(exchanger)、信号量(semaphore)以及phaser同步器。
倒计时门闩会导致一条或多条线程在“门口”一直等待直到另一条线程打开这扇门,线程才得以继续运行。它是由一个计数变量和两个操作组成的,这两个操作分别是“导致一条线程等待直到计数变为0”以及“递减计数变量”。
类java.util.concurrent.CountDownLatch实现了倒计时门闩(countdown latch)同步器。你可以通过调用这个类的构造器CountDownLatch(int count)指定计数个数来初始化一个CountDownLatch的实例。当count的值是负数,该方法会抛出java.lang.IllegalArgumentException。
类CountDownLatch也提供了下列方法:
你会经常使用一个倒计时门闩来保证多条线程几乎同时开始工作。举个例子,请看清单6-1:
import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class CountDownLatchDemo{final static int NTHREADS = 3;public static void main(String[] args){final CountDownLatch startSignal = new CountDownLatch(1);final CountDownLatch doneSignal = new CountDownLatch(NTHREADS);Runnable r = new Runnable(){@Overridepublic void run(){try {report("entered run()");startSignal.await(); // wait until told to ...report("doing work"); // ... proceedThread.sleep((int) (Math.random() * 1000));doneSignal.countDown(); // reduce count on which// main thread is ...}catch (InterruptedException ie){System.err.println(ie);}}void report(String s){System.out.println(System.currentTimeMillis() +": " + Thread.currentThread() +": " + s);}};ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);for (int i = 0; i < NTHREADS; i++)executor.execute(r);try{System.out.println("main thread doing something");Thread.sleep(1000); // sleep for 1 secondstartSignal.countDown(); // let all threads proceedSystem.out.println("main thread doing something else");doneSignal.await(); // wait for all threads to finishexecutor.shutdownNow();}catch (InterruptedException ie){System.err.println(ie);}}}
清单6-1中的默认主线程首先创建了一对倒计时门闩。这个startSignal门闩会在默认主线程就绪之前禁止任何工作线程执行,而doneSignal门闩会使得默认主线程等待所有的工作线程全部结束执行。
默认主线程接下来创建了一个带有run()方法的runnable,它会被后续创建的工作线程执行。
run()方法首先输出一条消息,然后调用startSignal的await()方法在继续执行之前会一直等待直到门闩的计数变为0。继续执行时,run()方法输出一条表示工作正在进行的消息并且挑一个随机的时间(从0到999毫秒)睡眠用于模拟正在做的工作。
到这里,run()调用了doneSignal的countDown()方法递减了门闩的计数。一旦计数降到0,等着这个信号的默认主线程就会继续执行,关闭executor和中断应用程序。
创建好runnable之后,从包含NTHREADS条线程的线程池获取一个executor,然后在这个executor上连续调用NTHREADS次execute()方法,依次把runnable传入线程池中。这一动作会启动线程运行run()方法。
接下来,默认主线程输出一条消息并且通过睡眠1秒钟的方式模拟执行其它的工作(得以让所有工作线程依次执行run()方法,进而调用startSignal.await()),然后调用startSingal的countDown()方法使工作线程开始运行,再输出一条消息表示主线程正在做一些别的事情,接着调用doneSignal的await()方法在它可以继续执行之前等待倒计时门闩的计数降至0。
照着下面编译清单6-1:
javac CountDownLatchDemo.java
运行程序:
java CountDownLatchDemo
你应该能观测到类似下列的输出(消息顺序可能略有出入):
main thread doing something1445802274931: Thread[pool-1-thread-2,5,main]: entered run()1445802274931: Thread[pool-1-thread-3,5,main]: entered run()1445802274931: Thread[pool-1-thread-1,5,main]: entered run()main thread doing something else1445802275931: Thread[pool-1-thread-2,5,main]: doing work1445802275931: Thread[pool-1-thread-3,5,main]: doing work1445802275933: Thread[pool-1-thread-1,5,main]: doing work
同步屏障允许一组线程彼此互相等待,直到抵达到某个公共的屏障点。因为该屏障在等待线程被释放之后可以重用,所以称它为可循环使用的屏障。该同步器对于这类数量固定,并且互相之间必须不时等待彼此的多线程应用很有用。
类java.util.concurrent.CyclicBarrier实现了同步屏障。通过调用这个类的CyclicBarrier(int parties)构造函数,你可以初始化一个包含指定parties(拥有共同执行目标的多条线程)数目的CyclicBarrier实例。如果parties的值小于1,构造函数就会抛出IllegalArgumentException。
另外,你可以调用CyclicBarrier(int parties, Runnable barrierAction)构造函数来初始化一个同步屏障,包含指定parties数目的线程以及一旦跨越屏障就会执行的barrierAction。换句话说,当parties - 1条线程处于等待当中并且又有一条线程到达,这条到达的线程会执行barrierAction,然后所有的线程继续执行。这个runnable适用于在任意线程继续执行之前更新共享状态。当传递给parties的值小于1,该构造函数抛出IllegalArgumentException。(前面的构造函数会调用这个构造函数,若把barrierAction设为null,那么当跨越屏障时,就没有runnable可供执行)。
CyclicBarrier也提供了下列方法:
int await():强制调用线程一直等待直到所有的parties都已经在同步屏障上调用了await()方法。当调用线程自己或其它等待线程被中断、有线程在等待中超时或者有线程在同步屏障之上调用reset()方法,该调用线程就会停止等待。如果调用线程在进入方法时设置过中断的状态或者等待时被中断,该方法就会抛出InterruptedException并且其中断状态会被清除。当有线程正在等待时该同步屏障被重置了(通过 reset()方法)以及当同步屏障在await()方法被调用或任意线程正在等待时同步屏障被打破,该方法就会抛出java.util.concurrent.BrokenBarrierException。一旦有线程在等待时被中断,其它所有等待中的线程都会抛出BrokenBarrierException并且同步屏障也会被设置为打破状态。如果调用线程是最后一条到达的线程并且构造函数中提供了一个非空的barrierAction,这条线程就会在允许其它线程继续执行之前率先执行这个runnable。该方法会返回调用线程的的到达索引,getParties() - 1代表第一条到达的线程,0代表最后一条到达的线程。
int await(long timeout, TimeUnit unit):除了让你指定调用线程愿意等待的时长之外,该方法等同于之前的方法。当线程在等待中超时,该方法会抛出java.util.concurrent.TimeoutException。
int getNumberWaiting():返回当前在同步屏障上等待的线程数目。该方法对于调试以及断言十分有用。
int getParties():返回需要跨越同步屏障的线程数目。
boolean isBroken():当一条或者多条线程由于在同步屏障创建或上次重置之后,中断或超时从而打破同步屏障,又或者因为一个异常导致barrier action失败时,返回true;否则返回false。
void reset():把同步屏障重置到其原始状态。如果此时任意的线程等待在这个同步屏障上,就会抛出一个BrokenBarrierException。注意在由于某些原因发生的跳出操作之后进行重置是非常难以实现的。线程需要通过一些其它的方式重新同步并挑选一条线程来进行重置操作。所以,最好还是给后续的使用创建一个新的同步屏障。
同步屏障在并行分解的场合下很有用。在这里长时间的任务被分割成多个子任务,它们单独的结果之后会被合并到整个任务的结果当中。同步屏障的Javadoc给出了清单6-2的示例代码。
清单6-2 使用同步屏障把一个任务分解成多个子任务
import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo{public static void main(String[] args){float[][] matrix = new float[3][3];int counter = 0;for (int row = 0; row < matrix.length; row++)for (int col = 0; col < matrix[0].length; col++)matrix[row][col] = counter++;dump(matrix);System.out.println();Solver solver = new Solver(matrix);System.out.println();dump(matrix);}static void dump(float[][] matrix){for (int row = 0; row < matrix.length; row++){for (int col = 0; col < matrix[0].length; col++)System.out.print(matrix[row][col] + " ");System.out.println();}}}class Solver{final int N;final float[][] data;final CyclicBarrier barrier;class Worker implements Runnable{int myRow;boolean done = false;Worker(int row){myRow = row;}boolean done(){return done;}void processRow(int myRow){System.out.println("Processing row: " + myRow);for (int i = 0; i < N; i++)data[myRow][i] *= 10;done = true;}@Overridepublic void run(){while (!done()){processRow(myRow);try {barrier.await();}catch (InterruptedException ie){return;}catch (BrokenBarrierException bbe){return;}}}}public Solver(float[][] matrix){data = matrix;N = matrix.length;barrier = new CyclicBarrier(N,new Runnable(){@Overridepublic void run(){mergeRows();}});for (int i = 0; i < N; ++i)new Thread(new Worker(i)).start();waitUntilDone();}void mergeRows(){System.out.println("merging");synchronized("abc"){"abc".notify();}}void waitUntilDone(){synchronized("abc"){try {System.out.println("main thread waiting");"abc".wait();System.out.println("main thread notified");}catch (InterruptedException ie){System.out.println("main thread interrupted");}}}}
清单6-2中的默认主线程首先创建了一个浮点数方阵并把这个方阵导出到标准输出流当中。主线程然后初始化了类Solver,这个类为每一行分别创建一条线程进行计算。更改之后的矩阵随后也被导出。
Solver提供了一个构造函数,它接收矩阵参数并用属性data指向这个矩阵同时用属性N指向矩阵行的数目。这个构造函数之后创建了一个拥有N条线程的同步屏障,并且又负责把所有的行合并到矩阵。最后,该构造函数分别创建工作线程,负责处理矩阵中单一的行。之后构造函数等待所有工作线程结束。
工作线程的run()方法反复在指定的行上调用processRow()方法直到done()方法返回true。(在这个例子当中)在processRow()执行过一次之后done()就会返回true。在processRow()返回之后,也就意味着矩阵行已经被处理了,当前的工作线程就会于同步屏障之上掉所有await()方法,它也就无法执行下去了。
某些时候,所有的工作线程都已经调用await()方法了。当处理矩阵最后一行的这条最后的线程调用了await()方法,它就会触发屏障动作来将所有处理过的矩阵行合并到最终的矩阵当中。在这个例子当中,合并操作不是必须的,但在更复杂的例子里或许是必要的。
最后mergeRow()方法进行的任务是去通知调用Solver构造函数的主线程。主线程等待在String对象"abc"所关联的监听器上,而调用notify()足以唤醒这唯一一条在此监听器上等待的线程。
照下面编译清单 6-2:
javac CyclicBarrierDemo.java
运行程序:
java CyclicBarrierDemo
你应该能观测到类似下列的输出(消息的顺序可能略有出入):
0.0 1.0 2.03.0 4.0 5.06.0 7.0 8.0main thread waitingProcessing row: 0Processing row: 1Processing row: 2mergingmain thread notified0.0 10.0 20.030.0 40.0 50.060.0 70.0 80.0
交换器提供了一个线程彼此之间能够交换对象的同步点。每条线程都会往这个交换器的exchange()方法传入一些对象,匹配伙伴线程,同时接收伙伴线程中的对象作为返回值。交换器在诸如遗传算法( http://en.wikipedia.org/wiki/Genetic_algorithm )和管道设计的应用程序中会很有用。
泛型类java.util.concurrent.Exchanger< V >实现了交换器。你可以通过调用Exchanger()构造函数来初始化一个交换器,然后调用下列方法之一进行一次交换:
V exchange(V x):在这个交互点上等待其他线程到达(除非调用线程被中断了),之后将所给对象传入其中,接收其它线程的对象作为返回。如果其它的线程已经等在了交换点上,为了线程调度它会从中恢复并且会接收调用线程所传入的对象。当前线程会立即返回,接收其它线程传入交换器中的对象。当调用线程被中断了,该方法会抛出InterruptedException。
V exchange(V x, long timeout, TimeUnit unit):除了让你指定调用线程愿意等待的时长之外,该方法等同于之前的方法。当线程在等待中超时,该方法会抛出TimeoutException。
清单6-3扩展了这个Exchanger的Javadoc中重复地填充和清空Exchanger缓冲区的例子。
清单6-3 使用一个交换器来交换缓冲区
import java.util.ArrayList;import java.util.List;import java.util.concurrent.Exchanger;public class ExchangerDemo{final static Exchanger<DataBuffer> exchanger =new Exchanger<DataBuffer>();final static DataBuffer initialEmptyBuffer = new DataBuffer();final static DataBuffer initialFullBuffer = new DataBuffer("I");public static void main(String[] args){class FillingLoop implements Runnable{int count = 0;@Overridepublic void run(){DataBuffer currentBuffer = initialEmptyBuffer;try{while (true){addToBuffer(currentBuffer);if (currentBuffer.isFull()){System.out.println("filling thread wants to exchange");currentBuffer = exchanger.exchange(currentBuffer);System.out.println("filling thread receives exchange");}}}catch (InterruptedException ie){System.out.println("filling thread interrupted");}}void addToBuffer(DataBuffer buffer){String item = "NI" + count++;System.out.println("Adding: " + item);buffer.add(item);}}class EmptyingLoop implements Runnable{@Overridepublic void run(){DataBuffer currentBuffer = initialFullBuffer;try{while (true){takeFromBuffer(currentBuffer);if (currentBuffer.isEmpty()){System.out.println("emptying thread wants to " +"exchange");currentBuffer = exchanger.exchange(currentBuffer);System.out.println("emptying thread receives " + "exchange");}}}catch (InterruptedException ie){System.out.println("emptying thread interrupted");}}void takeFromBuffer(DataBuffer buffer){System.out.println("taking: " + buffer.remove());}}new Thread(new EmptyingLoop()).start();new Thread(new FillingLoop()).start();}}class DataBuffer{private final static int MAXITEMS = 10;private final List<String> items = new ArrayList<>();DataBuffer(){}DataBuffer(String prefix){for (int i = 0; i < MAXITEMS; i++){String item = prefix + i;System.out.printf("Adding %s%n", item);items.add(item);}}synchronized void add(String s){if (!isFull())items.add(s);}synchronized boolean isEmpty(){return items.size() == 0;}synchronized boolean isFull(){return items.size() == MAXITEMS;}synchronized String remove(){if (!isEmpty())return items.remove(0);return null;}}
清单6-3的默认主线程通过静态属性初始化创建了一个交换器以及一对缓冲区。之后,它初始化本地的类EmptyingLoop和FillingLoop并且将这些runnable传递到新的线程实例当中,这些实例随后会被启动。(也可以使用executors)每个runnables的run()方法进入一个无限循环,反复地往它的缓冲中添加或者删除。当缓冲区满了或者空了,这个交换器会用来交换这些缓冲,持续地添加或清空。
照下面编译清单6-3:
javac ExchangerDemo.java
运行程序:
java ExchangerDemo
你应该能观测到类似下列输出的开始部分(消息的顺序可能略有出入)
Adding I0Adding I1Adding I2Adding I3Adding I4Adding I5Adding I6Adding I7Adding I8Adding I9taking: I0taking: I1taking: I2taking: I3taking: I4taking: I5taking: I6taking: I7taking: I8taking: I9emptying thread wants to exchangeAdding: NI0Adding: NI1Adding: NI2Adding: NI3Adding: NI4Adding: NI5Adding: NI6Adding: NI7Adding: NI8Adding: NI9filling thread wants to exchangefilling thread receives exchangeemptying thread receives exchangeAdding: NI10taking: NI0Adding: NI11taking: NI1Adding: NI12
信号量维护了一组许可证(permit)来约束访问被限制资源的线程数。当没有可用的许可证时,线程的获取尝试会一直阻塞直到其它的线程释放一个许可证。
注意
当前的值可以被递增加1的信号量被称作计数信号量。而当前的值只能取0或1的信号量则被称为二进制信号量或者互斥信号量。在这两种场景中,当前的值都不能为负。
类java.util.concurrent.Semaphore实现了这一同步器,同时将信号量概念化成一个维护一组许可证的对象。你可以调用Semaphore(int permits)构造函数初始化一个信号量,其中permits指定了可用许可证的数量。这个信号量的公平策略被设置成false(不公平)。或者,你也可以调用Semaphore(int permits, boolean fair)构造函数把信号量的公平策略设置成true(公平)。
当公平策略设置为false,信号量不会保证线程获取许可证的顺序。特别地,barging是允许的。也就是说,即便线程已经在等待,调用了acquire()方法的新线程还是能先于这条线程被分配许可证。逻辑上,新线程把自己放到了等待线程队列的队首了。当公平策略被设置为true,信号量就能保证调用acquire()方法的任意线程能以方法被调用处理的顺序获取许可证(先进先出,FIFO)。因为FIFO顺序需要应用到这些方法中指定的内部执行点上,很可能一条线程先于另一条线程调用acquire()方法但是却后于那条线程抵达顺序点,从方法中返回也类似。当然,不限时的tryAcquire()方法不会遵循公平策略的设定——它们会获取任意可用的许可证。
一般来讲,信号量通常被用来控制资源访问,它应当被初始化成公平的,从而保证不会有任何线程在访问资源时饿死。当针对其它同步控制使用信号量的时候,不公平策略带来的吞吐量好处是超过公平策略的。
信号量也提供下列方法:
int drainPermits():获取并返回立即可用的许可证的数量。
int getQueueLength():返回等待获取许可证的大致线程数。由于线程数在该方法遍历内部数据结构的时候可能会动态改变,所以返回的值只能是估算值。该方法被设计用来监控系统状态而不是用来做同步控制。
boolean isFair():返回公平性设置(公平返回true,不公平返回false)。
void release():释放一个许可证,将其返回给信号量。可用许可证的数目增加1。如果任何线程正在尝试获取一个许可证,被选到的线程就会被给予刚刚释放的许可证。那条线程就会因为线程调度而被重新启用。
清单6-4基于Semaphore Javadoc展现的“控制对于一组条目的访问”例子进行了扩展。
清单6-4 使用计数信号量去控制对一组条目的访问
import java.util.concurrent.Executors;import java.util.concurrent.ExecutorService;import java.util.concurrent.Semaphore;public class SemaphoreDemo{public static void main(String[] args){final Pool pool = new Pool();Runnable r = new Runnable(){@Overridepublic void run(){String name = Thread.currentThread().getName();try{while (true){String item;System.out.println(name + " acquiring " +(item = pool.getItem()));Thread.sleep(200 +(int) (Math.random() * 100));System.out.println(name + " putting back " +item);pool.putItem(item);}}catch (InterruptedException ie){System.out.println(name + "interrupted");}}};ExecutorService[] executors =new ExecutorService[Pool.MAX_AVAILABLE + 1];for (int i = 0; i < executors.length; i++){executors[i] = Executors.newSingleThreadExecutor();executors[i].execute(r);}}}final class Pool{public static final int MAX_AVAILABLE = 10;private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);private final String[] items;private final boolean[] used = new boolean[MAX_AVAILABLE];Pool() {items = new String[MAX_AVAILABLE];for (int i = 0; i < items.length; i++)items[i] = "I" + i;}String getItem() throws InterruptedException{available.acquire();return getNextAvailableItem();}void putItem(String item){if (markAsUnused(item))available.release();}private synchronized String getNextAvailableItem(){for (int i = 0; i < MAX_AVAILABLE; ++i){if (!used[i]){used[i] = true;return items[i];}}return null; // not reached}private synchronized boolean markAsUnused(String item){for (int i = 0; i < MAX_AVAILABLE; ++i){if (item == items[i]){if (used[i]){used[i] = false;return true;}elsereturn false;}}return false;}}
清单6-4中默认主线程创建了一个资源池、一个反复获取和归还资源的runnable和一组executors。每个执行者都会执行这一runnable。
类Pool的String getItem()以及void putItem(String item)方法获取和归还基于字符串的资源。在通过getItem()获取一个条目之前,调用线程必须从信号量中获取一个许可证,这样才能保证这个条目可用。当线程处理完这个条目,它会去调用putItem(String)方法,该方法会将此条目归还到池中,之后会释放一个针对这个信号量的许可。如此便可让其它的线程获取到那个条目。
当acquire()方法被调用的时候并不会持有同步锁,因为那样会阻止条目被归还到池中。不过,String getNextAvailableItem()和boolean markAsUnused(String item)方法会同步地去维持池的一致性。(这个信号量把对资源池限制访问和对需要维护资源池一致性的同步操作都独立封装起来了。)
照下面编译清单6-4:
javac SemaphoreDemo.java
运行程序:
java SemaphoreDemo
你应该能观测到类似下列输出的开始部分(消息的顺序可能略有出入):
pool-1-thread-1 acquiring I0pool-2-thread-1 acquiring I1pool-3-thread-1 acquiring I2pool-5-thread-1 acquiring I3pool-7-thread-1 acquiring I4pool-4-thread-1 acquiring I5pool-6-thread-1 acquiring I6pool-9-thread-1 acquiring I7pool-8-thread-1 acquiring I8pool-10-thread-1 acquiring I9pool-9-thread-1 putting back I7pool-2-thread-1 putting back I1pool-11-thread-1 acquiring I7pool-9-thread-1 acquiring I1pool-8-thread-1 putting back I8pool-2-thread-1 acquiring I8pool-5-thread-1 putting back I3pool-8-thread-1 acquiring I3pool-4-thread-1 putting back I5pool-5-thread-1 acquiring I5pool-6-thread-1 putting back I6pool-4-thread-1 acquiring I6pool-1-thread-1 putting back I0pool-6-thread-1 acquiring I0pool-7-thread-1 putting back I4pool-1-thread-1 acquiring I4pool-10-thread-1 putting back I9pool-7-thread-1 acquiring I9pool-3-thread-1 putting back I2pool-10-thread-1 acquiring I2
phaser是一个更加弹性的同步屏障。和同步屏障一样,一个phaser使得一组线程在屏障上等待,在最后一条线程到达之后,这些线程得以继续执行。phaser也提供barrier action的等价操作。和同步屏障协调固定数目的线程不同,一个phaser能够协调不定数目的线程,这些线程可以在任何时候注册。为了实现这一功能,phaser使用了phase和phase值。
phase是phaser当前的状态,同时这一状态被一个整型的phase值所确定。当最后一条注册的线程到达phaser屏障,phaser提前抵达phase并且给其加1。类java.util.concurrent实现了phaser。由于Javadoc中已经详细描述了这个类,我这里只会给出一些构造函数和方法的描述:
Phaser(int parties)构造函数创建了一个phaser,一开始就协调parties数目的线程(还没有抵达phaser屏障),同时其phase初始为0。
int register()方法往这个phaser中添加一条尚未抵达的线程,同时返回phase值作抵达分类用。这个值被称为抵达phase值。
int arriveAndAwaitAdvance()方法记录到达并等待phaser前进(在其它线程已经到达之后开始)。它会返回抵达phase值。
int arriveAndDeregister()方法抵达此phaser,同时从中注销而不会等待其它线程到达,由此减少了未来phase上需要前进的线程数量。
清单6-5基于Phaser Javadoc的第一个例子,提供了一个关于phaser同步器的示例。
清单6-5 使用一个Phaser来控制一个一次性的动作,该动作作用于可变数量的线程上。
import java.util.ArrayList;import java.util.List;import java.util.concurrent.Executors;import java.util.concurrent.Phaser;public class PhaserDemo{public static void main(String[] args){List<Runnable> tasks = new ArrayList<>();tasks.add(() -> System.out.printf("%s running at %d%n",Thread.currentThread().getName(),System.currentTimeMillis()));tasks.add(() -> System.out.printf("%s running at %d%n",Thread.currentThread().getName(),System.currentTimeMillis()));runTasks(tasks);}static void runTasks(List<Runnable> tasks){final Phaser phaser = new Phaser(1); // "1" (register self)// create and start threadsfor (final Runnable task: tasks){phaser.register();Runnable r = () ->{try{Thread.sleep(50 + (int) (Math.random() * 300));}catch (InterruptedException ie){System.out.println("interrupted thread");}phaser.arriveAndAwaitAdvance(); // await the ...// creation of ...// all taskstask.run();};Executors.newSingleThreadExecutor().execute(r);}// allow threads to start and deregister selfphaser.arriveAndDeregister();}}
清单6-5中默认主线程创建了一对runnable任务,每个任务报告它们自己开始运行的时间(以毫秒记)。在创建一个Phaser的实例之后,运行这些任务并等待全部任务到达该屏障。
照下面这样编译清单6-5:
javac PhaserDemo.java
运行程序:
java PhaserDemo
你应该能观测到类似下列的输出(这个应用程序不会终止——按下Ctrl+C或者相应的触发键来终止程序):
pool-1-thread-1 running at 1445806012709pool-2-thread-1 running at 1445806012712
就和你期待的倒计时门闩的行为一样,尽管由于Thread.sleep()方法的存在,一条线程可能延迟349毫秒之多,但是全部线程(在这个例子当中)都在同样的时间开始运行。
注释掉phaser.arriveAndAwaitAdvance(); // await the ...,你现在应该能观测到这些线程彻底不同时运行了,显示如下:
pool-2-thread-1 running at 1445806212870pool-1-thread-1 running at 1445806213013
Java提供了synchronized关键字对临界区进行线程同步访问。由于基于synchronized很难正确地编写同步代码,所以并发工具类提供了高级的同步器(控制通用同步方法的类)。
倒计时门闩会导致一条或多条线程在“门口”一直等待直到另一条线程打开这扇门,线程才得以继续运行。它是由一个计数变量和两个操作组成,这两个操作分别是“导致一条线程等待直到计数变为0”以及“递减计数变量”。
同步屏障使得一组线程一直等待彼此抵达一个公共的屏障点。由于在等待线程被释放之后可以被复用,所以它是可回收使用的。这个同步器在涉及固定规模的且不时等待彼此的线程中很有用。
同步屏障允许一组线程彼此互相等待,直到抵达到某个公共的屏障点。因为该屏障在等待线程被释放之后可以重用,所以称它为可循环使用的屏障。该同步器对于这类数量固定,并且互相之间必须不时等待彼此的多线程应用很有用。
信号量维护了一组许可证(permit)来约束访问被限制资源的线程数。当没有可用的许可证时,线程的获取尝试会一直阻塞直到其它的线程释放一个许可证。
phaser是一个更加弹性的同步屏障。和同步屏障一样,一个phaser使得一组线程在屏障上等待,在最后一条线程到达之后,这些线程得以继续执行。phaser也提供barrier action的等价操作。和同步屏障协调固定数目的线程不同,一个phaser能够协调不定数目的线程,这些线程可以在任何时候注册。为了实现这一功能,phaser使用了phase和phase值。
第7章会涉及锁框架。