[关闭]
@lambeta 2016-09-20T12:11:50.000000Z 字数 18388 阅读 334

第8章

translation


第8章

额外的并发工具类

第5章到第7章介绍了并发框架,executor(callable和future),同步器以及锁框架。本章中,我会通过介绍并发集合、原子变量、Fork/Join框架以及completion service来覆盖所有的并发工具类。


注意 由于时间有限,我无法覆盖completable future的内容。如果你对这个话题感兴趣,我推荐你看看Tomasz Nurkiewicz这篇非常优质的博客,题为“Java 8:CompletableFuture权威指南”,地址是http://www.nurkiewicz.com/2013/05/java-8-definitive-guide-to.html


并发集合

Java的集合框架提供了位于java.util包下的诸多接口和类。其中接口包括了List,Set和Map。类包含了ArrayList、TreeSet以及HashMap。

ArrayList、TreeSet、HashMap以及其它实现了这些接口的类都不是线程安全的。不过你可以使用类java.util.Collections中的同步包装方法让它们变得安全。举个例子,你可以向Collections.synchronizedList()中传入一个ArrayList实例以获得一个线程安全的ArrayList变体。

尽管它们对于在多线程环境中简化代码是必要的,但是线程安全的集合仍然有很多问题:

并发工具类通过并发集合来应对这些问题。并发集合是具有并发性能和高扩展性面向集合的类型,它们存储在java.util.concurrent包中。面向集合的类返回了弱一致性的迭代器,这类迭代器具有如下属性:

下列列表提供了简短的面向并发的集合类型的样例,你可以在java.util.concurrent包中找到:

Oracle关于BlockingQueue、ArrayBlockingQueue以及其它面向并发集合类型的Javadoc都把这些类型标识成了集合框架的一部分。

使用BlockingQueue和ArrayBlockingQueue

BlockingQueue的Javadoc展示了生产者-消费者应用程序的核心,这比第3章中展示的同等的应用程序简单很多(请看清单3-1),因为它不需要处理同步。清单8-1在更高级的生产者-消费者的等价程序中使用了BlockingQueue及其实现ArrayBlockingQueue。

清单8-1 第3章PC应用程序阻塞队列的等价实现

  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. public class PC
  6. {
  7. public static void main(String[] args)
  8. {
  9. final BlockingQueue<Character> bq;
  10. bq = new ArrayBlockingQueue<Character>(26);
  11. final ExecutorService executor = Executors.newFixedThreadPool(2);
  12. Runnable producer = () ->
  13. {
  14. for (char ch = 'A'; ch <= 'Z'; ch++)
  15. {
  16. try
  17. {
  18. bq.put(ch);
  19. System.out.printf("%c produced by " +
  20. "producer.%n", ch);
  21. }
  22. catch (InterruptedException ie)
  23. {
  24. }
  25. }
  26. };
  27. executor.execute(producer);
  28. Runnable consumer = () ->
  29. {
  30. char ch = '\0';
  31. do
  32. {
  33. try
  34. {
  35. ch = bq.take();
  36. System.out.printf("%c consumed by " +
  37. "consumer.%n", ch);
  38. }
  39. catch (InterruptedException ie)
  40. {
  41. }
  42. }
  43. while (ch != 'Z');
  44. executor.shutdownNow();
  45. };
  46. executor.execute(consumer);
  47. } }

清单8-1使用了BlockingQueue的put()和take()方法分别将一个对象放到阻塞队列中以及将一个对象从中移除。如果没有空间可以放置对象了,put()方法会被阻塞住,如果这个队列空了,take()方法也会阻塞住。
尽管BlockingQueue能够保证一个字符绝不会在它被生产之前就被消费,但是应用程序的输出仍然可能相反。举个例子,这儿有一次运行中的部分输出:

  1. Y consumed by consumer.
  2. Y produced by producer.
  3. Z consumed by consumer.
  4. Z produced by producer.

第3章清单3-2中的PC应用通过在setSharedChar()/System.out.println()和getSharedChar()/System.out. println()周围引入另外一层同步克服了输出顺序的错误。第7章的订单7-2的PC应用当中通过将这些方法放置在lock()/unlock()方法之间克服同样的错误。

