[关闭]
@Yano 2018-12-28T09:25:03.000000Z 字数 7496 阅读 1958

Java 优先队列 PriorityQueue PriorityBlockingQueue 源码分析

Java


基本使用

  1. @Test
  2. public void testPriorityQueue() throws InterruptedException {
  3. PriorityQueue priorityQueue = new PriorityQueue(Lists.newArrayList(5, 4, 2, 1, 3));
  4. System.out.println(priorityQueue);
  5. System.out.println(priorityQueue.poll());
  6. System.out.println(priorityQueue.poll());
  7. PriorityBlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>();
  8. blockingQueue.add(5);
  9. System.out.println(blockingQueue.poll());
  10. System.out.println(blockingQueue.take());
  11. }

输出

  1. [1, 3, 2, 4, 5]
  2. 1
  3. 2
  4. 5
  5. (阻塞)

PriorityQueue

成员变量

  1. /**
  2. * Priority queue represented as a balanced binary heap: the two
  3. * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
  4. * priority queue is ordered by comparator, or by the elements'
  5. * natural ordering, if comparator is null: For each node n in the
  6. * heap and each descendant d of n, n <= d. The element with the
  7. * lowest value is in queue[0], assuming the queue is nonempty.
  8. */
  9. transient Object[] queue; // non-private to simplify nested class access
  10. /**
  11. * The number of elements in the priority queue.
  12. */
  13. private int size = 0;
  14. /**
  15. * The comparator, or null if priority queue uses elements'
  16. * natural ordering.
  17. */
  18. private final Comparator<? super E> comparator;
  19. /**
  20. * The number of times this priority queue has been
  21. * <i>structurally modified</i>. See AbstractList for gory details.
  22. */
  23. transient int modCount = 0; // non-private to simplify nested class access

通过数组实现一个堆,元素在queue数组中并不是完全有序的,仅堆顶元素最大或最小。

