[关闭]
@lambeta 2016-09-20T12:11:18.000000Z 字数 13423 阅读 313

第7章

translation


第7章

锁框架

包java.util.concurrent.locks提供了一个包含多种接口和类的框架针对条件进行加锁和等待。不同于对象的内置加锁同步以及java.lang.Object的等待、通知机制,包含锁框架的并发工具类通过轮训锁、限时等待及其它方式改善了这种机制。

同步及低级别的锁

Java支持同步以便线程能够安全地更新共享变量并且保证一条线程的更新对其它线程可见。你可以给方法或者代码块标记上synchronized关键字到达同步的目的。这样的代码序列被称为临界区。Java虚拟机(JVM)通过JVM指令monitors、monitorenter以及monitorexit来支持同步。

每一个Java对象都和一个监听器相关联,监听器是一个互斥(每次只允许一条线程在临界区中执行)构造,它阻止多条线程同时在临界区中并发执行。在线程可以进入临界区之前,它需要锁住监听器。如果这个监听器已经上锁,这条线程在监听器释放之前会一直阻塞(由于其它线程正在使用临界区)。

当线程在多核、多处理器的环境中锁住一个监听器,存储在主存中的共享变量的值会被读取对应的拷贝中然后存储在线程的工作内存(被称为本地内存或者缓存)。这一动作能够保证该线程使用这些变量最近的值并且不会污染这些值,我们称为可见性。线程会持续使用这些共享变量的拷贝。当离开临界区,线程会释放监听器,这些共享变量的值就被写回主存,以确保下一条线程进入临界区也能访问这些变量最近的值(volatile关键字仅仅解决了可见性。)

锁框架包含了经常使用的锁、重入锁、条件、读写锁以及重入读写锁等类别,这些我都会在本章中进行探讨。同时,我也会简要地介绍Java 8中引入的StampedLock类。

接口Lock提供了比监听器关联的锁更为弹性的锁操作,比如,当锁不可用时,可以立即退出对一个锁的请求。这个接口声明了如下方法:

获取到的锁必须被释放。在同步方法和代码块的上下文中,隐式的监听锁和每个对象关联住。所有的锁获取和释放都发生在块状结构中。当多个锁被获取,它们会以相反的顺序被释放,并且所有的锁都会在它们被获取的文本作用域中被释放掉。

在接口Lock实现的上下文中锁的获取和释放可以更加弹性。举个例子,某些并发地遍历访问数据结构的算法需要使用“两手交替”或者“链状上锁”的方式:你获取节点A的锁,然后节点B,然后释放A的锁并获取C,之后又释放B并获取D的锁等等。接口Lock的实现允许使用在不同的作用域中获取、释放锁这样的技术,同时允许以不同的顺序允许多个锁被获取和释放。

提升伸缩性伴随着额外的代价。块状锁的缺失移除了发生在同步方法和代码块上自动释放锁的功能。结果是,你通常应该遵循下列对于锁获取和释放的约定俗成:

  1. Lock l = ...; // ... is a placeholder for code that obtains the lock
  2. l.lock();
  3. try
  4. {
  5. // access the resource protected by this lock
  6. }
  7. catch (Exception ex)
  8. {
  9. // restore invariants
  10. }
  11. finally {
  12. l.unlock();
  13. }

这一约定俗成能够保证获取到的锁总会被释放。


注意 所有Lock的实现都需要强制和内置的监听锁的内存同步语义一致。


重入锁

类ReentrantLock实现了接口Lock,描述了一个可重入的互斥锁。这个锁和一个持有量相关联。当一条线程持有这个锁并且调用lock()、lockUninterruptibly()或者任意一个tryLock()方法重新获取锁,这个持有量就递增1。当线程调用unlock()方法,持有量就递减1。当持有量降为0,锁就会被释放掉。

ReentrantLock提供了与通过同步方法、代码块得以访问的隐式监听锁同样的并发及内存语义。不过,它具备可扩展的功能并且在高线程争用的环境下(线程频繁地请求获取已经被其它线程持有的锁)具有更好的性能。这样,当很多线程尝试访问共享资源时,JVM会花费更少的时间调度这些线程而把更多的时间用于执行它们。

你可以通过调用下列构造函数之一初始化一个ReentrantLock的实例:

ReentrantLock实现了Lock的方法。不过,它对unlock()方法的实现在调用线程没有持有锁的情况下会抛出java.lang.IllegalMonitorStateException。当然ReentrantLock也提供了自己的方法。举个例子,boolean isFair()方法返回公平策略,boolean isHeldByCurrentThread()方法在锁被当前线程持有的情况下,返回true。清单7-1示范了ReentrantLock。