深入学习ConcurrentHashMap

类ConcurrentHashMap和HashMap在行为上别无二致,但是被设计用于无需显式同步就可以工作在多线程的上下文。举个例子,你经常需要检查一个map中是否包含某个特定的值,当这个值不存在的时候,将它放进map中:

  1. if (!map.containsKey("some string-based key"))
  2. map.put("some string-based key", "some string-based value");

尽管这段代码很简单并且似乎可以胜任此项工作,但是它却不是线程安全的。在调用map.containsKey()和map.put()方法之间,其它线程可能插入了这个条目,很可能会被覆盖掉。为了消除这个竞态条件,你必须显式地同步这段代码,示例如下:

  1. synchronized(map)
  2. {
  3. if (!map.containsKey("some string-based key"))
  4. map.put("some string-based key", "some string-based value");
  5. }

这个方法的问题在于当检查键值是否存在以及不存在便添加条目到map时,你把整个map的读写操作都给锁住了。当多条线程同时访问这个map时,这样的锁会影响性能。

泛型的ConcurrentHashMap<V>类通过提供V putIfAbsent(K key, V value)方法来应对这个问题,当键值不存在时,它会把key/value条目放进map当中。这个方法等价于下列代码段,但具有更好的性能:

  1. synchronized(map)
  2. {
  3. if (!map.containsKey(key))
  4. return map.put(key, value);
  5. else
  6. return map.get(key);
  7. }

使用putIfAbsent()方法,原来的代码段就可以翻译成如下简单的代码段了:

  1. map.putIfAbsent("some string-based key", "some string-based value");

注意 Java 8添加了超过30个新方法改善了ConcurrentHashMap,通过聚合操作大量地支持了lambda表达式和Streams API。进行聚合操作的方法包括forEach()方法(forEach()、forEachKey()、forEachValue()以及forEachEntry())、查找方法(search()、searchKeys()、searchValues()以及searchEntries())以及化约方法(reduce()、reduceToDouble()、reduceToLong()等等),其它方法(如mappingCount()和newKeySet())也被加入其中。由于JDK 8的改变,ConcurrentHashMaps(以及从中构建的类)现在作为缓存更好用。改善缓存的变化反映在为键计算值的方法,对扫描(可能会逃逸)条目的改良以及对拥有大量元素的map更好的支持上。


原子变量

那些和对象监听器关联的内置锁一直以来都有性能不佳的问题。尽管性能已经有所提高,但是它们依旧是创建web服务器以及其他在高争用环境下要求更好的扩展及性能之类应用的瓶颈。

大量研究探寻在同步上下文中创建非阻塞的算法,这些算法可以从根本上提高性能。当多条线程争用同样的数据时,线程不会阻塞,所以这些算法提高了扩展性。当然,线程也不会遭遇死锁和其它活跃性问题。

Java 5通过引入包java.util.concurrent.atomic提供创建高效非阻塞的算法。根据这个包的JDK文档,java.util.concurrent.atomic提供了小型工具类,支持在单个变量上进行无锁及线程安全的操作。

包java.util.concurrent.atomic中的类把volatitle标注的数值、属性以及数组元素扩展成也能支持原子的有条件的更新,这样外部的同步不再需要了。换句话说,你可以在无需外部同步的情况下获取和volatile变量相关联的内存互斥语义。


注意 术语atomic和indivisible一般被认为是相同的,尽管原子是可以分割的。


包java.util.concurrent.atomic中一些类描述如下:

原子变量被用于实现计数器、序列生成器(例如java.util.concurrent.ThreadLocalRandom)以及其它构造。在线程高争用的环境中,这些构造要求互斥而不影响性能。举个例子,清单8-2中的ID类,其类方法getNextID()返回唯一的长整型标识符。

清单8-2 借助synchronized,以线程安全的方式返回唯一的标识符

  1. class ID {
  2. private static volatile long nextID = 1;
  3. static synchronized long getNextID()
  4. {
  5. return nextID++;
  6. }
  7. }

