2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
36 package java.util.concurrent;
38 import java.util.concurrent.locks.*;
42 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
43 * the same ordering rules as class {@link PriorityQueue} and supplies
44 * blocking retrieval operations. While this queue is logically
45 * unbounded, attempted additions may fail due to resource exhaustion
46 * (causing {@code OutOfMemoryError}). This class does not permit
47 * {@code null} elements. A priority queue relying on {@linkplain
48 * Comparable natural ordering} also does not permit insertion of
49 * non-comparable objects (doing so results in
50 * {@code ClassCastException}).
52 * <p>This class and its iterator implement all of the
53 * <em>optional</em> methods of the {@link Collection} and {@link
54 * Iterator} interfaces. The Iterator provided in method {@link
55 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
56 * the PriorityBlockingQueue in any particular order. If you need
57 * ordered traversal, consider using
58 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
59 * can be used to <em>remove</em> some or all elements in priority
60 * order and place them in another collection.
62 * <p>Operations on this class make no guarantees about the ordering
63 * of elements with equal priority. If you need to enforce an
64 * ordering, you can define custom classes or comparators that use a
65 * secondary key to break ties in primary priority values. For
66 * example, here is a class that applies first-in-first-out
67 * tie-breaking to comparable elements. To use it, you would insert a
68 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
71 * class FIFOEntry<E extends Comparable<? super E>>
72 * implements Comparable<FIFOEntry<E>> {
73 * static final AtomicLong seq = new AtomicLong(0);
76 * public FIFOEntry(E entry) {
77 * seqNum = seq.getAndIncrement();
80 * public E getEntry() { return entry; }
81 * public int compareTo(FIFOEntry<E> other) {
82 * int res = entry.compareTo(other.entry);
83 * if (res == 0 && other.entry != this.entry)
84 * res = (seqNum < other.seqNum ? -1 : 1);
89 * <p>This class is a member of the
90 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
91 * Java Collections Framework</a>.
95 * @param <E> the type of elements held in this collection
97 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
98 implements BlockingQueue<E>, java.io.Serializable {
99 private static final long serialVersionUID = 5595510919245408276L;
102 * The implementation uses an array-based binary heap, with public
103 * operations protected with a single lock. However, allocation
104 * during resizing uses a simple spinlock (used only while not
105 * holding main lock) in order to allow takes to operate
106 * concurrently with allocation. This avoids repeated
107 * postponement of waiting consumers and consequent element
108 * build-up. The need to back away from lock during allocation
109 * makes it impossible to simply wrap delegated
110 * java.util.PriorityQueue operations within a lock, as was done
111 * in a previous version of this class. To maintain
112 * interoperability, a plain PriorityQueue is still used during
113 * serialization, which maintains compatibility at the expense of
114 * transiently doubling overhead.
118 * Default array capacity.
120 private static final int DEFAULT_INITIAL_CAPACITY = 11;
123 * The maximum size of array to allocate.
124 * Some VMs reserve some header words in an array.
125 * Attempts to allocate larger arrays may result in
126 * OutOfMemoryError: Requested array size exceeds VM limit
128 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
131 * Priority queue represented as a balanced binary heap: the two
132 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
133 * priority queue is ordered by comparator, or by the elements'
134 * natural ordering, if comparator is null: For each node n in the
135 * heap and each descendant d of n, n <= d. The element with the
136 * lowest value is in queue[0], assuming the queue is nonempty.
138 private transient Object[] queue;
141 * The number of elements in the priority queue.
143 private transient int size;
146 * The comparator, or null if priority queue uses elements'
149 private transient Comparator<? super E> comparator;
152 * Lock used for all public operations
154 private final ReentrantLock lock;
157 * Condition for blocking when empty
159 private final Condition notEmpty;
162 * Spinlock for allocation, acquired via CAS.
164 private transient volatile int allocationSpinLock;
167 * A plain PriorityQueue used only for serialization,
168 * to maintain compatibility with previous versions
169 * of this class. Non-null only during serialization/deserialization.
171 private PriorityQueue q;
174 * Creates a {@code PriorityBlockingQueue} with the default
175 * initial capacity (11) that orders its elements according to
176 * their {@linkplain Comparable natural ordering}.
178 public PriorityBlockingQueue() {
179 this(DEFAULT_INITIAL_CAPACITY, null);
183 * Creates a {@code PriorityBlockingQueue} with the specified
184 * initial capacity that orders its elements according to their
185 * {@linkplain Comparable natural ordering}.
187 * @param initialCapacity the initial capacity for this priority queue
188 * @throws IllegalArgumentException if {@code initialCapacity} is less
191 public PriorityBlockingQueue(int initialCapacity) {
192 this(initialCapacity, null);
196 * Creates a {@code PriorityBlockingQueue} with the specified initial
197 * capacity that orders its elements according to the specified
200 * @param initialCapacity the initial capacity for this priority queue
201 * @param comparator the comparator that will be used to order this
202 * priority queue. If {@code null}, the {@linkplain Comparable
203 * natural ordering} of the elements will be used.
204 * @throws IllegalArgumentException if {@code initialCapacity} is less
207 public PriorityBlockingQueue(int initialCapacity,
208 Comparator<? super E> comparator) {
209 if (initialCapacity < 1)
210 throw new IllegalArgumentException();
211 this.lock = new ReentrantLock();
212 this.notEmpty = lock.newCondition();
213 this.comparator = comparator;
214 this.queue = new Object[initialCapacity];
218 * Creates a {@code PriorityBlockingQueue} containing the elements
219 * in the specified collection. If the specified collection is a
220 * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
221 * priority queue will be ordered according to the same ordering.
222 * Otherwise, this priority queue will be ordered according to the
223 * {@linkplain Comparable natural ordering} of its elements.
225 * @param c the collection whose elements are to be placed
226 * into this priority queue
227 * @throws ClassCastException if elements of the specified collection
228 * cannot be compared to one another according to the priority
230 * @throws NullPointerException if the specified collection or any
231 * of its elements are null
233 public PriorityBlockingQueue(Collection<? extends E> c) {
234 this.lock = new ReentrantLock();
235 this.notEmpty = lock.newCondition();
236 boolean heapify = true; // true if not known to be in heap order
237 boolean screen = true; // true if must screen for nulls
238 if (c instanceof SortedSet<?>) {
239 SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
240 this.comparator = (Comparator<? super E>) ss.comparator();
243 else if (c instanceof PriorityBlockingQueue<?>) {
244 PriorityBlockingQueue<? extends E> pq =
245 (PriorityBlockingQueue<? extends E>) c;
246 this.comparator = (Comparator<? super E>) pq.comparator();
248 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
251 Object[] a = c.toArray();
253 // If c.toArray incorrectly doesn't return Object[], copy it.
254 if (a.getClass() != Object[].class)
255 a = Arrays.copyOf(a, n, Object[].class);
256 if (screen && (n == 1 || this.comparator != null)) {
257 for (int i = 0; i < n; ++i)
259 throw new NullPointerException();
268 * Tries to grow array to accommodate at least one more element
269 * (but normally expand by about 50%), giving up (allowing retry)
270 * on contention (which we expect to be rare). Call only while
273 * @param array the heap array
274 * @param oldCap the length of the array
276 private void tryGrow(Object[] array, int oldCap) {
277 lock.unlock(); // must release and then re-acquire main lock
278 Object[] newArray = null;
279 if (allocationSpinLock == 0 &&
280 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
283 int newCap = oldCap + ((oldCap < 64) ?
284 (oldCap + 2) : // grow faster if small
286 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
287 int minCap = oldCap + 1;
288 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
289 throw new OutOfMemoryError();
290 newCap = MAX_ARRAY_SIZE;
292 if (newCap > oldCap && queue == array)
293 newArray = new Object[newCap];
295 allocationSpinLock = 0;
298 if (newArray == null) // back off if another thread is allocating
301 if (newArray != null && queue == array) {
303 System.arraycopy(array, 0, newArray, 0, oldCap);
308 * Mechanics for poll(). Call only while holding lock.
310 private E extract() {
316 Object[] array = queue;
317 result = (E) array[0];
320 Comparator<? super E> cmp = comparator;
322 siftDownComparable(0, x, array, n);
324 siftDownUsingComparator(0, x, array, n, cmp);
331 * Inserts item x at position k, maintaining heap invariant by
332 * promoting x up the tree until it is greater than or equal to
333 * its parent, or is the root.
335 * To simplify and speed up coercions and comparisons, the
336 * Comparable and Comparator versions are separated into different
337 * methods that are otherwise identical. (Similarly for siftDown.)
338 * These methods are static, with heap state as arguments, to
339 * simplify use in light of possible comparator exceptions.
341 * @param k the position to fill
342 * @param x the item to insert
343 * @param array the heap array
346 private static <T> void siftUpComparable(int k, T x, Object[] array) {
347 Comparable<? super T> key = (Comparable<? super T>) x;
349 int parent = (k - 1) >>> 1;
350 Object e = array[parent];
351 if (key.compareTo((T) e) >= 0)
359 private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
360 Comparator<? super T> cmp) {
362 int parent = (k - 1) >>> 1;
363 Object e = array[parent];
364 if (cmp.compare(x, (T) e) >= 0)
373 * Inserts item x at position k, maintaining heap invariant by
374 * demoting x down the tree repeatedly until it is less than or
375 * equal to its children or is a leaf.
377 * @param k the position to fill
378 * @param x the item to insert
379 * @param array the heap array
382 private static <T> void siftDownComparable(int k, T x, Object[] array,
384 Comparable<? super T> key = (Comparable<? super T>)x;
385 int half = n >>> 1; // loop while a non-leaf
387 int child = (k << 1) + 1; // assume left child is least
388 Object c = array[child];
389 int right = child + 1;
391 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
392 c = array[child = right];
393 if (key.compareTo((T) c) <= 0)
401 private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
403 Comparator<? super T> cmp) {
406 int child = (k << 1) + 1;
407 Object c = array[child];
408 int right = child + 1;
409 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
410 c = array[child = right];
411 if (cmp.compare(x, (T) c) <= 0)
420 * Establishes the heap invariant (described above) in the entire tree,
421 * assuming nothing about the order of the elements prior to the call.
423 private void heapify() {
424 Object[] array = queue;
426 int half = (n >>> 1) - 1;
427 Comparator<? super E> cmp = comparator;
429 for (int i = half; i >= 0; i--)
430 siftDownComparable(i, (E) array[i], array, n);
433 for (int i = half; i >= 0; i--)
434 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
439 * Inserts the specified element into this priority queue.
441 * @param e the element to add
442 * @return {@code true} (as specified by {@link Collection#add})
443 * @throws ClassCastException if the specified element cannot be compared
444 * with elements currently in the priority queue according to the
445 * priority queue's ordering
446 * @throws NullPointerException if the specified element is null
448 public boolean add(E e) {
453 * Inserts the specified element into this priority queue.
454 * As the queue is unbounded, this method will never return {@code false}.
456 * @param e the element to add
457 * @return {@code true} (as specified by {@link Queue#offer})
458 * @throws ClassCastException if the specified element cannot be compared
459 * with elements currently in the priority queue according to the
460 * priority queue's ordering
461 * @throws NullPointerException if the specified element is null
463 public boolean offer(E e) {
465 throw new NullPointerException();
466 final ReentrantLock lock = this.lock;
470 while ((n = size) >= (cap = (array = queue).length))
473 Comparator<? super E> cmp = comparator;
475 siftUpComparable(n, e, array);
477 siftUpUsingComparator(n, e, array, cmp);
487 * Inserts the specified element into this priority queue.
488 * As the queue is unbounded, this method will never block.
490 * @param e the element to add
491 * @throws ClassCastException if the specified element cannot be compared
492 * with elements currently in the priority queue according to the
493 * priority queue's ordering
494 * @throws NullPointerException if the specified element is null
496 public void put(E e) {
497 offer(e); // never need to block
501 * Inserts the specified element into this priority queue.
502 * As the queue is unbounded, this method will never block or
503 * return {@code false}.
505 * @param e the element to add
506 * @param timeout This parameter is ignored as the method never blocks
507 * @param unit This parameter is ignored as the method never blocks
508 * @return {@code true} (as specified by
509 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
510 * @throws ClassCastException if the specified element cannot be compared
511 * with elements currently in the priority queue according to the
512 * priority queue's ordering
513 * @throws NullPointerException if the specified element is null
515 public boolean offer(E e, long timeout, TimeUnit unit) {
516 return offer(e); // never need to block
520 final ReentrantLock lock = this.lock;
531 public E take() throws InterruptedException {
532 final ReentrantLock lock = this.lock;
533 lock.lockInterruptibly();
536 while ( (result = extract()) == null)
544 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
545 long nanos = unit.toNanos(timeout);
546 final ReentrantLock lock = this.lock;
547 lock.lockInterruptibly();
550 while ( (result = extract()) == null && nanos > 0)
551 nanos = notEmpty.awaitNanos(nanos);
559 final ReentrantLock lock = this.lock;
563 result = size > 0 ? (E) queue[0] : null;
571 * Returns the comparator used to order the elements in this queue,
572 * or {@code null} if this queue uses the {@linkplain Comparable
573 * natural ordering} of its elements.
575 * @return the comparator used to order the elements in this queue,
576 * or {@code null} if this queue uses the natural
577 * ordering of its elements
579 public Comparator<? super E> comparator() {
584 final ReentrantLock lock = this.lock;
594 * Always returns {@code Integer.MAX_VALUE} because
595 * a {@code PriorityBlockingQueue} is not capacity constrained.
596 * @return {@code Integer.MAX_VALUE} always
598 public int remainingCapacity() {
599 return Integer.MAX_VALUE;
602 private int indexOf(Object o) {
604 Object[] array = queue;
606 for (int i = 0; i < n; i++)
607 if (o.equals(array[i]))
614 * Removes the ith element from queue.
616 private void removeAt(int i) {
617 Object[] array = queue;
619 if (n == i) // removed last element
622 E moved = (E) array[n];
624 Comparator<? super E> cmp = comparator;
626 siftDownComparable(i, moved, array, n);
628 siftDownUsingComparator(i, moved, array, n, cmp);
629 if (array[i] == moved) {
631 siftUpComparable(i, moved, array);
633 siftUpUsingComparator(i, moved, array, cmp);
640 * Removes a single instance of the specified element from this queue,
641 * if it is present. More formally, removes an element {@code e} such
642 * that {@code o.equals(e)}, if this queue contains one or more such
643 * elements. Returns {@code true} if and only if this queue contained
644 * the specified element (or equivalently, if this queue changed as a
645 * result of the call).
647 * @param o element to be removed from this queue, if present
648 * @return {@code true} if this queue changed as a result of the call
650 public boolean remove(Object o) {
651 boolean removed = false;
652 final ReentrantLock lock = this.lock;
668 * Identity-based version for use in Itr.remove
670 private void removeEQ(Object o) {
671 final ReentrantLock lock = this.lock;
674 Object[] array = queue;
676 for (int i = 0; i < n; i++) {
688 * Returns {@code true} if this queue contains the specified element.
689 * More formally, returns {@code true} if and only if this queue contains
690 * at least one element {@code e} such that {@code o.equals(e)}.
692 * @param o object to be checked for containment in this queue
693 * @return {@code true} if this queue contains the specified element
695 public boolean contains(Object o) {
697 final ReentrantLock lock = this.lock;
708 * Returns an array containing all of the elements in this queue.
709 * The returned array elements are in no particular order.
711 * <p>The returned array will be "safe" in that no references to it are
712 * maintained by this queue. (In other words, this method must allocate
713 * a new array). The caller is thus free to modify the returned array.
715 * <p>This method acts as a bridge between array-based and collection-based
718 * @return an array containing all of the elements in this queue
720 public Object[] toArray() {
721 final ReentrantLock lock = this.lock;
724 return Arrays.copyOf(queue, size);
731 public String toString() {
732 final ReentrantLock lock = this.lock;
738 StringBuilder sb = new StringBuilder();
740 for (int i = 0; i < n; ++i) {
742 sb.append(e == this ? "(this Collection)" : e);
744 sb.append(',').append(' ');
746 return sb.append(']').toString();
753 * @throws UnsupportedOperationException {@inheritDoc}
754 * @throws ClassCastException {@inheritDoc}
755 * @throws NullPointerException {@inheritDoc}
756 * @throws IllegalArgumentException {@inheritDoc}
758 public int drainTo(Collection<? super E> c) {
760 throw new NullPointerException();
762 throw new IllegalArgumentException();
763 final ReentrantLock lock = this.lock;
768 while ( (e = extract()) != null) {
779 * @throws UnsupportedOperationException {@inheritDoc}
780 * @throws ClassCastException {@inheritDoc}
781 * @throws NullPointerException {@inheritDoc}
782 * @throws IllegalArgumentException {@inheritDoc}
784 public int drainTo(Collection<? super E> c, int maxElements) {
786 throw new NullPointerException();
788 throw new IllegalArgumentException();
789 if (maxElements <= 0)
791 final ReentrantLock lock = this.lock;
796 while (n < maxElements && (e = extract()) != null) {
807 * Atomically removes all of the elements from this queue.
808 * The queue will be empty after this call returns.
810 public void clear() {
811 final ReentrantLock lock = this.lock;
814 Object[] array = queue;
817 for (int i = 0; i < n; i++)
825 * Returns an array containing all of the elements in this queue; the
826 * runtime type of the returned array is that of the specified array.
827 * The returned array elements are in no particular order.
828 * If the queue fits in the specified array, it is returned therein.
829 * Otherwise, a new array is allocated with the runtime type of the
830 * specified array and the size of this queue.
832 * <p>If this queue fits in the specified array with room to spare
833 * (i.e., the array has more elements than this queue), the element in
834 * the array immediately following the end of the queue is set to
837 * <p>Like the {@link #toArray()} method, this method acts as a bridge between
838 * array-based and collection-based APIs. Further, this method allows
839 * precise control over the runtime type of the output array, and may,
840 * under certain circumstances, be used to save allocation costs.
842 * <p>Suppose {@code x} is a queue known to contain only strings.
843 * The following code can be used to dump the queue into a newly
844 * allocated array of {@code String}:
847 * String[] y = x.toArray(new String[0]);</pre>
849 * Note that {@code toArray(new Object[0])} is identical in function to
852 * @param a the array into which the elements of the queue are to
853 * be stored, if it is big enough; otherwise, a new array of the
854 * same runtime type is allocated for this purpose
855 * @return an array containing all of the elements in this queue
856 * @throws ArrayStoreException if the runtime type of the specified array
857 * is not a supertype of the runtime type of every element in
859 * @throws NullPointerException if the specified array is null
861 public <T> T[] toArray(T[] a) {
862 final ReentrantLock lock = this.lock;
867 // Make a new array of a's runtime type, but my contents:
868 return (T[]) Arrays.copyOf(queue, size, a.getClass());
869 System.arraycopy(queue, 0, a, 0, n);
879 * Returns an iterator over the elements in this queue. The
880 * iterator does not return the elements in any particular order.
882 * <p>The returned iterator is a "weakly consistent" iterator that
883 * will never throw {@link java.util.ConcurrentModificationException
884 * ConcurrentModificationException}, and guarantees to traverse
885 * elements as they existed upon construction of the iterator, and
886 * may (but is not guaranteed to) reflect any modifications
887 * subsequent to construction.
889 * @return an iterator over the elements in this queue
891 public Iterator<E> iterator() {
892 return new Itr(toArray());
896 * Snapshot iterator that works off copy of underlying q array.
898 final class Itr implements Iterator<E> {
899 final Object[] array; // Array of all elements
900 int cursor; // index of next element to return;
901 int lastRet; // index of last element, or -1 if no such
903 Itr(Object[] array) {
908 public boolean hasNext() {
909 return cursor < array.length;
913 if (cursor >= array.length)
914 throw new NoSuchElementException();
916 return (E)array[cursor++];
919 public void remove() {
921 throw new IllegalStateException();
922 removeEQ(array[lastRet]);
928 * Saves the state to a stream (that is, serializes it). For
929 * compatibility with previous versions of this class,
930 * elements are first copied to a java.util.PriorityQueue,
931 * which is then serialized.
933 private void writeObject(java.io.ObjectOutputStream s)
934 throws java.io.IOException {
937 int n = size; // avoid zero capacity argument
938 q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
940 s.defaultWriteObject();
948 * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
949 * (that is, deserializes it).
951 * @param s the stream
953 private void readObject(java.io.ObjectInputStream s)
954 throws java.io.IOException, ClassNotFoundException {
956 s.defaultReadObject();
957 this.queue = new Object[q.size()];
958 comparator = q.comparator();
966 private static final sun.misc.Unsafe UNSAFE;
967 private static final long allocationSpinLockOffset;
970 UNSAFE = sun.misc.Unsafe.getUnsafe();
971 Class k = PriorityBlockingQueue.class;
972 allocationSpinLockOffset = UNSAFE.objectFieldOffset
973 (k.getDeclaredField("allocationSpinLock"));
974 } catch (Exception e) {