清单7-1 用重入锁获得同步功能

  1. import java.util.concurrent.Executors;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.TimeUnit;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. public class RLDemo
  6. {
  7. public static void main(String[] args)
  8. {
  9. ExecutorService executor = Executors.newFixedThreadPool(2);
  10. final ReentrantLock lock = new ReentrantLock();
  11. class Worker implements Runnable
  12. {
  13. private final String name;
  14. Worker(String name)
  15. {
  16. this.name = name;
  17. }
  18. @Override
  19. public void run()
  20. {
  21. lock.lock();
  22. try
  23. {
  24. if (lock.isHeldByCurrentThread())
  25. System.out.printf("Thread %s entered critical section.%n",name); System.out.printf("Thread %s performing work.%n", name);
  26. try
  27. {
  28. Thread.sleep(2000);
  29. }
  30. catch (InterruptedException ie)
  31. {
  32. ie.printStackTrace();
  33. }
  34. System.out.printf("Thread %s finished working.%n", name);
  35. }
  36. finally
  37. {
  38. lock.unlock();
  39. }
  40. }
  41. }
  42. executor.execute(new Worker("ThdA"));
  43. executor.execute(new Worker("ThdB"));
  44. try
  45. {
  46. executor.awaitTermination(5, TimeUnit.SECONDS);
  47. }
  48. catch (InterruptedException ie)
  49. {
  50. ie.printStackTrace();
  51. }
  52. executor.shutdownNow();
  53. }
  54. }

清单7-1的应用程序中,默认主线程创建了一对工作线程,它们进入临界区、在其中模拟执行任务,然后离开临界区。它们使用了ReentrantLock的lock()和unlock()方法来获取和释放一个重入锁。当一条线程调用lock()方法而锁又不可用时,这个线程就会一直禁用(并且不能被调度)直到锁变为可用。

照下面编译清单7-1:

  1. 
javac RLDemo.java

运行程序:

  1. java RLDemo

你应该能观察到类似下列的输出(消息的顺序可能略有出入):

  1. Thread ThdA entered critical section.
  2. Thread ThdA performing work.
  3. Thread ThdA finished working.
  4. Thread ThdB entered critical section.
  5. Thread ThdB performing work.
  6. Thread ThdB finished working.

条件

接口Condition把Object的wait和notification方法(wait(),notify()和notifyAll())分解到不同的条件对象中。通过把这些条件和任意Lock实现的使用组合起来,起到让每个对象上具有多重等待集合的作用。这里Lock取代了同步方法、代码块,Condition取代了Object的wait、notificaiton方法。


注意 一个Condition的实例原本就会绑定到一个锁上。所以可以使用Lock的newCondition()方法去获取一个针对特定Lock实例的Condition实例。


Condition声明下列方法:

清单7-2 用Locks和Conditions获得同步功能

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class PC
  5. {
  6. public static void main(String[] args)
  7. {
  8. Shared s = new Shared();
  9. new Producer(s).start();
  10. new Consumer(s).start();
  11. }
  12. }
  13. class Shared
  14. {
  15. private char c;
  16. private volatile boolean available;
  17. private final Lock lock;
  18. private final Condition condition;
  19. Shared() {
  20. available = false;
  21. lock = new ReentrantLock();
  22. condition = lock.newCondition();
  23. }
  24. Lock getLock()
  25. {
  26. return lock;
  27. }
  28. char getSharedChar()
  29. {
  30. lock.lock();
  31. try
  32. {
  33. while (!available)
  34. try
  35. {
  36. condition.await();
  37. }
  38. catch (InterruptedException ie)
  39. {
  40. ie.printStackTrace();
  41. }
  42. available = false;
  43. condition.signal();
  44. }
  45. finally {
  46. lock.unlock();
  47. return c;
  48. }
  49. }
  50. void setSharedChar(char c)
  51. {
  52. lock.lock();
  53. try
  54. {
  55. while (available)
  56. try
  57. {
  58. condition.await();
  59. }
  60. catch (InterruptedException ie)
  61. {
  62. ie.printStackTrace();
  63. }
  64. this.c = c;
  65. available = true;
  66. condition.signal();
  67. } finally {
  68. lock.unlock();
  69. }
  70. }
  71. }
  72. class Producer extends Thread
  73. {
  74. private final Lock l;
  75. private final Shared s;
  76. Producer(Shared s)
  77. {
  78. this.s = s;
  79. l = s.getLock();
  80. }
  81. @Override
  82. public void run()
  83. {
  84. for (char ch = 'A'; ch <= 'Z'; ch++)
  85. {
  86. l.lock();
  87. s.setSharedChar(ch);
  88. System.out.println(ch + " produced by producer.");
  89. l.unlock();
  90. }
  91. }
  92. }
  93. class Consumer extends Thread
  94. {
  95. private final Lock l;
  96. private final Shared s;
  97. Consumer(Shared s)
  98. {
  99. this.s = s;
  100. l = s.getLock();
  101. }
  102. @Override
  103. public void run()
  104. {
  105. char ch;
  106. do
  107. {
  108. l.lock();
  109. ch = s.getSharedChar();
  110. System.out.println(ch + " consumed by consumer.");
  111. l.unlock();
  112. }
  113. while (ch != 'Z');
  114. }
  115. }

