[关闭]
@lambeta 2016-08-23T14:09:09.000000Z 字数 6867 阅读 377

第3章

translation


等待和通知

Java提供了一组很小的API支持线程间交互。在使用这组API时,线程持续等待某个条件(继续执行的前置条件)成立,后续会有另一条线程创造出这一条件,进而通知处于等待中的线程。本章我会介绍这组API。

等待、通知API之旅

java.lang.Object类提供了一套等待、通知的API,它由三个wait(),一个notify()和一个notifyAll()方法组成。wait()方法等待某个条件成立,当这个条件成立时,notify()和notifyAll()方法通知处于等待中的线程。

若当前线程开始或正在等待通知时,任意线程中断了它,三个wait()方法都会抛出java.lang.InterruptedException。此时,当前线程的中断状态会被清除。


注意 对象的wait()方法被调用时,线程会放弃对象关联的监听器的所有权。


这组API利用了一个对象的条件队列,该队列就是一种数据结构用于存储那些等待某个条件成立的线程。这些等待的线程被称为等待集合。由于该条件队列和对象的锁紧紧捆绑在一起,所以以上五个方法必须在同步的上下文中(当前的线程必须是该对象监听器的所有者)被调用;否则,会抛出java.lang.IllegalMonitorStateException

下面的代码、伪代码片段演示了这个无参的wait()方法:

  1. synchronized(obj)
  2. {
  3. while (<condition does not hold>)
  4. obj.wait();
  5. // Perform an action that's appropriate to condition.
  6. }

wait()方法在一个同步块中被调用,该同步块同步的对象和调用wait()方法的对象是一样的。由于有假唤醒的可能性(线程不是因为通知,中断或者超时被唤醒),wait()方法在一个while循环中使用,这个循环会测试是否满足条件,并且当条件依然不满足时重新运行wait()方法。while循环退出之后,条件成立了,和条件相宜的动作会被执行。


留心 绝对不要在循环外面调用wait()方法。这个循环会在wait()调用前后进行条件测试。在调用wait()之前测试确保了活性。如果这个测试不存在,但条件满足同时notify()先于wait()方法调用,等待中的线程就不太可能再醒来。而调用wait()之后重新测试条件保证了安全性。如果没有重新测试,并且线程已经从wait()中醒来(或许条件不满足时其它线程意外地调用了notify()方法)同时条件又没有满足,这个线程可能会继续破坏锁所维护的一致性。


下面的代码片段展示了notify()方法,它通知了前面例子中等待的线程:

  1. synchronized(obj)
  2. {
  3. // Set the condition.
  4. obj.notify();
  5. }

注意notify()方法是从临界区里被调用的,这个临界区和wait()方法所在的临界区一样被同一个对象(obj)所保护,并且也使用了同一个对象引用来调用notify()方法。只要遵照这个模式,你就不会陷入麻烦了。