基本方法

  1. public E poll() {
  2. if (size == 0)
  3. return null;
  4. int s = --size;
  5. modCount++;
  6. E result = (E) queue[0];
  7. E x = (E) queue[s];
  8. queue[s] = null;
  9. if (s != 0)
  10. siftDown(0, x);
  11. return result;
  12. }
  13. /**
  14. * Inserts item x at position k, maintaining heap invariant by
  15. * demoting x down the tree repeatedly until it is less than or
  16. * equal to its children or is a leaf.
  17. *
  18. * @param k the position to fill
  19. * @param x the item to insert
  20. */
  21. private void siftDown(int k, E x) {
  22. if (comparator != null)
  23. siftDownUsingComparator(k, x);
  24. else
  25. siftDownComparable(k, x);
  26. }
  27. @SuppressWarnings("unchecked")
  28. private void siftDownComparable(int k, E x) {
  29. Comparable<? super E> key = (Comparable<? super E>)x;
  30. int half = size >>> 1; // loop while a non-leaf
  31. while (k < half) {
  32. int child = (k << 1) + 1; // assume left child is least
  33. Object c = queue[child];
  34. int right = child + 1;
  35. if (right < size &&
  36. ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
  37. c = queue[child = right];
  38. if (key.compareTo((E) c) <= 0)
  39. break;
  40. queue[k] = c;
  41. k = child;
  42. }
  43. queue[k] = key;
  44. }

以poll方法为例,实际上是获取堆顶元素,然后调整堆。

调整堆的方法(以大顶堆为例):
1. 判断是否传入comparator,有则按照comparator排序,否则按照自然顺序排序
2. 取节点左右孩子节点最大值,与父亲节点交换

扩容方法

  1. /**
  2. * Increases the capacity of the array.
  3. *
  4. * @param minCapacity the desired minimum capacity
  5. */
  6. private void grow(int minCapacity) {
  7. int oldCapacity = queue.length;
  8. // Double size if small; else grow by 50%
  9. int newCapacity = oldCapacity + ((oldCapacity < 64) ?
  10. (oldCapacity + 2) :
  11. (oldCapacity >> 1));
  12. // overflow-conscious code
  13. if (newCapacity - MAX_ARRAY_SIZE > 0)
  14. newCapacity = hugeCapacity(minCapacity);
  15. queue = Arrays.copyOf(queue, newCapacity);
  16. }
  17. private static int hugeCapacity(int minCapacity) {
  18. if (minCapacity < 0) // overflow
  19. throw new OutOfMemoryError();
  20. return (minCapacity > MAX_ARRAY_SIZE) ?
  21. Integer.MAX_VALUE :
  22. MAX_ARRAY_SIZE;
  23. }
  1. 小容量扩容1倍
  2. 大容量扩容0.5倍
  3. 快溢出时调整为Integer.MAX_VALUE - 8 或 Integer.MAX_VALUE

是否线程安全

非线程安全

PriorityBlockingQueue

其实现基本与PriorityQueue一致,不过PriorityBlockingQueue是线程安全的,并且实现了BlockingQueue接口,在队列为空时take会阻塞。

  1. /**
  2. * Priority queue represented as a balanced binary heap: the two
  3. * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
  4. * priority queue is ordered by comparator, or by the elements'
  5. * natural ordering, if comparator is null: For each node n in the
  6. * heap and each descendant d of n, n <= d. The element with the
  7. * lowest value is in queue[0], assuming the queue is nonempty.
  8. */
  9. private transient Object[] queue;
  10. /**
  11. * The number of elements in the priority queue.
  12. */
  13. private transient int size;
  14. /**
  15. * The comparator, or null if priority queue uses elements'
  16. * natural ordering.
  17. */
  18. private transient Comparator<? super E> comparator;
  19. /**
  20. * Lock used for all public operations
  21. */
  22. private final ReentrantLock lock;
  23. /**
  24. * Condition for blocking when empty
  25. */
  26. private final Condition notEmpty;
  27. /**
  28. * Spinlock for allocation, acquired via CAS.
  29. */
  30. private transient volatile int allocationSpinLock;
  31. /**
  32. * A plain PriorityQueue used only for serialization,
  33. * to maintain compatibility with previous versions
  34. * of this class. Non-null only during serialization/deserialization.
  35. */
  36. private PriorityQueue<E> q;

和PriorityQueue的区别:增加了
1. 重入锁ReentrantLock
2. Condition,用于队列空情况下的阻塞
3. allocationSpinLock,通过CAS手段对queue扩容

  1. private void tryGrow(Object[] array, int oldCap) {
  2. lock.unlock(); // must release and then re-acquire main lock
  3. Object[] newArray = null;
  4. if (allocationSpinLock == 0 &&
  5. UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
  6. 0, 1)) {
  7. try {
  8. int newCap = oldCap + ((oldCap < 64) ?
  9. (oldCap + 2) : // grow faster if small
  10. (oldCap >> 1));
  11. if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
  12. int minCap = oldCap + 1;
  13. if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
  14. throw new OutOfMemoryError();
  15. newCap = MAX_ARRAY_SIZE;
  16. }
  17. if (newCap > oldCap && queue == array)
  18. newArray = new Object[newCap];
  19. } finally {
  20. allocationSpinLock = 0;
  21. }
  22. }
  23. if (newArray == null) // back off if another thread is allocating
  24. Thread.yield();
  25. lock.lock();
  26. if (newArray != null && queue == array) {
  27. queue = newArray;
  28. System.arraycopy(array, 0, newArray, 0, oldCap);
  29. }
  30. }

可以看到与PriorityQueue的扩容函数很像,不同点:
1. 调用函数时必须持有锁
2. 使用CAS方法进行扩容,在allocationSpinLock为0,并且CAS将其置为1时,线程才能够对数组进行扩容。如果多个线程并发扩容,其余线程会调用Thread.yield()方法。

为什么这样实现PriorityBlockingQueue扩容?

因为PriorityBlockingQueue内部使用的ReentrantLock重入锁,同一个线程多次调用add函数,可能恰好同时调用了tryGrow函数。此时通过重入锁是无法加锁的,仅能通过Synchronized或CAS方式控制并发。

allocationSpinLock是transient的,因为序列化时并不需要此参数;同时又是volatile的,因为可能有多个线程同时调用。

  1. private transient volatile int allocationSpinLock;