尽管这段代码确实被同步过(可见性也被保证了),但是在线程高争用的环境下synchronized关键字关联的内置锁会影响性能。甚至,诸如死锁之类的活跃性问题也会发生。清单8-3展示了如何用一个原子变量替换synchronized的方式来避免这些问题。

清单8-3 借助AtomicLong,以线程安全的方式返回唯一的标识符

  1. import java.util.concurrent.atomic.AtomicLong;
  2. class ID {
  3. private static AtomicLong nextID = new AtomicLong(1);
  4. static long getNextID()
  5. {
  6. return nextID.getAndIncrement();
  7. }
  8. }

在清单8-3中,我将nextID从long型转换成一个AtomicLong的实例,并初始化成1。同时,将getNextID()方法重构成了对AtomicLong的getAndIncrement()方法的调用,该方法会在一个不可分割的步骤里把AtomicLong实例的内部长整型递增1并返回之前的值。此时并不存在显式的同步。


注意 包java.util.concurrent.atomic包含了DoubleAccumulator、DoubleAdder、LongAccumulator以及LongAdder类,这些类解决了从多线程中维护单个计数、求和以及更新某些值的上下文中的扩展性问题。这些新类“从内部利用了争用-递减的技术相较于原子变量带来更大吞吐量的提升。这就使得在大多数应用程序当中放松原子性保证成为一种可接受的可能。”


理解原子的神秘之处

Java低级的同步机制,强制使用互斥(持有锁的线程保证一组变量被互斥地访问)以及可见性(被保护变量的更改对后续获取锁的其它线程可见),以下面的方式影响了硬件的使用和扩展能力:

尽管你可能认为volatile是同步的备选项,但事实并非如此。volatile变量只能解决可见性的问题,它们无法应用于安全地实现原子的读-改-写的序列。这个原子的序列对于实现线程安全的计数器以及其它需要互斥访问的实体是有必要的。不过,并发工具集(例如类java.util.concurrent.Semaphore)提供了一个负责性能提升的备选项,它就是compare-and-swap。

Compare-and-swap (CAS)是一个针对非抢占式微处理器的一条指定指令的宽泛术语,这条指令读取内存的位置,比较读到的值和期望的值,当读到的值和期望的值匹配时,就将新值存储到该内存位置;否则,什么事也不会发生。现代微处理器提供了多种CAS的变体。例如:Intel微处理器提供了cmpxchg家族的指令,而旧的PowerPC微处理器提供了等价的load-link(如lwarx)以及store-conditional(如stwcx)指令。

CAS支持原子的读-改-写序列。你通常会像下面这样使用CAS:
1. 从地址A读出值x。
2. 在x上进行一个多步计算,衍生出一个新的值y。
3. 使用CAS把A的值从x改成y。当操作这些步骤时,如果A的值没有发生改变,CAS就成功了。

为了理解CAS的好处,以清单8-2中的ID类为例,它返回一个唯一的标识符。由于这个类将getNextID()方法声明成了synchronized,高争用环境下的监听锁会导致过多的上下文切换,这样会阻碍所有的线程并且导致应用程序无法很好得扩展。

假设存在一个CAS类,它存储了基于整型值的value。更进一步,它提供了原子的方法int getValue()以返回值以及int compareAndSwap(int expectedValue, int newValue)以实现CAS。(此场景的背后,CAS依赖于Java本地接口[JNI]来访问特定微处理器的CAS指令。)

这个compareAndSwap()方法以原子的方式执行下列指令的序列:

  1. int readValue = value; // Obtain the stored value.
  2. if (readValue == expectedValue) // If stored value not modified ...
  3. value = newValue; // ... change to new value.
  4. return readValue; // Return value before a potential change.

清单8-4展示了新版的ID类,使用了CAS类以一种高性能的方式获取唯一的标识。(忘掉使用JNI导致的性能后果,假设我们可以直接访问特定微处理器的CAS指令。)

清单8-4 借由CAS,以一种线程安全的方式返回唯一的ID

  1. class ID {
  2. private static CAS value = new CAS(1);
  3. static long getNextID()
  4. {
  5. int curValue = value.getValue();
  6. while (value.compareAndSwap(curValue, curValue + 1) != curValue)
  7. curValue = value.getValue();
  8. return curValue + 1;
  9. }
  10. }

