[关闭]
@lambeta 2016-09-20T12:12:29.000000Z 字数 12078 阅读 344

第5章

translation


第二部分

并发工具类

第5章

并发工具类和Executor框架

前4章主要关注Java对底层线程操作的支持。这章开始关注Java对高级线程操作的支持,即并发工具类。可以认为使用并发工具类和使用高级语言编写应用程序类似,而使用较底层的线程操作则和使用汇编语言编写应用程序类似。简单介绍完这些实用工具之后,我会带你浏览一遍Executor。后3章会涵盖其它多种并发工具类。

并发工具类简介

Java对底层线程操作的支持使得你可以创建多线程应用程序。这些程序比对应的单线程程序提供了更好的性能和响应能力。尽管如此,依然存在一些问题:

为了解决这些问题,Java 5引入了并发工具类,这是一个由强大且易扩展的高性能线程工具类组成的框架,包含了线程池和阻塞队列。该框架由下列包中的多种类型组成。

这个框架也将long nanoTime()方法引入了类java.lang.System当中,这可以让你做相对时间测量的时候访问纳秒级别的时间资源。
并发工具类可以被分类为executors, synchronizers以及锁框架等。我会在接下来的章节探讨executors并在后续章节讨论其余类别。

探索Executors

线程API使得你可以通过诸如new java.lang.Thread(new RunnableTask()).start();的方式执行任务。这些表达式将多个任务的提交及它们的执行机制(在当前线程、新线程或者随机地从线程池【组】中选择的线程)紧紧地绑在一起。


注意 一个任务是这样一种对象,它的类实现了java.lang.Runnable(一个可运行的任务)或者java.util.concurrent.Callable接口(一个可被调用的任务)。稍后我会介绍更多有关于Callable的内容。


并发工具类使用高级的executors替代了底层的线程操作执行任务。一个executor就是这样一种对象,它的类直接或间接地实现了java.util.concurrent.Executor接口,这样便从任务执行机制中解耦了任务的提交操作。


注意 Executor框架使用接口从任务执行中解耦任务的提交操作和Collections框架使用接口从它们的实现中解耦lists, sets, queues以及maps类似。解耦会产生更易维护的具备伸缩性的代码。


Executor声明了一个单独的void execute(Runnable runnable)方法,该方法会在将来的某个时间点执行这个名为runnable的可运行任务。当runnable是null时,execute()方法会抛出java.lang.NullPointerException;当无法执行runnable时,会抛出java.util.concurrent.RejectedExecutionException。


注意 当一个executor正在关闭并且不想接受新的任务时,RejectedExecutionException会被抛出来。当然,当executor没有足够的空间存储这一任务(或许executor使用有限的阻塞队列来存储任务且队列已满——我会在第8章讨论阻塞队列)时,这种异常也会被抛出来。


下面的例子展示了前面提及的new Thread(new RunnableTask()).start();表达式在Executor中的等价物:

  1. Executor executor = ...; // ... represents some executor creation
  2. executor.execute(new RunnableTask());

尽管Executor很好用,但这个接口还是在很多方面有所限制:

这些限制被java.util.concurrent.ExecutorService接口解决了,这个接口扩展Executor接口并且其实现就是一个典型的线程池。表5-1描述了ExecutorService的方法。

表 5-1. ExecutorService的方法

  1. boolean awaitTermination(long timeout, TimeUnit unit)

在一条关闭请求之后,不论是任务全部都已完成、timeout(以unit的时间单元衡量)超时还是当前线程被中断的哪一条先发生,都会一直阻塞(等待)。如果executor已经中止,返回true;若在中止之前超时,返回false。

  1. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

执行任务集合中每一个callable任务并当所有的任务执行完成后(即任务要么正常结束,要么抛出异常),返回一个java.util.List的java.util.concurrent.Future的实例(本章稍后讨论),这些实例会持有任务的状态和结果。这组Future和任务迭代器返回的任务序列具有相同的顺序。当任务处于等待状态却被中断时,该方法抛出InterruptedException,同时未完成的任务也会被取消;若tasks或者其任意元素为null,抛出NullPointerException;当没有任何任务可以被调度执行时,会抛出RejectedExecutionException.

  1. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

