[关闭]
@yexiaoqi 2022-04-21T09:09:28.000000Z 字数 1662 阅读 400

用数组实现一个阻塞队列?

刷题


  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. /**
  4. * 要求:用数组实现一个阻塞队列
  5. * 考察:java.util.concurrent.ArrayBlockingQueue的实现原理
  6. * 思路:1.用数组实现队列,考虑使用环形数组,用两个指针putIndex、takeIndex表示下一个要入队、出队的位置,
  7. * 入队/出队到数组末尾时都从零开始,count为元素数量,count=0说明队列为空,count=数组size说明队列已满
  8. * 2.给队列加上阻塞能力和保证线程安全,考虑使用lock+Condition.await
  9. */
  10. public class ArrayBlockingQueue<E> {
  11. final Object[] items;
  12. int putIndex;
  13. int takeIndex;
  14. int count;
  15. ReentrantLock lock;
  16. private final Condition notEmpty;
  17. private final Condition notFull;
  18. public ArrayBlockingQueue(int capacity) {
  19. if (capacity <= 0)
  20. throw new IllegalArgumentException();
  21. this.items = new Object[capacity];
  22. // 创建 lock 对象
  23. lock = new ReentrantLock();
  24. // 队列[非空]Condition
  25. notEmpty = lock.newCondition();
  26. // 队列[不满]Condition
  27. notFull = lock.newCondition();
  28. }
  29. /**
  30. * 为什么while循环需要放在锁内?
  31. * 如果不放在锁内,则可能会出现多个线程同时看到满足条件,进而去加锁入队。虽然入队还是在临界区,
  32. * 但会出现队列已满,仍然在执行入队操作的情况。和单例的双检查锁中少一个检查的问题类似
  33. */
  34. public void put(E e) throws InterruptedException {
  35. final ReentrantLock lock = this.lock;
  36. lock.lockInterruptibly();
  37. try {
  38. while (count == items.length)
  39. //如果队列已满,调用await()方法释放锁并阻塞,被其他线程signal唤醒后会重新抢锁,
  40. //再次获取锁后会继续走到while循环判断条件的地方
  41. notFull.await();
  42. //如果还可以存,则执行入队操作
  43. enqueue(e);
  44. //唤醒一个线程
  45. notEmpty.signal();
  46. } finally {
  47. lock.unlock();
  48. }
  49. }
  50. public E take() throws InterruptedException {
  51. final ReentrantLock lock = this.lock;
  52. lock.lockInterruptibly();
  53. try {
  54. while (count == 0)
  55. //如果队列为空,调用await()方法释放和notEmpty关联的锁并阻塞
  56. notEmpty.await();
  57. E e = dequeue();
  58. notFull.signal();
  59. return e;
  60. } finally {
  61. lock.unlock();
  62. }
  63. }
  64. // 入队
  65. private void enqueue(E e) {
  66. Object[] items = this.items;
  67. items[putIndex] = e;
  68. if (++putIndex == items.length) putIndex = 0;
  69. count++;
  70. }
  71. // 出队
  72. private E dequeue() {
  73. Object[] items = this.items;
  74. E e = (E) items[takeIndex];
  75. items[takeIndex] = null;
  76. if (++takeIndex == items.length) takeIndex = 0;
  77. count--;
  78. return e;
  79. }
  80. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注