ID类封装了一个被实例化成整型值1的CAS实例,并且声明了一个getNextID()方法用于获取当前的标识符,之后从该实例中递增这个值。在获取该实例当前值之后,getNextID()会反复地调用compareAndSwap()方法直到curValue的值不再改变(被其它线程)。这个方法在返回前面的值之后就可以自由地改变curValue的值了。当不涉及锁、高争用环境下不会有过多的上下文切换时,性能就得以提高,并且代码也会更具扩展性。

作为CAS如何改善并发工具的例子,考虑java.util.concurrent.locks.ReentrantLock。这个类在高线程争用的环境下较之synchronized提供了更好的性能。为了提升性能,抽象类java.util.concurrent.locks.AbstractQueuedSynchronizer的子类管理了ReentrantLock的同步方式。依次地,这个类利用了未经文档化的类sun.misc.Unsafe及其compareAndSwapInt() CAS方法。

原子变量的类也利用了CAS。甚至,它们提供的方法具有下列形式:

  1. boolean compareAndSet(expectedValue, updateValue)

这个方法(在不同的类中参数的类型不同)在变量当前等于expectedValue时,会原子性地给它设置updateValue,成功返回true。

Fork/Join框架

让代码执行得更快些总归是必要的。过去,这一需求总是通过提升微处理器的速度或者提供多个处理器得以满足。不过,2003前后,微处理器的速度由于自然限制停止了增长。为了补偿,处理器制造商开始往他们的处理器中添加多个处理内核,借助大量的并行来提升速度。


注意 并行指的是从多个处理器和核心的一些组合中同时运行线程。相反地,并发则是一种更为宽泛形式的并行,并发中,线程可以同时运行或者通过上下文切换看上去像是同时运行,也就是虚拟并行。一些人更进一步将并发刻画成程序或操作系统的一个属性,而将并行看作同时运行多条线程的运行时行为。


Java提供底层的线程特性以及线程池之类的高级并发工具类来支持并发。但是,并发的问题在于它无法最大化利用可用的处理器、核心的资源。举个例子,假设你创建了一种排序算法将一个数组分成两个部分,安排两条线程分别排序每个部分,在线程都完成之后合并结果。

假设每条线程在不同的处理器上运行。由于不同数量的元素会在数组的每部分上重新排序,很可能一条线程先于其它线程完成但在合并发生前必须等待。这一场景下,一个处理器资源就被浪费掉了。

这个问题(和相关的代码冗余难以阅读的问题)可以通过递归地把任务拆分成子任务然后组合结果的方式解决。这些任务会并行执行并且几乎在同一时间完成(如果不在同一时刻),它们的结果会被合并、传递到上一层子任务的栈上。这样几乎没有任何处理器时间会被浪费在等待上,并且这些递归的代码也更简洁、(大部分)更容易理解。Java提供了Fork/Join框架来实现这一场景。

Fork/Join框架由特定的executor service和线程池构成。executor service可以运行任务,并且这个任务会被分解成较小的任务,它们从线程池中被fork(被不同的线程执行)出来,在join(即它的所有子任务都完成了)之前会一直等待。

Fork/Join使用了任务窃取来最小化线程的争用和开销。线程池中的每条工作线程都有自己的双端工作队列并且会将新任务放到这个队列中。它从队列的头部读取任务。如果队列是空的,工作线程就尝试从另一个队列的末尾获取一个任务。窃取操作不会很频繁,因为工作线程会采用后进先出(LIFO)的顺序将任务放入它们的队列当中,同时工作项的规模会随着问题分割成子问题而变小。你一开始把任务交给一个中心的工作线程,之后它会持续将这个任务分解成更小的任务。最终所有的工作线程都只会涉及很少量的同步操作。

Fork/Join框架绝大部分由java.util.concurrent包中的ForkJoinPool、ForkJoinTask、ForkJoinWorkerThread、RecursiveAction、RecursiveTask以及CountedCompleter类组成:

Java文档提供基于RecursiveAction的任务(例如排序)以及基于RecursiveTask的任务(例如计算Fibonacci数列)。你也可以使用RecursiveAction去完成矩阵乘法(请看http:// en.wikipedia.org/wiki/Matrix_multiplication)。举个例子,假设你已经创建了清单8-5中的Matrix类来表示由指定行和列数组成的矩阵。

清单8-5 表示二维表的类

  1. public class Matrix
  2. {
  3. private final int[][] matrix;
  4. public Matrix(int nrows, int ncols)
  5. {
  6. matrix = new int[nrows][ncols];
  7. }
  8. public int getCols()
  9. {
  10. return matrix[0].length;
  11. }
  12. public int getRows()
  13. {
  14. return matrix.length;
  15. }
  16. public int getValue(int row, int col)
  17. {
  18. return matrix[row][col];
  19. }
  20. public void setValue(int row, int col, int value)
  21. {
  22. matrix[row][col] = value;
  23. }
  24. }

清单8-6示范了以一种单线程的方式做两个矩阵的乘法。

清单8-6 由标准的矩阵乘法算法来做两个矩阵的相乘

  1. public class MatMult
  2. {
  3. public static void main(String[] args)
  4. {
  5. Matrix a = new Matrix(1, 3);
  6. a.setValue(0, 0, 1); // | 1 2 3 |
  7. a.setValue(0, 1, 2);
  8. a.setValue(0, 2, 3);
  9. dump(a);
  10. Matrix b = new Matrix(3, 2);
  11. b.setValue(0, 0, 4); // | 4 7 |
  12. b.setValue(1, 0, 5); // | 5 8 |
  13. b.setValue(2, 0, 6); // | 6 9 |
  14. b.setValue(0, 1, 7);
  15. b.setValue(1, 1, 8);
  16. b.setValue(2, 1, 9);
  17. dump(b);
  18. dump(multiply(a, b));
  19. }
  20. public static void dump(Matrix m)
  21. {
  22. for (int i = 0; i < m.getRows(); i++)
  23. {
  24. for (int j = 0; j < m.getCols(); j++)
  25. System.out.printf("%d ", m.getValue(i, j));
  26. System.out.println();
  27. }
  28. System.out.println();
  29. }
  30. public static Matrix multiply(Matrix a, Matrix b)
  31. {
  32. if (a.getCols() != b.getRows())
  33. throw new IllegalArgumentException("rows/columns mismatch");
  34. Matrix result = new Matrix(a.getRows(), b.getCols());
  35. for (int i = 0; i < a.getRows(); i++)
  36. for (int j = 0; j < b.getCols(); j++)
  37. for (int k = 0; k < a.getCols(); k++)
  38. result.setValue(i, j, result.getValue(i, j) +
  39. a.getValue(i, k) * b.getValue(k, j));
  40. return result;
  41. }
  42. }

清单8-6中的MatMult类声明了一个multiply()方法表示矩阵的乘法。在验证了矩阵算法的基本条件,即第一个矩阵(a)的列数等于第二个矩阵(b)的行数之后,multiply()创建了一个结果矩阵并进入一系列的内嵌循环来进行乘法操作。

这些循环的本质如下:对于a中每一行,将行中的每一个值和a中对应列的每一个值相乘。然后将乘法的结果相加,同时把整个结果存储到a中行索引(i)和b中列索引(j)指定的位置上。

编译清单8-6和清单8-5,它们必须在同一个目录,如下:

  1. javac MultMat.java

运行程序:

  1. java MultMat

你应该能观测到如下的输出,即1行3列的矩阵乘以3行2列的矩阵得到一个1行2列的矩阵:

  1. 1 2 3
  2. 4 7
  3. 5 8
  4. 6 9
  5. 32 50

计算机科学家将这类的算法分类为O(n*n*n),读作“大O n的立方”或者“近似n的立方”。这种表示法是一种分类算法性能的抽象方式(不会陷入诸如微处理器速度这样的细节当中)。一个O(n*n*n)分类暗示着很差的性能,并且随矩阵的大小成倍增长性能更差。

性能可以通过把每个行乘列的乘法任务分配到独立的类似线程的实体当中的方式得以提升。清单8-7向你展示了如何在Fork/Join框架的上下文中完成这一场景。

清单8-7 借助Fork/Join框架将两个矩阵的实例相乘

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.ForkJoinPool;
  4. import java.util.concurrent.RecursiveAction;
  5. public class MatMult extends RecursiveAction
  6. {
  7. private final Matrix a, b, c;
  8. private final int row;
  9. public MatMult(Matrix a, Matrix b, Matrix c)
  10. {
  11. this(a, b, c, -1);
  12. }
  13. public MatMult(Matrix a, Matrix b, Matrix c, int row)
  14. {
  15. if (a.getCols() != b.getRows())
  16. throw new IllegalArgumentException("rows/columns mismatch");
  17. this.a = a;
  18. this.b = b;
  19. this.c = c;
  20. this.row = row;
  21. }
  22. @Override
  23. public void compute()
  24. {
  25. if (row == -1)
  26. {
  27. List<MatMult> tasks = new ArrayList<>();
  28. for (int row = 0; row < a.getRows(); row++)
  29. tasks.add(new MatMult(a, b, c, row));
  30. invokeAll(tasks);
  31. }
  32. else
  33. multiplyRowByColumn(a, b, c, row);
  34. }
  35. public static void multiplyRowByColumn(Matrix a, Matrix b, Matrix c, int row)
  36. {
  37. for (int j = 0; j < b.getCols(); j++)
  38. for (int k = 0; k < a.getCols(); k++)
  39. c.setValue(row, j, c.getValue(row, j) + a.getValue(row, k) * b.getValue(k, j));
  40. }
  41. public static void dump(Matrix m)
  42. {
  43. for (int i = 0; i < m.getRows(); i++)
  44. {
  45. for (int j = 0; j < m.getCols(); j++)
  46. System.out.print(m.getValue(i, j) + " ");
  47. System.out.println();
  48. }
  49. System.out.println();
  50. }
  51. public static void main(String[] args)
  52. {
  53. Matrix a = new Matrix(2, 3);
  54. a.setValue(0, 0, 1); // | 1 2 3 |
  55. a.setValue(0, 1, 2); // | 4 5 6 |
  56. a.setValue(0, 2, 3);
  57. a.setValue(1, 0, 4);
  58. a.setValue(1, 1, 5);
  59. a.setValue(1, 2, 6);
  60. dump(a);
  61. Matrix b = new Matrix(3, 2);
  62. b.setValue(0, 0, 7); // | 7 1 |
  63. b.setValue(1, 0, 8); // | 8 2 |
  64. b.setValue(2, 0, 9); // | 9 3 |
  65. b.setValue(0, 1, 1);
  66. b.setValue(1, 1, 2);
  67. b.setValue(2, 1, 3);
  68. dump(b);
  69. Matrix c = new Matrix(2, 2);
  70. ForkJoinPool pool = new ForkJoinPool();
  71. pool.invoke(new MatMult(a, b, c));
  72. dump(c);
  73. }
  74. }

清单8-7中的MatMult类继承自RecursiveAction。为了完成有意义的工作,RecursiveAction的void compute()方法被重写了。


注意 尽管compute()方法通常会将一个任务递归地分割成多个子任务,但是我选来做乘法任务(为了言简意赅)确实稍有不同。


创建矩阵a和b之后,清单8-7中的main()方法创建了矩阵c,同时初始化了ForkJoinPool。之后它初始化了MatMult,将这三个Matrix的实例作为参数传递到MatMult(Matrix a, Matrix b, Matrix c)构造函数当中,并且调用了ForkJoinPool的T invoke(ForkJoinTask< T > task)方法开始运行初始化任务。这个方法一直不会返回直到初始化任务和其所有子任务都已经完成。

当compute()最开始被调用的时候(row等于-1),它创建了一个MatMult任务的列表并且把这个列表传递到了RecursiveAction的Collection<T> invokeAll(Collection<T> tasks)方法(从ForkJoinTask中继承)。这个方法fork了所有列表中的任务,并开始执行它们。之后会一直等待直到invokeAll()方法返回(这也会join全部任务),也即boolean isDone()方法(也是从ForkJoinTask中继承的)对于每个任务都返回true。

注意这个task.add(new MatMult(a, b, c, row));方法的调用。这一调用将指定row值赋给了一个MatMult的实例。当invokeAll()方法被调用了,每个任务的compute()方法会被调用并且检测到row被赋予不同的值(不同于-1)。之后它会针对指定row执行multiplyRowByColumn(a, b, c, row);方法。

编译清单8-7(javac MatMult.java),运行结果程序(java MatMult)。你应该能观测到如下输出:

  1. 1 2 3
  2. 4 5 6
  3. 7 1
  4. 8 2
  5. 9 3
  6. 50 14
  7. 122 32

Completion Service

一个completion service就是java.util.concurrent.CompletionService<V>接口的实现,用于从已完成任务(消费者)结果的消费中解耦新的异步任务(生产者)的生产。V是这个任务返回结果的类型。

生产者通过调用某个submit()方法提交一个任务执行(通过一个工作线程):其中有个方法接收一个callable参数,而另一个方法接收一个runnable参数和一个任务完成的返回值。每个方法都会返回一个Future<V>的实例代表该任务的未完成状态。之后你可以调用poll()方法来轮训这个任务的完成状态或者调用这个阻塞式的take()方法(译者注:获得结果)。

消费者通过调用take()方法获取一个已完成的任务。这个方法在任务已经完成之前会一直阻塞。之后它会返回一个Future<V>对象来表示这一完成的任务,调用Future<V>的get()方法即可获取结果。

连同CompletionService<V>一起,Java7引入了java.util.concurrent.ExecutorCompletionService<V>类,这个类提供executor用于支持任务执行。它保证当提交过的任务完成时,这些任务会被放到一个可被take()方法访问的队列中。

为了演示CompletionService和ExecutorCompletionService,我会重新做一遍第5章呈现的计算欧拉数的程序。清单8-8中的源代码表示一个新的应用程序,它提交了两个callable任务以不同的精度计算数字。

清单8-8 借助Completion Service计算欧拉数

  1. import java.math.BigDecimal;
  2. import java.math.MathContext;
  3. import java.math.RoundingMode;
  4. import java.util.concurrent.Callable;
  5. import java.util.concurrent.CompletionService;
  6. import java.util.concurrent.ExecutorCompletionService;
  7. import java.util.concurrent.Executors;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Future;
  10. public class CSDemo
  11. {
  12. public static void main(String[] args) throws Exception
  13. {
  14. ExecutorService es = Executors.newFixedThreadPool(10);
  15. CompletionService<BigDecimal> cs =
  16. new ExecutorCompletionService<BigDecimal>(es);
  17. cs.submit(new CalculateE(17));
  18. cs.submit(new CalculateE(170));
  19. Future<BigDecimal> result = cs.take();
  20. System.out.println(result.get());
  21. System.out.println();
  22. result = cs.take();
  23. System.out.println(result.get());
  24. es.shutdown();
  25. }
  26. }
  27. class CalculateE implements Callable<BigDecimal>
  28. {
  29. final int lastIter;
  30. public CalculateE(int lastIter)
  31. {
  32. this.lastIter = lastIter;
  33. }
  34. @Override
  35. public BigDecimal call()
  36. {
  37. MathContext mc = new MathContext(100, RoundingMode.HALF_UP);
  38. BigDecimal result = BigDecimal.ZERO;
  39. for (int i = 0; i <= lastIter; i++)
  40. {
  41. BigDecimal factorial = factorial(new BigDecimal(i));
  42. BigDecimal res = BigDecimal.ONE.divide(factorial, mc);
  43. result = result.add(res);
  44. }
  45. return result;
  46. }
  47. private BigDecimal factorial(BigDecimal n)
  48. {
  49. if (n.equals(BigDecimal.ZERO))
  50. return BigDecimal.ONE;
  51. else
  52. return n.multiply(factorial(n.subtract(BigDecimal.ONE)));
  53. }
  54. }

清单8-8展示了两个类:CSDemo和CalculateE。CSDemo驱动程序,而CalculateE则描述了欧拉数的计算任务。

CSDemo的main()方法首先创建了一个executor service,以供后续执行任务。之后创建了一个completion service用于完成任务。两个计算任务随后被提交到了这个completion service,每个任务都会异步地执行。对于每个任务,调用这个completion service的take()方法会返回任务的future对象,future的get()方法被用于获取任务的结果以供后续输出。

CalculateE包含的代码和第5章中展示的代码几乎相同(请看清单5-1)。仅有的不同在于将一个LASTITER常量改成了一个lastIter变量,此变量用于记录最后一次执行的迭代(以及决定这个数字小数部分的精度)。

照下面编译清单8-8:

  1. javac CSDemo.java

运行程序:

  1. java CSDemo

你应该能观测到下列输出:

  1. 2.71828182845904507051604779584860506117897963525103269890073500406522504250
  2. 4843314055887974344245741730039454062711
  3. 2.71828182845904523536028747135266249775724709369995957496696762772407663035
  4. 3547594571382178525166427463896116281654124813048729865380308305425562838245
  5. 9134600326751445819115604942105262868564884769196304284703491677706848122126
  6. 6648385500451288419298517722688532167535748956289403478802971332967547449493
  7. 7583500554228384631452841986384050112497204406928225548432766806207414980593
  8. 2978161481951711991448146506

注意 如果你想知道executor service和completion service之间的区别,可以这样考虑,使用executor service,写完提交任务的代码之后,你需要写一些代码去高效地获取任务的结果;使用completion service,这个工作就更加自动化了。另一种方式就是观察它们的构造,executor service为任务提供了一个输入队列和多条工作线程,而completion service则为任务提供了输入队列、工作线程以及一个输出队列来存储任务的结果。


练习

下面的练习被设计来验证你对第8章内容的理解程度:
1. 确认线程安全集合的两个问题。
2. 定义并发集合。
3. 什么是一个弱一致性的迭代器?
4. 描述BlockingQueue接口。
5. 描述ConcurrentMap接口。
6. 描述ArrayBlockingQueue、LinkedBlockingQueue和BlockingQueue实现类。
7. 判断对错:面向并发的集合类型是集合框架的一部分。
8. 描述ConcurrentHashMap类。
9. 使用ConcurrentHashMap,你该如何检查是否一个map包含了特定的值,并且当这个值不存在的时候,无需外部同步,把这个值放到map中。
10. 定义原子变量。
11. 类AtomicIntegerArray描述了什么?
12. 判断对错:volatile支持原子的读-改-写序列。
13. 通过使用并发工具类能够改善了哪些方面的性能?
14. 描述Fork/Join框架。
15. 确认构成Fork/Join框架的主要类型。
16. 为了通过RecursiveAction完成有意义的工作,你应该敷写哪个方法?
17. 定义completion service。
18. 如何使用一个completion service?
19. 如何通过一个completion service来运行任务?
20. 将下列的表达式转换成它们对应的原子变量等价物:

  1. int total = ++counter;
  2. int total = counter--;

小结

本章通过介绍并发集合、原子变量、Fork/Join框架以及completion service来覆盖所有的并发工具类。

存储在java.util.concurrent包中的并发集合是具有并发性能和高扩展性面向集合的类型,它们克服了ConcurrentModificationException以及线程安全的集合带来的性能问题。

原子变量是封装了单个变量,并且支持在单个变量上进行无锁及线程安全的操作的实例。如:AtomicInteger。

Fork/Join框架由特定的executor service和线程池构成。executor service可以运行任务,并且这个任务会被分解成较小的任务,它们从线程池中被fork(被不同的线程执行)出来,在join(即它的所有子任务都完成了)之前会一直等待。

一个completion service就是java.util.concurrent.CompletionService< V >接口的实现,用于从已完成任务(消费者)结果的消费中解耦新的异步任务(生产者)的生产。V是这个任务返回结果的类型。

附录A会回答每个章节的练习题。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注