执行任务集合中每一个callable任务并当所有的任务执行完成后(即任务要么正常结束,要么抛出异常),或者timeout(以unit的时间单元衡量)超时后,返回一组java.util.concurrent.Future的实例,这些实例会持有任务的状态和结果。超时未完成的任务都会被取消。这组Futures和任务迭代器返回的任务序列具有相同的顺序。当任务处于等待(未完成的任务会被取消)却被中断时,该方法抛出InterruptedException. 若tasks和其任意元素或unit为null,抛出NullPointerException;当没有任何任务可以被调度执行时,会抛出RejectedExecutionException.

  1. <T> T invokeAny(Collection<? extends Callable<T>> tasks)

执行给出的tasks, 如果有任务成功完成执行(换句话说,没有抛出异常)就会返回其结果。只要有正常或者异常的返回,未完成执行的任务都会被取消。当任务处于等待却被中断时,该方法抛出InterruptedException. 若tasks或其任意元素为null,抛出NullPointerException;当没有任何任务可以被调度执行时,会抛出RejectedExecutionException.

  1. <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)

执行给出的tasks, 如果有任务在timeout(以unit时间单元衡量)超时之前成功完成执行(没有抛出异常)就会返回其结果——在超时后还没有完成的任务会被取消。该方法在等待时被中断会抛出InterruptedException. 若tasks和其任意元素或者unit为null,则抛出NullPointerException. 当tasks是空时,抛出IllegalArgumentException. 若在任意任务完成之前超时,抛出java.util.concurrent.TimeoutException; 当没有任务成功完成,则抛出ExecutionException; 当没有任何任务可以被调度执行时,会抛出RejectedExecutionException.

  1. boolean isShutdown()

当其executor已经终止时,返回true;否则,返回false。

  1. boolean isTerminated()

若紧接着终止之后所有的任务都完成了,那么返回true;否则,返回false。 该方法绝不会在调用shutdown()或者shutdownNow()之前返回true。

  1. void shutdown()

有序地开始关闭之前提交执行的任务,不妥不再接受新的任务。在该executor关闭之后调用此方法没有任何作用。该方法不会等待之前提交的任务执行完毕。当需要等待时,使用awaitTermination()方法。

  1. List<Runnable> shutdownNow()

尝试停止所有活跃的执行线程,挂起等待任务的进程,并且返回一组正在等待执行的任务。这里并未保证采用最优的方式停止处理活跃的执行线程。举个例子,通常的实现都会通过Thread.interrupt()方法取消任务,所以任何没能响应中断的任务可能再也不会终止掉。

  1. <T> Future<T> submit(Callable<T> task)

提交一个callable任务来执行,同时返回一个代表任务等待结果的Future实例。这个Future实例的get()方法在成功执行完成后返回任务的结果。当任务无法调度执行,该方法抛出RejectedExecutionException。若task是空,则抛出NullPointerException。如果你想在等待任务完成的时候立即阻塞,可以使用这一形式的构造result = exec.submit(aCallable).get();

  1. Future<?> submit(Runnable task)

提交一个runnable任务来执行,同时返回一个代表此任务的Future实例。该Future的get方法在成功完成之后返回null。

  1. <T> Future<T> submit(Runnable task, T result)

提交一个runnable任务来执行,同时返回一个Future实例,其get()方法在成功完成的情况下返回结果的值。当任务无法调度执行,该方法抛出RejectedExecutionException,若task为空,则抛出NullPointerException.

表5-1引用了枚举类java.util.concurrent.TimeUnit,它代表给定时间单元上的时间跨度:DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, 以及 SECONDS。 另外,TimeUnit还声明了一些方法来做跨单元的转化(比如 long toHours(long duration)),以及在这些单元之上操作定时和延时(比如 void sleep(long timeout))。

