@Yano 2018-12-28T17:25:03.000000Z 字数 7496 阅读 2039

# Java 优先队列 PriorityQueue PriorityBlockingQueue 源码分析

Java

# 基本使用

/**
 * Demonstrates basic use of PriorityQueue and PriorityBlockingQueue.
 *
 * <p>A PriorityQueue is built from an unsorted list, printed (its toString shows
 * the internal heap array order, not sorted order) and polled twice. A
 * PriorityBlockingQueue then receives a single element, which poll() removes,
 * so the final take() is invoked on an empty queue.
 *
 * @throws InterruptedException declared because take() may throw it while waiting
 */
@Test
public void testPriorityQueue() throws InterruptedException {
    // Parameterized (not raw) so the element type is checked at compile time.
    PriorityQueue<Integer> priorityQueue = new PriorityQueue<>(Lists.newArrayList(5, 4, 2, 1, 3));
    System.out.println(priorityQueue);
    System.out.println(priorityQueue.poll());
    System.out.println(priorityQueue.poll());

    PriorityBlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>();
    blockingQueue.add(5);
    System.out.println(blockingQueue.poll());
    // The queue is empty here, so take() blocks waiting for an element.
    System.out.println(blockingQueue.take());
}

[1, 3, 2, 4, 5]
1
2
5
(阻塞)

# PriorityQueue

## 成员变量

/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
transient Object[] queue; // non-private to simplify nested class access

/**
 * The number of elements in the priority queue.
 */
private int size = 0;

/**
 * The comparator, or null if priority queue uses elements'
 * natural ordering.
 */
private final Comparator<? super E> comparator;

/**
 * The number of times this priority queue has been
 * <i>structurally modified</i>.  See AbstractList for gory details.
 */
transient int modCount = 0; // non-private to simplify nested class access

## 基本方法

public E poll() {    if (size == 0)        return null;    int s = --size;    modCount++;    E result = (E) queue[0];    E x = (E) queue[s];    queue[s] = null;    if (s != 0)        siftDown(0, x);    return result;}/** * Inserts item x at position k, maintaining heap invariant by * demoting x down the tree repeatedly until it is less than or * equal to its children or is a leaf. * * @param k the position to fill * @param x the item to insert */private void siftDown(int k, E x) {    if (comparator != null)        siftDownUsingComparator(k, x);    else        siftDownComparable(k, x);}@SuppressWarnings("unchecked")private void siftDownComparable(int k, E x) {    Comparable<? super E> key = (Comparable<? super E>)x;    int half = size >>> 1;        // loop while a non-leaf    while (k < half) {        int child = (k << 1) + 1; // assume left child is least        Object c = queue[child];        int right = child + 1;        if (right < size &&            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)            c = queue[child = right];        if (key.compareTo((E) c) <= 0)            break;        queue[k] = c;        k = child;    }    queue[k] = key;}

1. 判断是否传入comparator，有则按照comparator排序，否则按照自然顺序排序
2. 取节点左右孩子节点中的较小值，若其小于待下沉的元素，则将该孩子上移，并继续向下比较（小顶堆）

## 扩容方法

/** * Increases the capacity of the array. * * @param minCapacity the desired minimum capacity */private void grow(int minCapacity) {    int oldCapacity = queue.length;    // Double size if small; else grow by 50%    int newCapacity = oldCapacity + ((oldCapacity < 64) ?                                     (oldCapacity + 2) :                                     (oldCapacity >> 1));    // overflow-conscious code    if (newCapacity - MAX_ARRAY_SIZE > 0)        newCapacity = hugeCapacity(minCapacity);    queue = Arrays.copyOf(queue, newCapacity);}private static int hugeCapacity(int minCapacity) {    if (minCapacity < 0) // overflow        throw new OutOfMemoryError();    return (minCapacity > MAX_ARRAY_SIZE) ?        Integer.MAX_VALUE :        MAX_ARRAY_SIZE;}
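1. 容量小于64时，新容量为原容量的2倍加2
2. 容量不小于64时，扩容50%（新容量为原容量的1.5倍）
3. 新容量超过MAX_ARRAY_SIZE（Integer.MAX_VALUE - 8）时，根据minCapacity调整为Integer.MAX_VALUE - 8 或 Integer.MAX_VALUE；minCapacity溢出为负数时抛出OutOfMemoryError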

# PriorityBlockingQueue

/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
private transient Object[] queue;

/**
 * The number of elements in the priority queue.
 */
private transient int size;

/**
 * The comparator, or null if priority queue uses elements'
 * natural ordering.
 */
private transient Comparator<? super E> comparator;

/**
 * Lock used for all public operations
 */
private final ReentrantLock lock;

/**
 * Condition for blocking when empty
 */
private final Condition notEmpty;

/**
 * Spinlock for allocation, acquired via CAS.
 * The value is 0 when free and 1 while a thread is allocating a new array.
 */
private transient volatile int allocationSpinLock;

/**
 * A plain PriorityQueue used only for serialization,
 * to maintain compatibility with previous versions
 * of this class. Non-null only during serialization/deserialization.
 */
private PriorityQueue<E> q;

1. 重入锁ReentrantLock
2. Condition，用于队列空情况下的阻塞
3. allocationSpinLock，通过CAS手段对queue扩容

private void tryGrow(Object[] array, int oldCap) {    lock.unlock(); // must release and then re-acquire main lock    Object[] newArray = null;    if (allocationSpinLock == 0 &&        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,                                 0, 1)) {        try {            int newCap = oldCap + ((oldCap < 64) ?                                   (oldCap + 2) : // grow faster if small                                   (oldCap >> 1));            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow                int minCap = oldCap + 1;                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)                    throw new OutOfMemoryError();                newCap = MAX_ARRAY_SIZE;            }            if (newCap > oldCap && queue == array)                newArray = new Object[newCap];        } finally {            allocationSpinLock = 0;        }    }    if (newArray == null) // back off if another thread is allocating        Thread.yield();    lock.lock();    if (newArray != null && queue == array) {        queue = newArray;        System.arraycopy(array, 0, newArray, 0, oldCap);    }}

1. 调用tryGrow时必须持有锁：方法一开始先释放锁，在不持有主锁的情况下通过CAS自旋锁分配新数组，之后重新获取锁，并在确认queue未被其他线程替换后才切换到新数组

allocationSpinLock是transient的，因为序列化时并不需要此参数；同时又是volatile的，因为可能有多个线程同时调用。

// Guards array allocation in tryGrow: claimed with a CAS from 0 to 1 and reset to 0 afterwards.
private transient volatile int allocationSpinLock;

## UNSAFE.compareAndSwapInt

// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long allocationSpinLockOffset;static {    try {        UNSAFE = sun.misc.Unsafe.getUnsafe();        Class<?> k = PriorityBlockingQueue.class;        allocationSpinLockOffset = UNSAFE.objectFieldOffset            (k.getDeclaredField("allocationSpinLock"));    } catch (Exception e) {        throw new Error(e);    }}

UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)

allocationSpinLockOffset是allocationSpinLock变量在PriorityBlockingQueue类中的偏移量。

/**
 * Adds var4 to the int located at offset var2 of object var1, retrying until
 * the compare-and-swap call reports success.
 *
 * @param var1 the object holding the int
 * @param var2 the offset of the int within var1
 * @param var4 the amount to add
 * @return the value that was read just before the successful swap
 */
public final int getAndAddInt(Object var1, long var2, int var4) {
    for (;;) {
        // Read the current value, then try to swap in current + delta.
        int observed = this.getIntVolatile(var1, var2);
        if (this.compareAndSwapInt(var1, var2, observed, observed + var4)) {
            return observed;
        }
        // The swap was rejected; loop and re-read.
    }
}

/**
 * Adds -1 to the field at valueOffset through unsafe.getAndAddInt.
 *
 * @return whatever unsafe.getAndAddInt returns (presumably the value before
 *         the decrement; confirm against that method's contract)
 */
public final int getAndDecrement() {
    final int delta = -1;
    return unsafe.getAndAddInt(this, valueOffset, delta);
}

# 对比 PriorityQueue 和 PriorityBlockingQueue

1. PriorityQueue是非线程安全的，PriorityBlockingQueue是线程安全的
2. PriorityBlockingQueue使用重入锁，每一个操作都需要加锁
3. PriorityBlockingQueue扩容时使用了CAS操作
4. 两者都使用了堆，算法原理相同
5. PriorityBlockingQueue可以在queue为空时阻塞take操作

# JDK实现堆的方法

/** * Establishes the heap invariant (described above) in the entire tree, * assuming nothing about the order of the elements prior to the call. */@SuppressWarnings("unchecked")private void heapify() {    for (int i = (size >>> 1) - 1; i >= 0; i--)        siftDown(i, (E) queue[i]);}private void siftDown(int k, E x) {    if (comparator != null)        siftDownUsingComparator(k, x);    else        siftDownComparable(k, x);}@SuppressWarnings("unchecked")private void siftDownComparable(int k, E x) {    Comparable<? super E> key = (Comparable<? super E>)x;    int half = size >>> 1;        // loop while a non-leaf    while (k < half) {        int child = (k << 1) + 1; // assume left child is least        Object c = queue[child];        int right = child + 1;        if (right < size &&            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)            c = queue[child = right];        if (key.compareTo((E) c) <= 0)            break;        queue[k] = c;        k = child;    }    queue[k] = key;}public boolean offer(E e) {    if (e == null)        throw new NullPointerException();    modCount++;    int i = size;    if (i >= queue.length)        grow(i + 1);    size = i + 1;    if (i == 0)        queue[0] = e;    else        siftUp(i, e);    return true;}private void siftUp(int k, E x) {    if (comparator != null)        siftUpUsingComparator(k, x);    else        siftUpComparable(k, x);}@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {    Comparable<? super E> key = (Comparable<? super E>) x;    while (k > 0) {        int parent = (k - 1) >>> 1;        Object e = queue[parent];        if (key.compareTo((E) e) >= 0)            break;        queue[k] = e;        k = parent;    }    queue[k] = key;}

• 私有
• 公开
• 删除