清单7-2和清单3-2的PC应用程序类似。不过,它用锁和条件替换了同步(synchronized)及等待、通知。

PC的main()方法初始化了类Shared、Producer以及Consumer。Shared实例被传递到Producer和Consumer的构造函数当中,这些线程随后启动。

默认主线程调用Producer和Consumer构造函数。由于Shared实例也被producer和consumer线程所访问,该实例必须对这些线程可见(尤其是当这些线程运行在不同的核上时)。在每个Producer和Consumer中,我通过把s声明成final来达到这个目的。当然也可以将这一属性声明成volatile,但是volatile暗示会对这一属性进行额外的写操作,事实上s在初始化之后就不会再改变了。

查看Shared的构造函数,注意到它通过lock = new ReentrantLock();的方法创建了一个锁,并且借由condition = lock.newCondition();创建了一个与之关联的条件。生产者和消费者可以通过Lock getLock()方法访问这个锁。

当变量available为true时,生产者调用条件的await()方法等待available变为false。消费者在消费字符之后,发出信号给这个条件来唤醒生产者。(因为可能存在假唤醒,而available还是true,所以我使用循环代替了if语句。)

在离开循环之后,生产者线程记录下新的字符,给available赋值为true表明有个新字符可被消费,同时发出信号给这个条件来唤醒一个等待中的消费者。最终,它释放了锁并且退出了setSharedChar()方法。


注意 尽管字符是先生产后消费的,我还是给Producer run()方法中setSharedChar()/System.out.println()块以及Consumer run()方法中的getSharedChar()/System.out.println()块上了锁以免应用程序在输出生产消息之前输出消费消息。


消费者线程的行为和getSharedChar()方法与我们之前对生产者线程的描述及setSharedChar()方法类似。


注意 由于当前上下文不会抛出异常,所以我并没有使用try/finally这样的约定俗成来保证在Producer和Consumer的run()方法里释放了锁。


照下面编译清单7-2:

  1. javac PC.java

运行程序:

  1. java PC

你应该能观测到类似下列输出的开始部分,这暗示了步调一致的同步(生产者线程在有东西被消费之前不会生产,而消费者线程在有东西被生产出来之前不会消费。):

  1. A produced by producer.
  2. A consumed by consumer.
  3. B produced by producer.
  4. B consumed by consumer.
  5. C produced by producer.
  6. C consumed by consumer.
  7. D produced by producer.
  8. D consumed by consumer.

读写锁

读写锁适用于对数据结构频繁读而修改少的场景。举个例子,你可以创建了一个在线单词定义的辞典供多条线程并发读取,然而单条线程可能会不时地添加新的定义或更新已有的定义。锁框架针对这些场景提供了读-写锁机制,在读取时有更好的并发性,而写入时保证安全的互斥访问。这一机制基于接口ReadWriteLock。

ReadWriteLock维护了一对锁:一个锁针对只读操作,一个锁针对写操作。在没有写者的时候,读锁可能会被多条读线程同时持有。写入锁是互斥的:只有单个线程可以修改共享数据。(和synchronized关键字关联的锁也是互斥的。)

ReadWriteLock声明了下列方法:

重入读写锁

类ReentrantReadWriteLock实现了接口ReadWriteLock,代表了一个与ReentrantLock具有相同语义的重入读-写锁。

你可以调用下列构造函数之一来初始化一个ReentrantReadWriteLock实例:


注意
基于公平的顺序策略,若当前持有的锁被释放了,那要么是等待最久的单条写线程会被分配写锁,要么就是当一组读线程比所有等待中的写线程等待时间还长时,这组读线程会被分配读锁。

尝试获取一个公平读锁(非重入的)的线程会因为写锁被持有或者存在一条等待的写线程而被阻塞住。这条线程在等待最久的写线程已经获取并释放写锁之前是没法获取到读锁的。如果等待中的写线程放弃了等待,致使一条或者多条读线程成为无写锁的队列当中最久的等待者,那些读线程就会被分配读锁。

除非读锁和写锁都没有被持有(暗示没有处于等待中的线程),否则尝试获取一个公平写锁(非重入的)的线程将会阻塞。(非阻塞的tryLock()方法并不遵循公平设置,它不管等待中的线程,而是尽可能立即获取锁。)


在初始化这个类之后,你可以调用下列方法来获取读和写锁:

每个内嵌的ReadLock和WriteLock类都实现了接口Lock并且实现了自己的方法。除此之外,ReentrantReadWriteLock声明了诸如下列这对方法的额外方法:

为了演示ReadWriteLock和ReentrantReadWriteLock,清单7-3呈现了一个应用程序,其写线程产生单词定义的条目,而读线程持续随机地访问这些条目并打印出来。

清单7-3 使用ReadWriteLock满足辞典程序的读、写线程

  1. import java.util.HashMap;
  2. import java.util.Map;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.locks.Lock;
  6. import java.util.concurrent.locks.ReadWriteLock;
  7. import java.util.concurrent.locks.ReentrantReadWriteLock;
  8. public class Dictionary
  9. {
  10. public static void main(String[] args)
  11. {
  12. final String[] words =
  13. {
  14. "hypocalcemia",
  15. "prolixity",
  16. "assiduous",
  17. "indefatigable",
  18. "castellan"
  19. };
  20. final String[] definitions =
  21. {
  22. "a deficiency of calcium in the blood",
  23. "unduly prolonged or drawn out",
  24. "showing great care, attention, and effort",
  25. "able to work or continue for a lengthy time without tiring",
  26. "the govenor or warden of a castle or fort"
  27. };
  28. final Map<String, String> dictionary = new HashMap<String, String>();
  29. ReadWriteLock rwl = new ReentrantReadWriteLock(true);
  30. final Lock rlock = rwl.readLock();
  31. final Lock wlock = rwl.writeLock();
  32. Runnable writer = () ->
  33. {
  34. for (int i = 0; i < words.length; i++)
  35. {
  36. wlock.lock();
  37. try
  38. {
  39. dictionary.put(words[i],
  40. definitions[i]);
  41. System.out.println("writer storing " +
  42. words[i] + " entry");
  43. } finally {
  44. wlock.unlock();
  45. }
  46. try
  47. {
  48. Thread.sleep(1);
  49. }
  50. catch (InterruptedException ie)
  51. {
  52. System.err.println("writer " +
  53. "interrupted");
  54. }
  55. }
  56. };
  57. ExecutorService es = Executors.newFixedThreadPool(1);
  58. es.submit(writer);
  59. Runnable reader = () ->
  60. {
  61. while (true)
  62. {
  63. rlock.lock();
  64. try
  65. {
  66. int i = (int) (Math.random() *
  67. words.length);
  68. System.out.println("reader accessing " +
  69. words[i] + ": " +
  70. dictionary.get(words[i])
  71. + " entry");
  72. }
  73. finally
  74. {
  75. rlock.unlock();
  76. }
  77. }
  78. };
  79. es = Executors.newFixedThreadPool(1);
  80. es.submit(reader);
  81. }
  82. }

清单7-3中,默认主线程首先创建单词和定义的字符串数组,由于它们得从匿名类中访问所以被定义成final。创建了一个存储单词及其定义条目的map之后,线程获取一个重入的读写锁并开始访问读写锁。

针对写线程的runnable现在被创建出来,其run()方法遍历了单词的数组,每趟遍历都会锁住写锁。当此方法返回时,写线程就有了互斥的写锁,同时能够通过调用map的put方法来更新map。在输出一个标识添加单词的消息之后,写线程会释放锁,然后睡眠一毫秒给操作其它任务的线程出现的机会。接下来,基于线程池的executor会调用写线程的runnable。

针对读线程的runnable后续会被创建出来。其run()方法会重复地获取读锁、随机访问map中的条目、输出这个条目然后释放读锁。接下来,从线程池中获取的executor会调用读线程的runnable。