表5-1也涉及了callable任务。不同于Runnable,其void run()方法既不能返回值也不能抛出受检异常,Callble的V call()方法不仅可以返回值还可以抛出受检的异常,因为它声明了一个throws Exception子句。

最后,表5-1涉及到Future接口,它代表着一种异步计算的结果。这个结果之所以被称为future是因为它通常要到未来的某个时刻才有效。Futrue的泛型是Futrue<V>,它提供了取消任务、返回任务的结果以及判断任务是否已经结束的方法。表 5-2描述了Future的方法。

表 5-2. Future接口的方法

  1. boolean cancel(boolean mayInterruptIfRunning)

尝试取消此任务的执行,并且当该任务被取消返回true;否则,返回false(任务可能在cancel()方法被调用之前已经正常结束)。当任务已经完成、取消或者由于其它原因无法取消时,取消尝试就会失败。如果取消成功并且该任务还没有启动,这个任务就应该不会再执行了。如果这个任务已经启动,在尝试停止这个任务的过程中,参数mayInterruptIfRunning会决定是(true)否(false)应该中断运行任务的线程。方法返回之后,后续调用isDone()方法都会返回true。当cancel()方法返回true后,isCancelled()方法总会返回true。

  1. V get()

如果需要,则等待此任务完成之后返回结果。当该任务先于此方法被调用被取消则抛出java.util.concurrent.CancellationException,若该任务抛出异常则抛出ExecutionException,并且若当前线程在等待中被中断,则抛出InterruptedException。

  1. V get(long timeout, TimeUnit unit)

最多等待timeout时间单元(被unit所指定)直到此任务完成执行,之后返回结果(如果可用的话)。当任务在调用此方法之前被取消则抛出java.util.concurrent.CancellationException,若该任务中途出现异常,则抛出ExcutionException;若当前线程在等待中被中断,则抛出InterruptedException;当此方法的timeout超时(等待超时时间),则抛出TimeoutException。

  1. boolean isCancelled()

当此任务在正常完成之前被取消了返回true;否则,返回false。

  1. boolean isDone()

当此任务完成返回true;否则,返回false。此方法在正常的中断、出现异常或者取消等场景下都会返回true。

假设你准备写一个应用程序,其图形用户界面可以允许用户输入一个单词。在用户输入单词之后,这个应用程序传递单词到几个在线的辞典并获取每个辞典的条目。这些条目随后会被呈现给用户。
因为在线访问会比较慢,且用户界面需要保持响应(或许用户想要结束应用程序),你把“获取单词条目”的任务分摊到一个executor上,它在一个单独的线程中运行此任务。下面的例子使用了ExecutorService, Callable和Future来完成这个目标。

  1. ExecutorService executor = ...; // ... represents some executor creation
  2. Future<String[]> taskFuture =
  3. executor.submit(new Callable<String[]>()
  4. {
  5. @Override
  6. public String[] call()
  7. {
  8. String[] entries = ...;
  9. // Access online dictionaries
  10. // with search word and populate
  11. // entries with their resulting
  12. // entries.
  13. return entries;
  14. }
  15. });
  16. // Do stuff.
  17. String entries = taskFuture.get();

在以某些方式获得一个executor之后(你会很快学得),例子中的线程将一个callable任务提交给了这个executor。submit()方法会立即返回一个Future对象的引用,这个引用控制任务的执行以及结果访问。此线程最终会调用该对象的get()方法获取结果。


注意
接口java.util.concurrent.ScheduledExecutorService继承自接口ExecutorService。它代表了一种能够让你调度任务运行一次或者在指定延迟之后周期性执行的exectuor。


尽管你可以实现自己的Exectuor、ExecutorService以及ScheduledExecutorService(例如class DirectExecutor implements Executor { @Override public void execute(Runnable r) { r.run(); } } —— 在调用线程中直接运行executor),不过这还有一个更简单的选项:java.util. concurrent.Executors。


小贴士
如果你想要创建自己的ExecutorService实现,配合使用java.util.concurrent.AbstractExecutorService和java.util.concurrent.FutureTask classes会很有帮助。


