2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
36 package java.util.concurrent;
38 import java.util.concurrent.locks.*;
42 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
43 * the same ordering rules as class {@link PriorityQueue} and supplies
44 * blocking retrieval operations. While this queue is logically
45 * unbounded, attempted additions may fail due to resource exhaustion
46 * (causing {@code OutOfMemoryError}). This class does not permit
47 * {@code null} elements. A priority queue relying on {@linkplain
48 * Comparable natural ordering} also does not permit insertion of
49 * non-comparable objects (doing so results in
50 * {@code ClassCastException}).
52 * <p>This class and its iterator implement all of the
53 * <em>optional</em> methods of the {@link Collection} and {@link
54 * Iterator} interfaces. The Iterator provided in method {@link
55 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
56 * the PriorityBlockingQueue in any particular order. If you need
57 * ordered traversal, consider using
58 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
59 * can be used to <em>remove</em> some or all elements in priority
60 * order and place them in another collection.
62 * <p>Operations on this class make no guarantees about the ordering
63 * of elements with equal priority. If you need to enforce an
64 * ordering, you can define custom classes or comparators that use a
65 * secondary key to break ties in primary priority values. For
66 * example, here is a class that applies first-in-first-out
67 * tie-breaking to comparable elements. To use it, you would insert a
68 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
71 * class FIFOEntry<E extends Comparable<? super E>>
72 * implements Comparable<FIFOEntry<E>> {
73 * static final AtomicLong seq = new AtomicLong(0);
76 * public FIFOEntry(E entry) {
77 * seqNum = seq.getAndIncrement();
80 * public E getEntry() { return entry; }
81 * public int compareTo(FIFOEntry<E> other) {
82 * int res = entry.compareTo(other.entry);
83 * if (res == 0 && other.entry != this.entry)
84 * res = (seqNum < other.seqNum ? -1 : 1);
89 * <p>This class is a member of the
90 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
91 * Java Collections Framework</a>.
95 * @param <E> the type of elements held in this collection
97 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
98 implements BlockingQueue<E>, java.io.Serializable {
99 private static final long serialVersionUID = 5595510919245408276L;
102 * The implementation uses an array-based binary heap, with public
103 * operations protected with a single lock. However, allocation
104 * during resizing uses a simple spinlock (used only while not
105 * holding main lock) in order to allow takes to operate
106 * concurrently with allocation. This avoids repeated
107 * postponement of waiting consumers and consequent element
108 * build-up. The need to back away from lock during allocation
109 * makes it impossible to simply wrap delegated
110 * java.util.PriorityQueue operations within a lock, as was done
111 * in a previous version of this class. To maintain
112 * interoperability, a plain PriorityQueue is still used during
113 * serialization, which maintains compatibility at the expense of
114 * transiently doubling overhead.
118 * Default array capacity.
// Backing-array length used by the no-argument constructor.
120 private static final int DEFAULT_INITIAL_CAPACITY = 11;
123 * The maximum size of array to allocate.
124 * Some VMs reserve some header words in an array.
125 * Attempts to allocate larger arrays may result in
126 * OutOfMemoryError: Requested array size exceeds VM limit
128 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
131 * Priority queue represented as a balanced binary heap: the two
132 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
133 * priority queue is ordered by comparator, or by the elements'
134 * natural ordering, if comparator is null: For each node n in the
135 * heap and each descendant d of n, n <= d. The element with the
136 * lowest value is in queue[0], assuming the queue is nonempty.
138 private transient Object[] queue;
141 * The number of elements in the priority queue.
143 private transient int size;
146 * The comparator, or null if priority queue uses elements'
149 private transient Comparator<? super E> comparator;
152 * Lock used for all public operations
154 private final ReentrantLock lock;
157 * Condition for blocking when empty
159 private final Condition notEmpty;
162 * Spinlock for allocation, acquired via CAS.
// NOTE(review): tryGrow() tests this field for 0 and then assigns 1 as two separate
// volatile operations, not as an atomic compare-and-set, although the description
// above says CAS -- confirm which is intended.
164 private transient volatile int allocationSpinLock;
167 * A plain PriorityQueue used only for serialization,
168 * to maintain compatibility with previous versions
169 * of this class. Non-null only during serialization/deserialization.
// NOTE(review): raw type; writeObject() assigns a PriorityQueue<E> to this field.
171 private PriorityQueue q;
/**
 * Creates a {@code PriorityBlockingQueue} with the default
 * initial capacity (11) that orders its elements according to
 * their {@linkplain Comparable natural ordering}.
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
 * Creates a {@code PriorityBlockingQueue} with the specified
 * initial capacity that orders its elements according to their
 * {@linkplain Comparable natural ordering}.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
/**
 * Creates a {@code PriorityBlockingQueue} with the specified initial
 * capacity that orders its elements according to the specified
 * comparator.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @param comparator the comparator that will be used to order this
 *        priority queue.  If {@code null}, the {@linkplain Comparable
 *        natural ordering} of the elements will be used.
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}
218 * Creates a {@code PriorityBlockingQueue} containing the elements
219 * in the specified collection. If the specified collection is a
220 * {@link SortedSet} or a {@link PriorityQueue}, this
221 * priority queue will be ordered according to the same ordering.
222 * Otherwise, this priority queue will be ordered according to the
223 * {@linkplain Comparable natural ordering} of its elements.
225 * @param c the collection whose elements are to be placed
226 * into this priority queue
227 * @throws ClassCastException if elements of the specified collection
228 * cannot be compared to one another according to the priority
230 * @throws NullPointerException if the specified collection or any
231 * of its elements are null
// Builds the queue from the contents of c. Visible steps: create the lock and its
// notEmpty condition, adopt the comparator of the source when c is a SortedSet or a
// PriorityBlockingQueue, snapshot c with toArray() (re-copying into a true Object[]
// when the runtime array type differs), and throw NullPointerException from inside
// the null-screening loop.
// NOTE(review): the description above names PriorityQueue, but the branch below tests
// for PriorityBlockingQueue -- confirm which is intended.
// NOTE(review): the embedded listing numbers skip several lines, so the statements
// that clear heapify/screen, define n, test a[i], and install the array and size are
// not visible here.
233 public PriorityBlockingQueue(Collection<? extends E> c) {
234 this.lock = new ReentrantLock();
235 this.notEmpty = lock.newCondition();
236 boolean heapify = true; // true if not known to be in heap order
237 boolean screen = true; // true if must screen for nulls
238 if (c instanceof SortedSet<?>) {
239 SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
240 this.comparator = (Comparator<? super E>) ss.comparator();
243 else if (c instanceof PriorityBlockingQueue<?>) {
244 PriorityBlockingQueue<? extends E> pq =
245 (PriorityBlockingQueue<? extends E>) c;
246 this.comparator = (Comparator<? super E>) pq.comparator();
248 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
251 Object[] a = c.toArray();
253 // If c.toArray incorrectly doesn't return Object[], copy it.
254 if (a.getClass() != Object[].class)
255 a = Arrays.copyOf(a, n, Object[].class);
256 if (screen && (n == 1 || this.comparator != null)) {
257 for (int i = 0; i < n; ++i)
259 throw new NullPointerException();
268 * Tries to grow array to accommodate at least one more element
269 * (but normally expand by about 50%), giving up (allowing retry)
270 * on contention (which we expect to be rare). Call only while
273 * @param array the heap array
274 * @param oldCap the length of the array
// Grows the heap array. Releases the main lock first, computes the new capacity and
// allocates the replacement between setting allocationSpinLock to 1 and back to 0,
// then copies the old contents into newArray only if queue still refers to the
// array the caller passed in.
// NOTE(review): the listing numbering skips lines here (e.g. between 280 and 282,
// 292 and 294, 297 and 300), so the try/finally structure, the back-off action, the
// re-acquisition of the lock and the assignment of queue are not visible.
276 private void tryGrow(Object[] array, int oldCap) {
277 lock.unlock(); // must release and then re-acquire main lock
278 Object[] newArray = null;
// NOTE(review): test-then-assign on a volatile int is two separate operations, so two
// threads can both enter this section. The `queue == array` re-check further down
// appears to limit the effect to a wasted allocation -- confirm.
279 if (allocationSpinLock == 0) {
280 allocationSpinLock = 1;
282 int newCap = oldCap + ((oldCap < 64) ?
283 (oldCap + 2) : // grow faster if small
// The subtraction form also triggers when newCap has wrapped to a negative int.
285 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
286 int minCap = oldCap + 1;
287 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
288 throw new OutOfMemoryError();
289 newCap = MAX_ARRAY_SIZE;
291 if (newCap > oldCap && queue == array)
292 newArray = new Object[newCap];
294 allocationSpinLock = 0;
297 if (newArray == null) // back off if another thread is allocating
300 if (newArray != null && queue == array) {
302 System.arraycopy(array, 0, newArray, 0, oldCap);
307 * Mechanics for poll(). Call only while holding lock.
// Heap removal helper called by take, the timed poll and both drainTo methods:
// captures array[0] as the result and restores heap order by sifting x down from
// index 0 over n slots, using one of the two sift-down variants.
// NOTE(review): the declarations of result, n and x, the empty-queue branch, the
// test that selects the variant and the size update fall on lines missing from this
// listing. Callers visibly treat a null return as an empty queue.
309 private E extract() {
315 Object[] array = queue;
316 result = (E) array[0];
319 Comparator<? super E> cmp = comparator;
321 siftDownComparable(0, x, array, n);
323 siftDownUsingComparator(0, x, array, n, cmp);
330 * Inserts item x at position k, maintaining heap invariant by
331 * promoting x up the tree until it is greater than or equal to
332 * its parent, or is the root.
334 * To simplify and speed up coercions and comparisons. the
335 * Comparable and Comparator versions are separated into different
336 * methods that are otherwise identical. (Similarly for siftDown.)
337 * These methods are static, with heap state as arguments, to
338 * simplify use in light of possible comparator exceptions.
340 * @param k the position to fill
341 * @param x the item to insert
342 * @param array the heap array
345 private static <T> void siftUpComparable(int k, T x, Object[] array) {
346 Comparable<? super T> key = (Comparable<? super T>) x;
348 int parent = (k - 1) >>> 1;
349 Object e = array[parent];
350 if (key.compareTo((T) e) >= 0)
358 private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
359 Comparator<? super T> cmp) {
361 int parent = (k - 1) >>> 1;
362 Object e = array[parent];
363 if (cmp.compare(x, (T) e) >= 0)
372 * Inserts item x at position k, maintaining heap invariant by
373 * demoting x down the tree repeatedly until it is less than or
374 * equal to its children or is a leaf.
376 * @param k the position to fill
377 * @param x the item to insert
378 * @param array the heap array
381 private static <T> void siftDownComparable(int k, T x, Object[] array,
383 Comparable<? super T> key = (Comparable<? super T>)x;
384 int half = n >>> 1; // loop while a non-leaf
386 int child = (k << 1) + 1; // assume left child is least
387 Object c = array[child];
388 int right = child + 1;
390 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
391 c = array[child = right];
392 if (key.compareTo((T) c) <= 0)
400 private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
402 Comparator<? super T> cmp) {
405 int child = (k << 1) + 1;
406 Object c = array[child];
407 int right = child + 1;
408 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
409 c = array[child = right];
410 if (cmp.compare(x, (T) c) <= 0)
419 * Establishes the heap invariant (described above) in the entire tree,
420 * assuming nothing about the order of the elements prior to the call.
// Rebuilds heap order in place over queue. Starts at (n >>> 1) - 1, the last index
// whose left child 2*i+1 is still below n, and sifts each node down while walking
// back to the root. One loop uses natural ordering, the other the comparator.
// NOTE(review): the definition of n and the branch choosing between the two loops
// are on lines missing from this listing (presumably n = size and cmp == null).
422 private void heapify() {
423 Object[] array = queue;
425 int half = (n >>> 1) - 1;
426 Comparator<? super E> cmp = comparator;
428 for (int i = half; i >= 0; i--)
429 siftDownComparable(i, (E) array[i], array, n);
432 for (int i = half; i >= 0; i--)
433 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
438 * Inserts the specified element into this priority queue.
440 * @param e the element to add
441 * @return {@code true} (as specified by {@link Collection#add})
442 * @throws ClassCastException if the specified element cannot be compared
443 * with elements currently in the priority queue according to the
444 * priority queue's ordering
445 * @throws NullPointerException if the specified element is null
447 public boolean add(E e) {
452 * Inserts the specified element into this priority queue.
453 * As the queue is unbounded, this method will never return {@code false}.
455 * @param e the element to add
456 * @return {@code true} (as specified by {@link Queue#offer})
457 * @throws ClassCastException if the specified element cannot be compared
458 * with elements currently in the priority queue according to the
459 * priority queue's ordering
460 * @throws NullPointerException if the specified element is null
// Inserts e at index n (the current size) and sifts it up. Visible steps: a
// NullPointerException guard, a loop that re-reads size and queue while the array is
// full (n >= cap), then siftUpComparable or siftUpUsingComparator at index n.
// NOTE(review): the guard condition, lock acquisition and release, the loop body, the
// test that selects the sift variant, the size update and the return statement are
// on lines missing from this listing.
462 public boolean offer(E e) {
464 throw new NullPointerException();
465 final ReentrantLock lock = this.lock;
469 while ((n = size) >= (cap = (array = queue).length))
472 Comparator<? super E> cmp = comparator;
474 siftUpComparable(n, e, array);
476 siftUpUsingComparator(n, e, array, cmp);
486 * Inserts the specified element into this priority queue.
487 * As the queue is unbounded, this method will never block.
489 * @param e the element to add
490 * @throws ClassCastException if the specified element cannot be compared
491 * with elements currently in the priority queue according to the
492 * priority queue's ordering
493 * @throws NullPointerException if the specified element is null
495 public void put(E e) {
496 offer(e); // never need to block
500 * Inserts the specified element into this priority queue.
501 * As the queue is unbounded, this method will never block or
502 * return {@code false}.
504 * @param e the element to add
505 * @param timeout This parameter is ignored as the method never blocks
506 * @param unit This parameter is ignored as the method never blocks
507 * @return {@code true} (as specified by
508 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
509 * @throws ClassCastException if the specified element cannot be compared
510 * with elements currently in the priority queue according to the
511 * priority queue's ordering
512 * @throws NullPointerException if the specified element is null
514 public boolean offer(E e, long timeout, TimeUnit unit) {
515 return offer(e); // never need to block
519 final ReentrantLock lock = this.lock;
// Blocking removal: acquires the lock interruptibly, then keeps calling extract()
// until it yields a non-null element.
// NOTE(review): the declaration of result, the loop body, and the unlock and return
// are on lines missing from this listing (the loop presumably waits on notEmpty) --
// confirm against the upstream source.
530 public E take() throws InterruptedException {
531 final ReentrantLock lock = this.lock;
532 lock.lockInterruptibly();
535 while ( (result = extract()) == null)
// Timed removal: converts the timeout to nanoseconds, acquires the lock
// interruptibly, and retries extract() while it returns null and time remains.
// Each wait replaces nanos with the value returned by awaitNanos.
// NOTE(review): the declaration of result, the try/finally with unlock, and the
// return statement are on lines missing from this listing.
543 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
544 long nanos = unit.toNanos(timeout);
545 final ReentrantLock lock = this.lock;
546 lock.lockInterruptibly();
549 while ( (result = extract()) == null && nanos > 0)
550 nanos = notEmpty.awaitNanos(nanos);
558 final ReentrantLock lock = this.lock;
562 result = size > 0 ? (E) queue[0] : null;
570 * Returns the comparator used to order the elements in this queue,
571 * or {@code null} if this queue uses the {@linkplain Comparable
572 * natural ordering} of its elements.
574 * @return the comparator used to order the elements in this queue,
575 * or {@code null} if this queue uses the natural
576 * ordering of its elements
578 public Comparator<? super E> comparator() {
583 final ReentrantLock lock = this.lock;
593 * Always returns {@code Integer.MAX_VALUE} because
594 * a {@code PriorityBlockingQueue} is not capacity constrained.
595 * @return {@code Integer.MAX_VALUE} always
597 public int remainingCapacity() {
598 return Integer.MAX_VALUE;
// Linear search: compares o against each of the first n slots of queue using
// o.equals(...).
// NOTE(review): any null check on o, the definition of n and the return statements
// are on lines missing from this listing. o.equals would throw for a null o unless
// a guard precedes the loop -- confirm.
601 private int indexOf(Object o) {
603 Object[] array = queue;
605 for (int i = 0; i < n; i++)
606 if (o.equals(array[i]))
613 * Removes the ith element from queue.
// Removes the element at heap index i. When i is not the last slot, the element
// taken from slot n (moved) is re-inserted at i by sifting down. If it stayed at i
// (array[i] == moved) it is then sifted up, presumably because it may be smaller
// than its new parent.
// NOTE(review): the computation of n, the clearing of slots, the branches selecting
// the sift variants and the size update are on lines missing from this listing.
615 private void removeAt(int i) {
616 Object[] array = queue;
618 if (n == i) // removed last element
621 E moved = (E) array[n];
623 Comparator<? super E> cmp = comparator;
625 siftDownComparable(i, moved, array, n);
627 siftDownUsingComparator(i, moved, array, n, cmp);
628 if (array[i] == moved) {
630 siftUpComparable(i, moved, array);
632 siftUpUsingComparator(i, moved, array, cmp);
639 * Removes a single instance of the specified element from this queue,
640 * if it is present. More formally, removes an element {@code e} such
641 * that {@code o.equals(e)}, if this queue contains one or more such
642 * elements. Returns {@code true} if and only if this queue contained
643 * the specified element (or equivalently, if this queue changed as a
644 * result of the call).
646 * @param o element to be removed from this queue, if present
647 * @return {@code true} if this queue changed as a result of the call
// Public removal entry point. Only the result flag (initially false) and the lookup
// of the lock are visible.
// NOTE(review): locking, the search and removal, and the return are on lines missing
// from this listing (presumably indexOf followed by removeAt under the lock).
649 public boolean remove(Object o) {
650 boolean removed = false;
651 final ReentrantLock lock = this.lock;
667 * Identity-based version for use in Itr.remove
// Called by Itr.remove(): scans the first n slots of queue.
// NOTE(review): locking, the definition of n, and the comparison and removal inside
// the loop are on lines missing from this listing. The description above calls this
// the identity-based variant.
669 private void removeEQ(Object o) {
670 final ReentrantLock lock = this.lock;
673 Object[] array = queue;
675 for (int i = 0; i < n; i++) {
687 * Returns {@code true} if this queue contains the specified element.
688 * More formally, returns {@code true} if and only if this queue contains
689 * at least one element {@code e} such that {@code o.equals(e)}.
691 * @param o object to be checked for containment in this queue
692 * @return {@code true} if this queue contains the specified element
// Membership test. Only the lookup of the lock is visible.
// NOTE(review): locking, the search and the return are on lines missing from this
// listing (presumably indexOf under the lock).
694 public boolean contains(Object o) {
696 final ReentrantLock lock = this.lock;
707 * Returns an array containing all of the elements in this queue.
708 * The returned array elements are in no particular order.
710 * <p>The returned array will be "safe" in that no references to it are
711 * maintained by this queue. (In other words, this method must allocate
712 * a new array). The caller is thus free to modify the returned array.
714 * <p>This method acts as bridge between array-based and collection-based
717 * @return an array containing all of the elements in this queue
// Returns a fresh array holding the first size slots of queue, i.e. the elements in
// heap-array order rather than sorted order.
// NOTE(review): the lock/try/finally lines around the copy are missing from this
// listing.
719 public Object[] toArray() {
720 final ReentrantLock lock = this.lock;
723 return Arrays.copyOf(queue, size);
// Builds the string form with a StringBuilder: elements are appended in array order,
// a self-reference is replaced by a fixed marker text, elements are separated by a
// comma and a space, and the result ends with a closing bracket.
// NOTE(review): locking, the definition of n and e, the empty-queue case, the opening
// bracket and the condition guarding the separator are on lines missing from this
// listing.
730 public String toString() {
731 final ReentrantLock lock = this.lock;
737 StringBuilder sb = new StringBuilder();
739 for (int i = 0; i < n; ++i) {
741 sb.append(e == this ? "(this Collection)" : e);
743 sb.append(',').append(' ');
745 return sb.append(']').toString();
752 * @throws UnsupportedOperationException {@inheritDoc}
753 * @throws ClassCastException {@inheritDoc}
754 * @throws NullPointerException {@inheritDoc}
755 * @throws IllegalArgumentException {@inheritDoc}
// Drains the queue by calling extract() repeatedly until it returns null, so
// elements leave in heap (priority) order. Two argument guards throw
// NullPointerException and IllegalArgumentException.
// NOTE(review): the guard conditions (presumably c == null and c == this), locking,
// the add to c, the counter and the return are on lines missing from this listing.
757 public int drainTo(Collection<? super E> c) {
759 throw new NullPointerException();
761 throw new IllegalArgumentException();
762 final ReentrantLock lock = this.lock;
767 while ( (e = extract()) != null) {
778 * @throws UnsupportedOperationException {@inheritDoc}
779 * @throws ClassCastException {@inheritDoc}
780 * @throws NullPointerException {@inheritDoc}
781 * @throws IllegalArgumentException {@inheritDoc}
// Bounded drain: calls extract() while fewer than maxElements have been taken and
// the queue still yields elements. A non-positive maxElements is handled by an early
// guard, after two argument guards that throw NullPointerException and
// IllegalArgumentException.
// NOTE(review): the guard conditions, the action for maxElements <= 0, locking, the
// add to c, the counter n and the return are on lines missing from this listing.
783 public int drainTo(Collection<? super E> c, int maxElements) {
785 throw new NullPointerException();
787 throw new IllegalArgumentException();
788 if (maxElements <= 0)
790 final ReentrantLock lock = this.lock;
795 while (n < maxElements && (e = extract()) != null) {
806 * Atomically removes all of the elements from this queue.
807 * The queue will be empty after this call returns.
// Empties the queue: walks the first n slots of the backing array.
// NOTE(review): locking, the definition of n, the size reset and the loop body
// (presumably nulling each slot) are on lines missing from this listing.
809 public void clear() {
810 final ReentrantLock lock = this.lock;
813 Object[] array = queue;
816 for (int i = 0; i < n; i++)
824 * Returns an array containing all of the elements in this queue; the
825 * runtime type of the returned array is that of the specified array.
826 * The returned array elements are in no particular order.
827 * If the queue fits in the specified array, it is returned therein.
828 * Otherwise, a new array is allocated with the runtime type of the
829 * specified array and the size of this queue.
831 * <p>If this queue fits in the specified array with room to spare
832 * (i.e., the array has more elements than this queue), the element in
833 * the array immediately following the end of the queue is set to
836 * <p>Like the {@link #toArray()} method, this method acts as bridge between
837 * array-based and collection-based APIs. Further, this method allows
838 * precise control over the runtime type of the output array, and may,
839 * under certain circumstances, be used to save allocation costs.
841 * <p>Suppose {@code x} is a queue known to contain only strings.
842 * The following code can be used to dump the queue into a newly
843 * allocated array of {@code String}:
846 * String[] y = x.toArray(new String[0]);</pre>
848 * Note that {@code toArray(new Object[0])} is identical in function to
851 * @param a the array into which the elements of the queue are to
852 * be stored, if it is big enough; otherwise, a new array of the
853 * same runtime type is allocated for this purpose
854 * @return an array containing all of the elements in this queue
855 * @throws ArrayStoreException if the runtime type of the specified array
856 * is not a supertype of the runtime type of every element in
858 * @throws NullPointerException if the specified array is null
// Typed snapshot. One path returns a new array of the runtime type of the argument
// holding the first size slots of queue. The other copies n slots into the supplied
// array.
// NOTE(review): locking, the definition of n, the length test choosing between the
// two paths, the null terminator write and the final return are on lines missing
// from this listing.
860 public <T> T[] toArray(T[] a) {
861 final ReentrantLock lock = this.lock;
866 // Make a new array of a's runtime type, but my contents:
867 return (T[]) Arrays.copyOf(queue, size, a.getClass());
868 System.arraycopy(queue, 0, a, 0, n);
878 * Returns an iterator over the elements in this queue. The
879 * iterator does not return the elements in any particular order.
881 * <p>The returned iterator is a "weakly consistent" iterator that
882 * will never throw {@link java.util.ConcurrentModificationException
883 * ConcurrentModificationException}, and guarantees to traverse
884 * elements as they existed upon construction of the iterator, and
885 * may (but is not guaranteed to) reflect any modifications
886 * subsequent to construction.
888 * @return an iterator over the elements in this queue
890 public Iterator<E> iterator() {
891 return new Itr(toArray());
895 * Snapshot iterator that works off copy of underlying q array.
// Iterator over a fixed array snapshot (iterator() passes in the result of
// toArray()). hasNext() compares cursor with the snapshot length. The next-element
// code throws NoSuchElementException past the end and otherwise returns
// array[cursor++]. remove() has a guard that throws IllegalStateException and then
// delegates to removeEQ with the snapshot element at lastRet, so the removal acts
// on the live queue.
// NOTE(review): the constructor body, the signature of the next-element method, the
// updates to lastRet and the guard condition in remove() are on lines missing from
// this listing.
897 final class Itr implements Iterator<E> {
898 final Object[] array; // Array of all elements
899 int cursor; // index of next element to return;
900 int lastRet; // index of last element, or -1 if no such
902 Itr(Object[] array) {
907 public boolean hasNext() {
908 return cursor < array.length;
912 if (cursor >= array.length)
913 throw new NoSuchElementException();
915 return (E)array[cursor++];
918 public void remove() {
920 throw new IllegalStateException();
921 removeEQ(array[lastRet]);
927 * Saves the state to a stream (that is, serializes it). For
928 * compatibility with previous version of this class,
929 * elements are first copied to a java.util.PriorityQueue,
930 * which is then serialized.
// Serialization hook: stores a temporary java.util.PriorityQueue in field q, created
// with this comparator and a capacity of at least 1, then writes the default
// serialized form.
// NOTE(review): locking, the step that fills q with the elements, and the cleanup of
// q afterwards are on lines missing from this listing.
932 private void writeObject(java.io.ObjectOutputStream s)
933 throws java.io.IOException {
936 int n = size; // avoid zero capacity argument
937 q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
939 s.defaultWriteObject();
947 * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
948 * (that is, deserializes it).
950 * @param s the stream
952 private void readObject(java.io.ObjectInputStream s)
953 throws java.io.IOException, ClassNotFoundException {
955 s.defaultReadObject();
956 this.queue = new Object[q.size()];
957 comparator = q.comparator();