虽然我曾经因为不会抛出异常所以没有遵循锁获取和释放的约定,但为了良好的格式,这里我还是指定了try/fanally结构。
照下面编译清单7-3:

  1. javac Dictionary.java

运行程序:

  1. java Dictionary

正如我在执行时观测到的,你应该也能观测到如下输出的开始部分(消息的顺序可能略有出入):

  1. writer storing hypocalcemia entry
  2. writer storing prolixity entry
  3. reader accessing hypocalcemia: a deficiency of calcium in the blood entry
  4. writer storing assiduous entry
  5. reader accessing assiduous: showing great care, attention, and effort entry
  6. reader accessing castellan: null entry
  7. reader accessing hypocalcemia: a deficiency of calcium in the blood entry
  8. reader accessing assiduous: showing great care, attention, and effort entry
  9. reader accessing indefatigable: null entry
  10. reader accessing hypocalcemia: a deficiency of calcium in the blood entry
  11. reader accessing hypocalcemia: a deficiency of calcium in the blood entry
  12. reader accessing assiduous: showing great care, attention, and effort entry
  13. reader accessing indefatigable: null entry
  14. reader accessing prolixity: unduly prolonged or drawn out entry
  15. reader accessing hypocalcemia: a deficiency of calcium in the blood entry
  16. reader accessing castellan: null entry
  17. reader accessing assiduous: showing great care, attention, and effort entry
  18. reader accessing hypocalcemia: a deficiency of calcium in the blood entry
  19. reader accessing prolixity: unduly prolonged or drawn out entry
  20. reader accessing assiduous: showing great care, attention, and effort entry
  21. reader accessing castellan: null entry
  22. reader accessing hypocalcemia: a deficiency of calcium in the blood entry
  23. reader accessing indefatigable: null entry
  24. reader accessing castellan: null entry
  25. reader accessing prolixity: unduly prolonged or drawn out entry
  26. reader accessing hypocalcemia: a deficiency of calcium in the blood entry
  27. writer storing indefatigable entry
  28. reader accessing assiduous: showing great care, attention, and effort entry
  29. reader accessing assiduous: showing great care, attention, and effort entry

注意 Java8把StampedLock添加到java.util.concurrent.locks包中。根据其JDK 8的文档,StampedLock是一个具有三种控制读写访问模式的capability-based锁。它用类似于ReentrantReadWriteLock的方式区分了互斥和非互斥的锁,而且还提供了ReentrantReadWriteLock锁不支持的乐观读。可以查看Heinz Kabutz博士关于Phaser and StampedLock并发同步器的视频演示(www.parleys.com/tutorial/5148922b0364bc17fc56ca4f/chapter0/about)来学习StampedLock。当然,也可以看看这个演示的PDF文件(www.jfokus.se/jfokus13/preso/jf13_PhaserAndStampedLock.pdf)。


练习

下面的练习被设计来验证你对第7章内容的理解程度:
1. 定义锁。
2. 当线程进入临界区(通过synchronized保留字控制),Lock对象与内置锁相比,最大的优势是什么?
3. 判断对错:当调用线程没有持有锁,ReentrantLock的unlock()方法会抛出IllegalMonitorStateException。
4. 如何获取一个和特定Lock实例一起使用的Condition实例。
5. 判断对错:ReentrantReadWriteLock()创建了一个用于公平顺序策略的ReentrantReadWriteLock实例。
6. 定义StampedLock。
7. 包java.util.concurrent.locks包含了类LockSupport,这个LockSupport类的意义何在?
8. 用一个等价类替换下面的ID类,这个类使用ReentrantLock替代了synchronized:

  1. public class ID
  2. {
  3. private static int counter; // initialized to 0 by default
  4. public static synchronized int getID()
  5. {
  6. int temp = counter + 1;
  7. try
  8. {
  9. Thread.sleep(1);
  10. }
  11. catch (InterruptedException ie)
  12. {
  13. }
  14. return counter = temp;
  15. } }

小结

包java.util.concurrent.locks提供了一个包含多种接口和类的框架针对条件进行加锁和等待。不同于对象的内置加锁同步以及java.lang.Object的等待、通知机制,包含锁框架的并发工具类通过轮训锁、限时等待及其它方式改善了这种机制。

锁框架包含了经常使用的锁、重入锁、条件、读写锁以及重入读写锁等类别,这些我都已经在本章中探讨过。同时,我也简要地介绍了Java 8中引入的类StampedLock。

第8章会涉及额外的并发工具。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注