Executors 工具类声明了几个类方法用来返回多种ExecutorService和ScheduledExecutorService实现的实例(以及其他类型的实例)。这个类的静态方法完成了下列任务:

举个例子:static ExecutorService newFixedThreadPool(int nThreads)方法创建了一个线程池复用固定数量的线程操作一个共享的无限队列。至多有nThreads个线程同时处理任务。如果额外的任务在所有线程都活跃时被提交了,它们会在队列中等待一条可用线程。
如果在executor停止之前,任何线程因为执行当中的失败而终止,而后续任务的执行又必要的时候一条新的线程会取代其位置。这些在线程池中的线程会一直存在直到executor被显式地关闭。当传递0或者负数到nThreads的时候,该方法会抛出IllegalArgumentException.


注意 线程池用来消除不得不为每条提交任务创建一条新线程的开销。线程创建并不廉价,必须创建大量线程可能会严重影响应用程序的性能。


你通常应该会在文件和网络输入、输出上下文中使用executor、runnable、callable以及future。而大量的计算操作给你提供了另外一种使用这些类型的场景。举个例子,清单5-1在欧拉数e(2.71828...)的计算上下文中使用了一个executor、callable以及一个future。

清单5-1 计算欧拉数e

  1. import java.math.BigDecimal;
  2. import java.math.MathContext;
  3. import java.math.RoundingMode;
  4. import java.util.concurrent.Callable;
  5. import java.util.concurrent.ExecutionException;
  6. import java.util.concurrent.ExecutorService;
  7. import java.util.concurrent.Executors;
  8. import java.util.concurrent.Future;
  9. public class CalculateE
  10. {
  11. final static int LASTITER = 17;
  12. public static void main(String[] args)
  13. {
  14. ExecutorService executor = Executors.newFixedThreadPool(1);
  15. Callable<BigDecimal> callable;
  16. callable = new Callable<BigDecimal>()
  17. {
  18. @Override
  19. public BigDecimal call()
  20. {
  21. MathContext mc = new MathContext(100, RoundingMode.HALF_UP);
  22. BigDecimal result = BigDecimal.ZERO;
  23. for (int i = 0; i <= LASTITER; i++)
  24. {
  25. BigDecimal factorial =
  26. factorial(new BigDecimal(i));
  27. BigDecimal res = BigDecimal.ONE.divide(factorial, mc);
  28. result = result.add(res);
  29. }
  30. return result;
  31. }
  32. public BigDecimal factorial(BigDecimal n)
  33. {
  34. if (n.equals(BigDecimal.ZERO))
  35. return BigDecimal.ONE;
  36. else
  37. return n.multiply(factorial(n.
  38. subtract(BigDecimal.ONE)));
  39. }
  40. };
  41. Future<BigDecimal> taskFuture = executor.submit(callable);
  42. try
  43. {
  44. while (!taskFuture.isDone())
  45. System.out.println("waiting");
  46. System.out.println(taskFuture.get());
  47. }
  48. catch(ExecutionException ee)
  49. {
  50. System.err.println("task threw an exception");
  51. System.err.println(ee);
  52. }
  53. catch(InterruptedException ie)
  54. {
  55. System.err.println("interrupted while waiting");
  56. }
  57. executor.shutdownNow();
  58. }
  59. }

默认主线程首先执行main()方法通过调用Executors的newFixedThreadPool()方法获取一个executor。之后初始化了一个实现了Callable接口的匿名类并且将这个任务提交给executor,在返回中接收一个Future的实例。

在提交任务之后,线程通常会做一些其他工作直到它获取到此项任务的结果。我让主线程在该Future实例的isDone()方法返回true之前,不断地打印一条等待中的消息以模拟这种工作(在现实的应用程序中,我不会使用循环)。此时,主线程调用实例的get()方法来获取待会儿输出的结果。最后,主线程关闭了这个executor.