注意 关于notify()notifyAll()哪种通知方法更好存在着大量争论。举个例子,参考“notify()和notifyAll()之间的区别”(http://stackoverflow.com/questions/14924610/difference-between-notify-and-notifyall)。如果你在犹豫用哪个方法,我会说,在仅有两条线程,并且某条线程偶尔等待、需要被另一条线程通知的应用程序才使用notify()方法,否则使用notifyAll()方法。


生产者和消费者

生产者和消费者线程之间的关系是一个涉及条件的线程交互的经典例子。生产者线程产生数据项将来会被消费者消费。每个生产出来的数据项会被存储在一个共享的变量中。

假设线程以不同的速度前行,生产者可能在消费者取得旧数据项处理之前生产了一个新数据项并且将其记录到共享变量当中。当然,消费者也可能在新数据项被生产出来之前就获取共享变量中的内容。

为了克服这些问题,生产者线程必须一直等待直到它被通知之前生产的数据项已经被消费,并且消费者线程也必须一直等待直到自己被通知新数据项已经被生产出来了。清单3-1展示了如何通过wait()notify()方法完成这项任务。

清单3-1. 生产者、消费者关系 第1版

  1. public class PC
  2. {
  3. public static void main(String[] args)
  4. {
  5. Shared s = new Shared();
  6. new Producer(s).start();
  7. new Consumer(s).start();
  8. }
  9. }
  10. class Shared
  11. {
  12. private char c;
  13. private volatile boolean writeable = true;
  14. synchronized void setSharedChar(char c)
  15. {
  16. while (!writeable)
  17. try
  18. {
  19. wait();
  20. }
  21. catch (InterruptedException ie)
  22. {
  23. }
  24. this.c = c;
  25. writeable = false;
  26. notify();
  27. }
  28. synchronized char getSharedChar()
  29. {
  30. while (writeable)
  31. try
  32. {
  33. wait();
  34. }
  35. catch (InterruptedException ie)
  36. {
  37. }
  38. writeable = true;
  39. notify();
  40. return c;
  41. }
  42. }
  43. class Producer extends Thread
  44. {
  45. private final Shared s;
  46. Producer(Shared s)
  47. {
  48. this.s = s;
  49. }
  50. @Override
  51. public void run()
  52. {
  53. for (char ch = 'A'; ch <= 'Z'; ch++)
  54. {
  55. s.setSharedChar(ch);
  56. System.out.println(ch + " produced by producer.");
  57. }
  58. }
  59. }
  60. class Consumer extends Thread
  61. {
  62. private final Shared s;
  63. Consumer(Shared s)
  64. {
  65. this.s = s;
  66. }
  67. @Override
  68. public void run()
  69. {
  70. char ch;
  71. do
  72. {
  73. ch = s.getSharedChar();
  74. System.out.println(ch + " consumed by consumer.");
  75. }
  76. while (ch != 'Z');
  77. }
  78. }

这个应用程序创建了一个共享对象,并且两条线程各自获取了该对象的一份拷贝。生产者调用这个对象的setSharedChar()方法以保存26个大写的英语字母;消费者则调用该对象的getSharedChar()方法来获取每个字母。

writeable实例变量跟踪了两个条件:生产者等待消费者消费一个数据项和消费者等待生产者生产一个新数据项。它帮助协调生产者和消费者的执行。下面以消费者先执行的场景演示这种协调的做法:
1. 消费者执行s.getSharedChar()方法以索取一个字母。
2. 在那个同步方法中,由于writeable变量是true,所以消费者调用了wait()方法。现在,消费者会一直等待直到收到来自于生产者的通知。
3. 生产者最终执行了s.setSharedChar(ch);方法。
4. 当生产者进入同步方法中(这是可能的,因为消费者在进入等待之前于wait()方法中释放掉了锁),它发现writeable的值是true便不会调用wait()方法。
5. 生产者保存下这个字母、把writeable的值设成false(如果消费者到那时还没有消费这个字母,那么这就会导致生产者在下次setSharedChar()调用时陷入等待)并且调用notify()方法去唤醒消费者(假设消费者还在等待)。
6. 生产者退出setSharedChar(char c)方法。
7. 消费者醒来(同时重新获取锁)、把writeable设成true(如果生产者到那时还没能生产出一个字母,那么这就会导致消费者在下次getSharedChar()调用时陷入等待)、通知唤醒生产者线程(假设生产者还在等待),并返回共享的字母。

照下面的编译清单3-1:

javac PC.java

运行结果程序如下:

java PC

你应该能在第一次运行的时候观察到如下的输出:

  1. W produced by producer.
  2. W consumed by consumer.
  3. X produced by producer.
  4. X consumed by consumer.
  5. Y produced by producer.
  6. Y consumed by consumer.
  7. Z produced by producer.
  8. Z consumed by consumer.

尽管这个同步正确地工作了,你可能还观察到在多条消费信息前面存在多条生产消息:

  1. A produced by producer.
  2. B produced by producer.
  3. A consumed by consumer.
  4. B consumed by consumer.

同时,你也可能观察到一条消费消息在一条生产信息前面:

  1. V consumed by consumer.
  2. V produced by producer.

奇怪的输出顺序并不意味着生产者和消费者线程就不是同步的。相反,这是紧跟着调用setSharedChar()之后的伴随方法System.out.println()没被同步以及紧跟着调用getSharedChar()之后的伴随方法System.out.println()没被同步的结果。这个输出的顺序可以通过将每个方法调用对包装到一个同步块的方式修正,这个同步块同步在s所引用的共享对象之上。清单3-2呈现了这一改善。

清单3-2. 生产者-消费者关系 第2版

  1. public class PC
  2. {
  3. public static void main(String[] args)
  4. {
  5. Shared s = new Shared();
  6. new Producer(s).start();
  7. new Consumer(s).start();
  8. }
  9. }
  10. class Shared
  11. {
  12. private char c;
  13. private volatile boolean writeable = true;
  14. synchronized void setSharedChar(char c)
  15. {
  16. while (!writeable)
  17. try
  18. {
  19. wait();
  20. }
  21. catch (InterruptedException ie)
  22. {
  23. }
  24. this.c = c;
  25. writeable = false;
  26. notify();
  27. }
  28. synchronized char getSharedChar()
  29. {
  30. while (writeable)
  31. try
  32. {
  33. wait();
  34. }
  35. catch (InterruptedException ie)
  36. {
  37. }
  38. writeable = true;
  39. notify();
  40. return c;
  41. }
  42. }
  43. class Producer extends Thread
  44. {
  45. private final Shared s;
  46. Producer(Shared s)
  47. {
  48. this.s = s;
  49. }
  50. @Override
  51. public void run()
  52. {
  53. for (char ch = 'A'; ch <= 'Z'; ch++)
  54. {
  55. synchronized(s)
  56. {
  57. s.setSharedChar(ch);
  58. System.out.println(ch + " produced by producer.");
  59. }
  60. }
  61. }
  62. }
  63. class Consumer extends Thread
  64. {
  65. private final Shared s;
  66. Consumer(Shared s)
  67. {
  68. this.s = s;
  69. }
  70. @Override
  71. public void run()
  72. {
  73. char ch;
  74. do
  75. {
  76. synchronized(s)
  77. {
  78. ch = s.getSharedChar();
  79. System.out.println(ch + " consumed by consumer.");
  80. }
  81. }
  82. while (ch != 'Z');
  83. }
  84. }

编译清单3-2(javac PC.java)然后运行程序(java PC)。它的输出应该总是以下面展示的交替顺序出现(为了简洁只显示了前面的几行)。

  1. A produced by producer.
  2. A consumed by consumer.
  3. B produced by producer.
  4. B consumed by consumer.
  5. C produced by producer.
  6. C consumed by consumer.
  7. D produced by producer.
  8. D consumed by consumer.

练习

下面的练习被设计来验证你对第3章内容的理解程度:
1. 定义条件。
2. 描述支持条件的API。
3. 判断对错:wait()方法可以被中断。
4. 你会调用什么方法来唤醒在同一对象监听器上等待的全部线程?
5. 判断对错:一条已经获取到锁的线程在调用Object的wait()方法时并不会释放掉锁。
6. 定义条件队列。
7. 当你在同步上下文之外调用这组API的方法时会发生什么?
8. 定义假唤醒。
9. 为什么你需要在一个循环的上下文中调用wait()方法?
10. 创建一个等待的应用程序,示范一种被称作关卡的高级别的同步构造。这一构造允许多条线程在同一个同步点(关卡)到达,并且一直等待直到关卡被其他的线程解锁,才能继续全部执行。

main()方法首先会为这些线程创建一个runnable,线程会在关卡处等待。runnable打印一条消息表明这条线程正处于等待中,同时递增一个计数器,睡眠2秒,然后等待(确保考虑了假唤醒问题)。唤醒后,线程输出一条消息,表明线程终止。之后main()方法创建三个线程对象并且启动这三条线程去执行runnable。接下来,main()方法创建另一个runnable,它会反复地睡200毫秒直到计数器等于3,这时,它会通知所有处于等待中的线程。最后,main()方法为第二个runnable创建一个线程对象并启动之。

小结

Java提供了一组很小的API支持线程间交互。这组API由三个wait(),一个notify()和一个notifyAll()方法组成。wait()方法等待某个条件成立,当这个条件成立时,notify()和notifyAll()方法通知处于等待中的线程。

wait()notify()notifyAll()方法在一个同步块中被调用,该同步块同步的对象和调用wait()方法的对象是一样的。由于有假唤醒的可能性(线程不是因为通知,中断或者超时被唤醒),wait()方法在一个while循环中使用,这个循环会测试是否满足条件,并且当条件依然不满足时重新运行wait()方法。

产者和消费者线程之间的关系是一个涉及条件的线程交互的经典例子。生产者线程产生数据项将来会被消费者消费。每个生产出来的数据项会被存储在一个共享的变量中。

为了克服那些诸如数据项还没被生产出来就被消费的问题,生产者线程必须一直等待直到它被通知之前生产的数据项已经被消费,并且消费者线程也必须一直等待直到自己被通知新数据项已经被生产出来了。

第4章会涉及到额外的线程能力。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注