@Yano
2018-12-28T09:25:03.000000Z
字数 7496
阅读 2472
Java
/**
 * Demonstrates the ordering of PriorityQueue and the blocking behaviour of
 * PriorityBlockingQueue.take() when the queue is empty.
 *
 * @throws InterruptedException if the thread is interrupted while blocked in take()
 */
@Test
public void testPriorityQueue() throws InterruptedException {
    PriorityQueue<Integer> priorityQueue = new PriorityQueue<>(Lists.newArrayList(5, 4, 2, 1, 3));
    // Prints the backing array order, which is heap order rather than sorted order.
    System.out.println(priorityQueue);
    System.out.println(priorityQueue.poll());
    System.out.println(priorityQueue.poll());

    PriorityBlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>();
    blockingQueue.add(5);
    System.out.println(blockingQueue.poll());
    // The queue is empty again at this point, so take() blocks the calling thread.
    System.out.println(blockingQueue.take());
}
输出
[1, 3, 2, 4, 5]
1
2
5
(阻塞)
/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
transient Object[] queue; // non-private to simplify nested class access

/**
 * The number of elements in the priority queue.
 */
private int size = 0;

/**
 * The comparator, or null if priority queue uses elements'
 * natural ordering.
 */
private final Comparator<? super E> comparator;

/**
 * The number of times this priority queue has been
 * <i>structurally modified</i>.  See AbstractList for gory details.
 */
transient int modCount = 0; // non-private to simplify nested class access
通过数组实现一个堆,元素在queue数组中并不是完全有序的,仅堆顶元素最大或最小。
public E poll() {if (size == 0)return null;int s = --size;modCount++;E result = (E) queue[0];E x = (E) queue[s];queue[s] = null;if (s != 0)siftDown(0, x);return result;}/*** Inserts item x at position k, maintaining heap invariant by* demoting x down the tree repeatedly until it is less than or* equal to its children or is a leaf.** @param k the position to fill* @param x the item to insert*/private void siftDown(int k, E x) {if (comparator != null)siftDownUsingComparator(k, x);elsesiftDownComparable(k, x);}@SuppressWarnings("unchecked")private void siftDownComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>)x;int half = size >>> 1; // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = queue[child];int right = child + 1;if (right < size &&((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)c = queue[child = right];if (key.compareTo((E) c) <= 0)break;queue[k] = c;k = child;}queue[k] = key;}
以poll方法为例,实际上是获取堆顶元素,然后调整堆。
调整堆的方法(以大顶堆为例):
1. 判断是否传入comparator,有则按照comparator排序,否则按照自然顺序排序
2. 取当前节点左右孩子中的较大者,若它大于当前节点,则二者交换,并从该孩子的位置继续向下调整,直到当前节点不小于两个孩子或已经是叶子节点为止(注意:PriorityQueue默认是小顶堆,上面源码中取的是较小的孩子,比较方向相反)
/*** Increases the capacity of the array.** @param minCapacity the desired minimum capacity*/private void grow(int minCapacity) {int oldCapacity = queue.length;// Double size if small; else grow by 50%int newCapacity = oldCapacity + ((oldCapacity < 64) ?(oldCapacity + 2) :(oldCapacity >> 1));// overflow-conscious codeif (newCapacity - MAX_ARRAY_SIZE > 0)newCapacity = hugeCapacity(minCapacity);queue = Arrays.copyOf(queue, newCapacity);}private static int hugeCapacity(int minCapacity) {if (minCapacity < 0) // overflowthrow new OutOfMemoryError();return (minCapacity > MAX_ARRAY_SIZE) ?Integer.MAX_VALUE :MAX_ARRAY_SIZE;}
非线程安全
其实现基本与PriorityQueue一致,不过PriorityBlockingQueue是线程安全的,并且实现了BlockingQueue接口,在队列为空时take会阻塞。
/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
private transient Object[] queue;

/**
 * The number of elements in the priority queue.
 */
private transient int size;

/**
 * The comparator, or null if priority queue uses elements'
 * natural ordering.
 */
private transient Comparator<? super E> comparator;

/**
 * Lock used for all public operations
 */
private final ReentrantLock lock;

/**
 * Condition for blocking when empty
 */
private final Condition notEmpty;

/**
 * Spinlock for allocation, acquired via CAS.
 */
private transient volatile int allocationSpinLock;

/**
 * A plain PriorityQueue used only for serialization,
 * to maintain compatibility with previous versions
 * of this class. Non-null only during serialization/deserialization.
 */
private PriorityQueue<E> q;
和PriorityQueue的区别:增加了
1. 重入锁ReentrantLock
2. Condition,用于队列空情况下的阻塞
3. allocationSpinLock,通过CAS手段对queue扩容
private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocatingThread.yield();lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}
可以看到与PriorityQueue的扩容函数很像,不同点:
1. 调用函数时必须持有锁;函数内部会先释放锁,分配完新数组后再重新获取锁
2. 使用CAS方法进行扩容,在allocationSpinLock为0,并且CAS将其置为1时,线程才能够对数组进行扩容。如果多个线程并发扩容,其余线程会调用Thread.yield()方法。
为什么这样实现PriorityBlockingQueue扩容?
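因为PriorityBlockingQueue的所有公共操作都由同一把ReentrantLock保护,而分配新数组可能比较耗时:如果扩容期间一直持有这把锁,其他线程的take/poll等操作都会被阻塞。所以tryGrow先释放主锁,让其他线程可以继续出队。释放主锁之后,可能有多个线程同时进入tryGrow,此时不能再依赖主锁,于是用allocationSpinLock这个基于CAS的自旋锁保证同一时刻只有一个线程分配新数组,没有抢到的线程调用Thread.yield()让出CPU。重新获取主锁后,再通过queue == array判断数组是否已被其他线程替换,未被替换才切换到新数组并复制元素。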
allocationSpinLock是transient的,因为序列化时并不需要此参数;同时又是volatile的,因为可能有多个线程同时调用。
// Spinlock for allocation, acquired via CAS.
private transient volatile int allocationSpinLock;
// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long allocationSpinLockOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = PriorityBlockingQueue.class;allocationSpinLockOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("allocationSpinLock"));} catch (Exception e) {throw new Error(e);}}
调用方法
// CAS on the field located at allocationSpinLockOffset inside this object: expected value 0, new value 1.
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)
allocationSpinLockOffset是allocationSpinLock变量在PriorityBlockingQueue类中的偏移量。
那么使用allocationSpinLockOffset有什么好处呢?它和直接修改allocationSpinLock变量有什么区别?
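Unsafe通过"对象 + 字段偏移量"定位到该字段所在的内存位置,并以原子方式完成"比较并替换":只有当内存中的值等于期望值时,才会把它改为新值。而直接给allocationSpinLock赋值只是一次普通的volatile写,并不是CAS,无法保证"先比较再修改"的原子性。以getAndAddInt为例,JDK 1.8中基于CAS的循环代码如下: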
/**
 * Adds var4 to the int field found at offset var2 of object var1 and
 * returns the value the field held before the addition. The update is
 * retried until compareAndSwapInt reports success.
 */
public final int getAndAddInt(Object var1, long var2, int var4) {
    while (true) {
        // Read the current value, then try to swap in current + delta.
        int observed = this.getIntVolatile(var1, var2);
        if (this.compareAndSwapInt(var1, var2, observed, observed + var4)) {
            return observed;
        }
    }
}
在AtomicInteger类中的调用如下,getAndAddInt方法由具体类的实现方法,抽取到了UNSAFE类中:
/**
 * Decrements the value by one via unsafe.getAndAddInt and returns
 * whatever that call returns.
 */
public final int getAndDecrement() {
    final int delta = -1;
    return unsafe.getAndAddInt(this, valueOffset, delta);
}
/*** Establishes the heap invariant (described above) in the entire tree,* assuming nothing about the order of the elements prior to the call.*/@SuppressWarnings("unchecked")private void heapify() {for (int i = (size >>> 1) - 1; i >= 0; i--)siftDown(i, (E) queue[i]);}private void siftDown(int k, E x) {if (comparator != null)siftDownUsingComparator(k, x);elsesiftDownComparable(k, x);}@SuppressWarnings("unchecked")private void siftDownComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>)x;int half = size >>> 1; // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = queue[child];int right = child + 1;if (right < size &&((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)c = queue[child = right];if (key.compareTo((E) c) <= 0)break;queue[k] = c;k = child;}queue[k] = key;}public boolean offer(E e) {if (e == null)throw new NullPointerException();modCount++;int i = size;if (i >= queue.length)grow(i + 1);size = i + 1;if (i == 0)queue[0] = e;elsesiftUp(i, e);return true;}private void siftUp(int k, E x) {if (comparator != null)siftUpUsingComparator(k, x);elsesiftUpComparable(k, x);}@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (key.compareTo((E) e) >= 0)break;queue[k] = e;k = parent;}queue[k] = key;}