留意 在executor完成之后关闭之是十分重要的。否则,此应用程序可能不会终止。上面的executor通过调用shutdownNow()来完成这一工作。(你也可以使用shutdown()方法。)


这个callable的call()方法通过求数学幂级数e = 1 / 0! + 1 / 1! + 1 / 2! + ....来计算e。这一幂级数可以通过1 / n!求和获得,n取值从0到无穷(!代表阶乘)。

call()首先初始化java.math.MathContext以封装一个精度(小数的位数)和舍位模式。我选择100作为e精度的上限,同时选择了HALF_UP作为舍位模式。


小贴士 增加精度和LASTITER的值以求更长的收敛时间和更为近似的e。


call()接下来会把java.math.BigDecimal类型的result局部变量实例化成BigDecimal.ZERO。之后进入一个循环计算一个阶乘,用BigDecimal.ONE除以这个阶乘,并且把除好的结果加到result中去。
divide()方法把MathContext实例作为其第二个参数来提供舍位信息。(如果我指定0作为这个math context的精度,同时出现一个无限小数部分【举个例子:除法所得商无法精确地表达——0.3333333...】,程序会抛出java.lang.ArithmeticException,警告调用者这一事实 —— 商无法精确表达。executor会重新把这个异常作为ExecutionException抛出)。

照着下面的样子编译清单5-1:

  1. javac CalculateE.java

运行程序:

  1. java CalculateE

你应该能观测到和下面类似的输出(你很可能观测到更多的waiting消息):

  1. waiting
  2. waiting
  3. waiting
  4. waiting
  5. waiting
  6. 2.71828182845904507051604779584860506117897963525103269890073500406522504250
  7. 4843314055887974344245741730039454062711

练习

下面的练习被设计来验证你对第5章内容的理解程度:
1. 并发工具类是什么?
2. 明确并发工具类型位于哪些包下面。
3. 定义任务。
4. 定义executor。
5. 明确Executor接口的局限。
6. 如何克服Executor的局限。
7. Runnable的run()方法和Callable的call()方法存在什么区别?
8. 判断对错:你可以从Runnable的run()方法中抛出受检和非受检的异常,但是只能从Callable的call()方法中抛出非受检的异常。
9. 定义future。
10. 描述类Executors的newFixedThreadPool()方法。
11. 使用Executors和ExecutorService来重构下面的CountingThreads应用程序。

  1. public class CountingThreads
  2. {
  3. public static void main(String[] args)
  4. {
  5. Runnable r = new Runnable()
  6. {
  7. @Override
  8. public void run()
  9. {
  10. String name = Thread.currentThread().
  11. getName();
  12. int count = 0;
  13. while (true)
  14. System.out.println(name + ": " +
  15. count++);
  16. }};
  17. Thread thdA = new Thread(r);
  18. Thread thdB = new Thread(r);
  19. thdA.start();
  20. thdB.start();
  21. }
  22. }

12. 当你执行前面练习中CountingThreads应用程序时,你会观测到诸如pool-1-thread-1所标识的线程输出。请修改CountingThreads让你可以观测名称A和B。提示:你需要使用ThreadFactory。

小结

Java对底层线程操作的支持使得你可以创建多线程应用程序。这些程序比对应的单线程程序提供了更好的性能和响应能力。不过,影响应用程序可扩展性的性能及其他问题促使Java 5引入了并发工具类。

这个并发工具类将多种类型组织进了三个包中:java.util.concurrent、java.util.concurrent.atomic以及java.util.concurrent.locks。基础类型如executors、线程池、并发hashmaps以及其他高级的并发构造被存储在java.util.concurrent;支持在单个变量上进行无锁、线程安全编程的类存储在java.util.concurrent.atomic;在某些条件上获取锁或者执行等待的类型存储在java.util.concurrent.locks。

Executor框架从任务执行中解耦了任务的提交操作,它包含Executor、ExecutorService以及ScheduledExecutorService。你可以通过调用类Executors中任意一个工具方法来获取一个executor。另外,Executors和callable及future相关联。

第6章会涉及同步器。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注