UNSAFE.compareAndSwapInt

  1. // Unsafe mechanics
  2. private static final sun.misc.Unsafe UNSAFE;
  3. private static final long allocationSpinLockOffset;
  4. static {
  5. try {
  6. UNSAFE = sun.misc.Unsafe.getUnsafe();
  7. Class<?> k = PriorityBlockingQueue.class;
  8. allocationSpinLockOffset = UNSAFE.objectFieldOffset
  9. (k.getDeclaredField("allocationSpinLock"));
  10. } catch (Exception e) {
  11. throw new Error(e);
  12. }
  13. }

调用方法

  1. UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)

allocationSpinLockOffset是allocationSpinLock变量在PriorityBlockingQueue类中的偏移量。

那么使用allocationSpinLockOffset有什么好处呢?它和直接修改allocationSpinLock变量有什么区别?

获取该字段在类中的内存偏移量,直接将内存中的值改为新值。直接修改allocationSpinLock并不是CAS。JDK 1.8代码如下:

  1. public final int getAndAddInt(Object var1, long var2, int var4) {
  2. int var5;
  3. do {
  4. var5 = this.getIntVolatile(var1, var2);
  5. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  6. return var5;
  7. }

在AtomicInteger类中的调用如下,getAndAddInt方法由具体类的实现方法,抽取到了UNSAFE类中:

  1. public final int getAndDecrement() {
  2. return unsafe.getAndAddInt(this, valueOffset, -1);
  3. }

对比 PriorityQueue 和 PriorityBlockingQueue

  1. PriorityQueue是非线程安全的,PriorityBlockingQueue是线程安全的
  2. PriorityBlockingQueue使用重入锁,每一个操作都需要加锁
  3. PriorityBlockingQueue扩容时使用了CAS操作
  4. 两者都使用了堆,算法原理相同
  5. PriorityBlockingQueue可以在queue为空时阻塞take操作

JDK实现堆的方法

  1. /**
  2. * Establishes the heap invariant (described above) in the entire tree,
  3. * assuming nothing about the order of the elements prior to the call.
  4. */
  5. @SuppressWarnings("unchecked")
  6. private void heapify() {
  7. for (int i = (size >>> 1) - 1; i >= 0; i--)
  8. siftDown(i, (E) queue[i]);
  9. }
  10. private void siftDown(int k, E x) {
  11. if (comparator != null)
  12. siftDownUsingComparator(k, x);
  13. else
  14. siftDownComparable(k, x);
  15. }
  16. @SuppressWarnings("unchecked")
  17. private void siftDownComparable(int k, E x) {
  18. Comparable<? super E> key = (Comparable<? super E>)x;
  19. int half = size >>> 1; // loop while a non-leaf
  20. while (k < half) {
  21. int child = (k << 1) + 1; // assume left child is least
  22. Object c = queue[child];
  23. int right = child + 1;
  24. if (right < size &&
  25. ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
  26. c = queue[child = right];
  27. if (key.compareTo((E) c) <= 0)
  28. break;
  29. queue[k] = c;
  30. k = child;
  31. }
  32. queue[k] = key;
  33. }
  34. public boolean offer(E e) {
  35. if (e == null)
  36. throw new NullPointerException();
  37. modCount++;
  38. int i = size;
  39. if (i >= queue.length)
  40. grow(i + 1);
  41. size = i + 1;
  42. if (i == 0)
  43. queue[0] = e;
  44. else
  45. siftUp(i, e);
  46. return true;
  47. }
  48. private void siftUp(int k, E x) {
  49. if (comparator != null)
  50. siftUpUsingComparator(k, x);
  51. else
  52. siftUpComparable(k, x);
  53. }
  54. @SuppressWarnings("unchecked")
  55. private void siftUpComparable(int k, E x) {
  56. Comparable<? super E> key = (Comparable<? super E>) x;
  57. while (k > 0) {
  58. int parent = (k - 1) >>> 1;
  59. Object e = queue[parent];
  60. if (key.compareTo((E) e) >= 0)
  61. break;
  62. queue[k] = e;
  63. k = parent;
  64. }
  65. queue[k] = key;
  66. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注