rt/emul/compact/src/main/java/java/util/concurrent/PriorityBlockingQueue.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branchjdk7-b147
changeset 1890 212417b74b72
child 1895 bfaf3300b7ba
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 
    38 import java.util.concurrent.locks.*;
    39 import java.util.*;
    40 
    41 /**
    42  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
    43  * the same ordering rules as class {@link PriorityQueue} and supplies
    44  * blocking retrieval operations.  While this queue is logically
    45  * unbounded, attempted additions may fail due to resource exhaustion
    46  * (causing {@code OutOfMemoryError}). This class does not permit
    47  * {@code null} elements.  A priority queue relying on {@linkplain
    48  * Comparable natural ordering} also does not permit insertion of
    49  * non-comparable objects (doing so results in
    50  * {@code ClassCastException}).
    51  *
    52  * <p>This class and its iterator implement all of the
    53  * <em>optional</em> methods of the {@link Collection} and {@link
    54  * Iterator} interfaces.  The Iterator provided in method {@link
    55  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
    56  * the PriorityBlockingQueue in any particular order. If you need
    57  * ordered traversal, consider using
    58  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
    59  * can be used to <em>remove</em> some or all elements in priority
    60  * order and place them in another collection.
    61  *
    62  * <p>Operations on this class make no guarantees about the ordering
    63  * of elements with equal priority. If you need to enforce an
    64  * ordering, you can define custom classes or comparators that use a
    65  * secondary key to break ties in primary priority values.  For
    66  * example, here is a class that applies first-in-first-out
    67  * tie-breaking to comparable elements. To use it, you would insert a
    68  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
    69  *
    70  *  <pre> {@code
    71  * class FIFOEntry<E extends Comparable<? super E>>
    72  *     implements Comparable<FIFOEntry<E>> {
    73  *   static final AtomicLong seq = new AtomicLong(0);
    74  *   final long seqNum;
    75  *   final E entry;
    76  *   public FIFOEntry(E entry) {
    77  *     seqNum = seq.getAndIncrement();
    78  *     this.entry = entry;
    79  *   }
    80  *   public E getEntry() { return entry; }
    81  *   public int compareTo(FIFOEntry<E> other) {
    82  *     int res = entry.compareTo(other.entry);
    83  *     if (res == 0 && other.entry != this.entry)
    84  *       res = (seqNum < other.seqNum ? -1 : 1);
    85  *     return res;
    86  *   }
    87  * }}</pre>
    88  *
    89  * <p>This class is a member of the
    90  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
    91  * Java Collections Framework</a>.
    92  *
    93  * @since 1.5
    94  * @author Doug Lea
    95  * @param <E> the type of elements held in this collection
    96  */
    97 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    98     implements BlockingQueue<E>, java.io.Serializable {
    99     private static final long serialVersionUID = 5595510919245408276L;
   100 
   101     /*
   102      * The implementation uses an array-based binary heap, with public
   103      * operations protected with a single lock. However, allocation
   104      * during resizing uses a simple spinlock (used only while not
   105      * holding main lock) in order to allow takes to operate
   106      * concurrently with allocation.  This avoids repeated
   107      * postponement of waiting consumers and consequent element
   108      * build-up. The need to back away from lock during allocation
   109      * makes it impossible to simply wrap delegated
   110      * java.util.PriorityQueue operations within a lock, as was done
   111      * in a previous version of this class. To maintain
   112      * interoperability, a plain PriorityQueue is still used during
   113      * serialization, which maintains compatibility at the expense of
   114      * transiently doubling overhead.
   115      */
   116 
    /**
     * Default array capacity, used by the no-argument constructor.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     *
     * The array reference itself is replaced by a larger array when
     * tryGrow succeeds.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue; the live elements
     * occupy queue[0] .. queue[size - 1].
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations.  tryGrow temporarily
     * releases it while a replacement array is being allocated.
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty; signalled by offer after an
     * element has been inserted.
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS.
     * 0 means free; 1 means some thread is currently allocating a
     * replacement array in tryGrow.
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */
    private PriorityQueue q;
   172 
    /**
     * Creates a {@code PriorityBlockingQueue} with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     *
     * <p>Delegates to the (capacity, comparator) constructor with
     * {@code DEFAULT_INITIAL_CAPACITY} and a {@code null} comparator.
     */
    public PriorityBlockingQueue() {
        // A null comparator selects natural ordering.
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
   181 
    /**
     * Creates a {@code PriorityBlockingQueue} with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * <p>Delegates to the (capacity, comparator) constructor with a
     * {@code null} comparator; the capacity check is performed there.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        // A null comparator selects natural ordering.
        this(initialCapacity, null);
    }
   194 
   195     /**
   196      * Creates a {@code PriorityBlockingQueue} with the specified initial
   197      * capacity that orders its elements according to the specified
   198      * comparator.
   199      *
   200      * @param initialCapacity the initial capacity for this priority queue
   201      * @param  comparator the comparator that will be used to order this
   202      *         priority queue.  If {@code null}, the {@linkplain Comparable
   203      *         natural ordering} of the elements will be used.
   204      * @throws IllegalArgumentException if {@code initialCapacity} is less
   205      *         than 1
   206      */
   207     public PriorityBlockingQueue(int initialCapacity,
   208                                  Comparator<? super E> comparator) {
   209         if (initialCapacity < 1)
   210             throw new IllegalArgumentException();
   211         this.lock = new ReentrantLock();
   212         this.notEmpty = lock.newCondition();
   213         this.comparator = comparator;
   214         this.queue = new Object[initialCapacity];
   215     }
   216 
    /**
     * Creates a {@code PriorityBlockingQueue} containing the elements
     * in the specified collection.  If the specified collection is a
     * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
     * priority queue will be ordered according to the same ordering.
     * Otherwise, this priority queue will be ordered according to the
     * {@linkplain Comparable natural ordering} of its elements.
     *
     * <p>Note that a plain {@link PriorityQueue} argument is not
     * special-cased by this constructor: its comparator is not adopted
     * and its elements are re-heapified using natural ordering.
     *
     * @param  c the collection whose elements are to be placed
     *         into this priority queue
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            // Adopt the set's ordering; its array is taken as-is with no
            // heapify pass (presumably because an ascending array already
            // satisfies the heap invariant).
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            // Adopt the other queue's ordering; no null screening is done
            // for this source.
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            // Only trust the array layout of this exact class; a subclass
            // is re-heapified.
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        // Explicit null scan only for a single element or when a comparator
        // is in use.
        // NOTE(review): in the remaining case (natural ordering, n > 1) nulls
        // are presumably expected to surface as NullPointerException from
        // compareTo during heapify -- confirm for sources that skip heapify.
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        // The (possibly copied) array becomes the backing heap directly, so
        // the initial capacity equals the number of elements.
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }
   266 
    /**
     * Tries to grow array to accommodate at least one more element
     * (but normally expand by about 50%), giving up (allowing retry)
     * on contention (which we expect to be rare). Call only while
     * holding lock.
     *
     * <p>The main lock is released while the new array is allocated and
     * is re-acquired before a normal return.  The method may return
     * without having replaced the array (the spinlock was busy, or the
     * array was swapped by another thread in the meantime), so callers
     * must re-check capacity afterwards.  If {@code OutOfMemoryError} is
     * thrown from the allocation step, the method exits with the main
     * lock not held.
     *
     * @param array the heap array
     * @param oldCap the length of the array
     */
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // Cheap volatile read first, then CAS 0 -> 1 to claim the spinlock.
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // Small arrays roughly double (+2); larger ones grow by half.
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                // Overflow-aware comparison: also true when newCap wrapped
                // around to a negative value.
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // Skip the allocation if another thread already replaced
                // the array we were asked to grow.
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                // Always release the spinlock, even if allocation failed.
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        // Re-check under the main lock: install the new array only if the
        // old one is still current, then copy the elements across.
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
   306 
   307     /**
   308      * Mechanics for poll().  Call only while holding lock.
   309      */
   310     private E extract() {
   311         E result;
   312         int n = size - 1;
   313         if (n < 0)
   314             result = null;
   315         else {
   316             Object[] array = queue;
   317             result = (E) array[0];
   318             E x = (E) array[n];
   319             array[n] = null;
   320             Comparator<? super E> cmp = comparator;
   321             if (cmp == null)
   322                 siftDownComparable(0, x, array, n);
   323             else
   324                 siftDownUsingComparator(0, x, array, n, cmp);
   325             size = n;
   326         }
   327         return result;
   328     }
   329 
   330     /**
   331      * Inserts item x at position k, maintaining heap invariant by
   332      * promoting x up the tree until it is greater than or equal to
   333      * its parent, or is the root.
   334      *
   335      * To simplify and speed up coercions and comparisons. the
   336      * Comparable and Comparator versions are separated into different
   337      * methods that are otherwise identical. (Similarly for siftDown.)
   338      * These methods are static, with heap state as arguments, to
   339      * simplify use in light of possible comparator exceptions.
   340      *
   341      * @param k the position to fill
   342      * @param x the item to insert
   343      * @param array the heap array
   344      * @param n heap size
   345      */
   346     private static <T> void siftUpComparable(int k, T x, Object[] array) {
   347         Comparable<? super T> key = (Comparable<? super T>) x;
   348         while (k > 0) {
   349             int parent = (k - 1) >>> 1;
   350             Object e = array[parent];
   351             if (key.compareTo((T) e) >= 0)
   352                 break;
   353             array[k] = e;
   354             k = parent;
   355         }
   356         array[k] = key;
   357     }
   358 
   359     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
   360                                        Comparator<? super T> cmp) {
   361         while (k > 0) {
   362             int parent = (k - 1) >>> 1;
   363             Object e = array[parent];
   364             if (cmp.compare(x, (T) e) >= 0)
   365                 break;
   366             array[k] = e;
   367             k = parent;
   368         }
   369         array[k] = x;
   370     }
   371 
   372     /**
   373      * Inserts item x at position k, maintaining heap invariant by
   374      * demoting x down the tree repeatedly until it is less than or
   375      * equal to its children or is a leaf.
   376      *
   377      * @param k the position to fill
   378      * @param x the item to insert
   379      * @param array the heap array
   380      * @param n heap size
   381      */
   382     private static <T> void siftDownComparable(int k, T x, Object[] array,
   383                                                int n) {
   384         Comparable<? super T> key = (Comparable<? super T>)x;
   385         int half = n >>> 1;           // loop while a non-leaf
   386         while (k < half) {
   387             int child = (k << 1) + 1; // assume left child is least
   388             Object c = array[child];
   389             int right = child + 1;
   390             if (right < n &&
   391                 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
   392                 c = array[child = right];
   393             if (key.compareTo((T) c) <= 0)
   394                 break;
   395             array[k] = c;
   396             k = child;
   397         }
   398         array[k] = key;
   399     }
   400 
   401     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
   402                                                     int n,
   403                                                     Comparator<? super T> cmp) {
   404         int half = n >>> 1;
   405         while (k < half) {
   406             int child = (k << 1) + 1;
   407             Object c = array[child];
   408             int right = child + 1;
   409             if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
   410                 c = array[child = right];
   411             if (cmp.compare(x, (T) c) <= 0)
   412                 break;
   413             array[k] = c;
   414             k = child;
   415         }
   416         array[k] = x;
   417     }
   418 
   419     /**
   420      * Establishes the heap invariant (described above) in the entire tree,
   421      * assuming nothing about the order of the elements prior to the call.
   422      */
   423     private void heapify() {
   424         Object[] array = queue;
   425         int n = size;
   426         int half = (n >>> 1) - 1;
   427         Comparator<? super E> cmp = comparator;
   428         if (cmp == null) {
   429             for (int i = half; i >= 0; i--)
   430                 siftDownComparable(i, (E) array[i], array, n);
   431         }
   432         else {
   433             for (int i = half; i >= 0; i--)
   434                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
   435         }
   436     }
   437 
    /**
     * Inserts the specified element into this priority queue.
     *
     * <p>This is a straight delegation to {@link #offer(Object)}.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }
   451 
    /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * <p>If the backing array is full, {@code tryGrow} is called repeatedly
     * until there is room; the element is then sifted up from the first
     * free slot and one waiting consumer is signalled.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // Re-read size and array on every pass: tryGrow drops the lock while
        // allocating, so both may have changed, and it may return without
        // having grown the array at all.
        // The loop sits outside the try block on purpose: if tryGrow throws
        // after releasing the lock, the finally clause below must not run,
        // because it would unlock a lock this thread no longer holds.
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            // Publish the new size only after the sift succeeded, so a
            // comparison failure leaves the count unchanged.
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
   485 
    /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block.
     *
     * <p>Delegates to {@link #offer(Object)} and discards its result.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        offer(e); // never need to block
    }
   499 
    /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * <p>Delegates to {@link #offer(Object)}; neither {@code timeout} nor
     * {@code unit} is read, so a {@code null} unit is accepted.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }
   518 
   519     public E poll() {
   520         final ReentrantLock lock = this.lock;
   521         lock.lock();
   522         E result;
   523         try {
   524             result = extract();
   525         } finally {
   526             lock.unlock();
   527         }
   528         return result;
   529     }
   530 
   531     public E take() throws InterruptedException {
   532         final ReentrantLock lock = this.lock;
   533         lock.lockInterruptibly();
   534         E result;
   535         try {
   536             while ( (result = extract()) == null)
   537                 notEmpty.await();
   538         } finally {
   539             lock.unlock();
   540         }
   541         return result;
   542     }
   543 
   544     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   545         long nanos = unit.toNanos(timeout);
   546         final ReentrantLock lock = this.lock;
   547         lock.lockInterruptibly();
   548         E result;
   549         try {
   550             while ( (result = extract()) == null && nanos > 0)
   551                 nanos = notEmpty.awaitNanos(nanos);
   552         } finally {
   553             lock.unlock();
   554         }
   555         return result;
   556     }
   557 
   558     public E peek() {
   559         final ReentrantLock lock = this.lock;
   560         lock.lock();
   561         E result;
   562         try {
   563             result = size > 0 ? (E) queue[0] : null;
   564         } finally {
   565             lock.unlock();
   566         }
   567         return result;
   568     }
   569 
    /**
     * Returns the comparator used to order the elements in this queue,
     * or {@code null} if this queue uses the {@linkplain Comparable
     * natural ordering} of its elements.
     *
     * <p>The field is read directly, without acquiring the queue's lock.
     *
     * @return the comparator used to order the elements in this queue,
     *         or {@code null} if this queue uses the natural
     *         ordering of its elements
     */
    public Comparator<? super E> comparator() {
        return comparator;
    }
   582 
   583     public int size() {
   584         final ReentrantLock lock = this.lock;
   585         lock.lock();
   586         try {
   587             return size;
   588         } finally {
   589             lock.unlock();
   590         }
   591     }
   592 
    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code PriorityBlockingQueue} is not capacity constrained.
     * The result is a constant and does not depend on the current size.
     *
     * @return {@code Integer.MAX_VALUE} always
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }
   601 
   602     private int indexOf(Object o) {
   603         if (o != null) {
   604             Object[] array = queue;
   605             int n = size;
   606             for (int i = 0; i < n; i++)
   607                 if (o.equals(array[i]))
   608                     return i;
   609         }
   610         return -1;
   611     }
   612 
   613     /**
   614      * Removes the ith element from queue.
   615      */
   616     private void removeAt(int i) {
   617         Object[] array = queue;
   618         int n = size - 1;
   619         if (n == i) // removed last element
   620             array[i] = null;
   621         else {
   622             E moved = (E) array[n];
   623             array[n] = null;
   624             Comparator<? super E> cmp = comparator;
   625             if (cmp == null)
   626                 siftDownComparable(i, moved, array, n);
   627             else
   628                 siftDownUsingComparator(i, moved, array, n, cmp);
   629             if (array[i] == moved) {
   630                 if (cmp == null)
   631                     siftUpComparable(i, moved, array);
   632                 else
   633                     siftUpUsingComparator(i, moved, array, cmp);
   634             }
   635         }
   636         size = n;
   637     }
   638 
   639     /**
   640      * Removes a single instance of the specified element from this queue,
   641      * if it is present.  More formally, removes an element {@code e} such
   642      * that {@code o.equals(e)}, if this queue contains one or more such
   643      * elements.  Returns {@code true} if and only if this queue contained
   644      * the specified element (or equivalently, if this queue changed as a
   645      * result of the call).
   646      *
   647      * @param o element to be removed from this queue, if present
   648      * @return {@code true} if this queue changed as a result of the call
   649      */
   650     public boolean remove(Object o) {
   651         boolean removed = false;
   652         final ReentrantLock lock = this.lock;
   653         lock.lock();
   654         try {
   655             int i = indexOf(o);
   656             if (i != -1) {
   657                 removeAt(i);
   658                 removed = true;
   659             }
   660         } finally {
   661             lock.unlock();
   662         }
   663         return removed;
   664     }
   665 
   666 
   667     /**
   668      * Identity-based version for use in Itr.remove
   669      */
   670     private void removeEQ(Object o) {
   671         final ReentrantLock lock = this.lock;
   672         lock.lock();
   673         try {
   674             Object[] array = queue;
   675             int n = size;
   676             for (int i = 0; i < n; i++) {
   677                 if (o == array[i]) {
   678                     removeAt(i);
   679                     break;
   680                 }
   681             }
   682         } finally {
   683             lock.unlock();
   684         }
   685     }
   686 
   687     /**
   688      * Returns {@code true} if this queue contains the specified element.
   689      * More formally, returns {@code true} if and only if this queue contains
   690      * at least one element {@code e} such that {@code o.equals(e)}.
   691      *
   692      * @param o object to be checked for containment in this queue
   693      * @return {@code true} if this queue contains the specified element
   694      */
   695     public boolean contains(Object o) {
   696         int index;
   697         final ReentrantLock lock = this.lock;
   698         lock.lock();
   699         try {
   700             index = indexOf(o);
   701         } finally {
   702             lock.unlock();
   703         }
   704         return index != -1;
   705     }
   706 
   707     /**
   708      * Returns an array containing all of the elements in this queue.
   709      * The returned array elements are in no particular order.
   710      *
   711      * <p>The returned array will be "safe" in that no references to it are
   712      * maintained by this queue.  (In other words, this method must allocate
   713      * a new array).  The caller is thus free to modify the returned array.
   714      *
   715      * <p>This method acts as bridge between array-based and collection-based
   716      * APIs.
   717      *
   718      * @return an array containing all of the elements in this queue
   719      */
   720     public Object[] toArray() {
   721         final ReentrantLock lock = this.lock;
   722         lock.lock();
   723         try {
   724             return Arrays.copyOf(queue, size);
   725         } finally {
   726             lock.unlock();
   727         }
   728     }
   729 
   730 
   731     public String toString() {
   732         final ReentrantLock lock = this.lock;
   733         lock.lock();
   734         try {
   735             int n = size;
   736             if (n == 0)
   737                 return "[]";
   738             StringBuilder sb = new StringBuilder();
   739             sb.append('[');
   740             for (int i = 0; i < n; ++i) {
   741                 E e = (E)queue[i];
   742                 sb.append(e == this ? "(this Collection)" : e);
   743                 if (i != n - 1)
   744                     sb.append(',').append(' ');
   745             }
   746             return sb.append(']').toString();
   747         } finally {
   748             lock.unlock();
   749         }
   750     }
   751 
   752     /**
   753      * @throws UnsupportedOperationException {@inheritDoc}
   754      * @throws ClassCastException            {@inheritDoc}
   755      * @throws NullPointerException          {@inheritDoc}
   756      * @throws IllegalArgumentException      {@inheritDoc}
   757      */
   758     public int drainTo(Collection<? super E> c) {
   759         if (c == null)
   760             throw new NullPointerException();
   761         if (c == this)
   762             throw new IllegalArgumentException();
   763         final ReentrantLock lock = this.lock;
   764         lock.lock();
   765         try {
   766             int n = 0;
   767             E e;
   768             while ( (e = extract()) != null) {
   769                 c.add(e);
   770                 ++n;
   771             }
   772             return n;
   773         } finally {
   774             lock.unlock();
   775         }
   776     }
   777 
   778     /**
   779      * @throws UnsupportedOperationException {@inheritDoc}
   780      * @throws ClassCastException            {@inheritDoc}
   781      * @throws NullPointerException          {@inheritDoc}
   782      * @throws IllegalArgumentException      {@inheritDoc}
   783      */
   784     public int drainTo(Collection<? super E> c, int maxElements) {
   785         if (c == null)
   786             throw new NullPointerException();
   787         if (c == this)
   788             throw new IllegalArgumentException();
   789         if (maxElements <= 0)
   790             return 0;
   791         final ReentrantLock lock = this.lock;
   792         lock.lock();
   793         try {
   794             int n = 0;
   795             E e;
   796             while (n < maxElements && (e = extract()) != null) {
   797                 c.add(e);
   798                 ++n;
   799             }
   800             return n;
   801         } finally {
   802             lock.unlock();
   803         }
   804     }
   805 
   806     /**
   807      * Atomically removes all of the elements from this queue.
   808      * The queue will be empty after this call returns.
   809      */
   810     public void clear() {
   811         final ReentrantLock lock = this.lock;
   812         lock.lock();
   813         try {
   814             Object[] array = queue;
   815             int n = size;
   816             size = 0;
   817             for (int i = 0; i < n; i++)
   818                 array[i] = null;
   819         } finally {
   820             lock.unlock();
   821         }
   822     }
   823 
   824     /**
   825      * Returns an array containing all of the elements in this queue; the
   826      * runtime type of the returned array is that of the specified array.
   827      * The returned array elements are in no particular order.
   828      * If the queue fits in the specified array, it is returned therein.
   829      * Otherwise, a new array is allocated with the runtime type of the
   830      * specified array and the size of this queue.
   831      *
   832      * <p>If this queue fits in the specified array with room to spare
   833      * (i.e., the array has more elements than this queue), the element in
   834      * the array immediately following the end of the queue is set to
   835      * {@code null}.
   836      *
   837      * <p>Like the {@link #toArray()} method, this method acts as bridge between
   838      * array-based and collection-based APIs.  Further, this method allows
   839      * precise control over the runtime type of the output array, and may,
   840      * under certain circumstances, be used to save allocation costs.
   841      *
   842      * <p>Suppose {@code x} is a queue known to contain only strings.
   843      * The following code can be used to dump the queue into a newly
   844      * allocated array of {@code String}:
   845      *
   846      * <pre>
   847      *     String[] y = x.toArray(new String[0]);</pre>
   848      *
   849      * Note that {@code toArray(new Object[0])} is identical in function to
   850      * {@code toArray()}.
   851      *
   852      * @param a the array into which the elements of the queue are to
   853      *          be stored, if it is big enough; otherwise, a new array of the
   854      *          same runtime type is allocated for this purpose
   855      * @return an array containing all of the elements in this queue
   856      * @throws ArrayStoreException if the runtime type of the specified array
   857      *         is not a supertype of the runtime type of every element in
   858      *         this queue
   859      * @throws NullPointerException if the specified array is null
   860      */
   861     public <T> T[] toArray(T[] a) {
   862         final ReentrantLock lock = this.lock;
   863         lock.lock();
   864         try {
   865             int n = size;
   866             if (a.length < n)
   867                 // Make a new array of a's runtime type, but my contents:
   868                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
   869             System.arraycopy(queue, 0, a, 0, n);
   870             if (a.length > n)
   871                 a[n] = null;
   872             return a;
   873         } finally {
   874             lock.unlock();
   875         }
   876     }
   877 
   878     /**
   879      * Returns an iterator over the elements in this queue. The
   880      * iterator does not return the elements in any particular order.
   881      *
   882      * <p>The returned iterator is a "weakly consistent" iterator that
   883      * will never throw {@link java.util.ConcurrentModificationException
   884      * ConcurrentModificationException}, and guarantees to traverse
   885      * elements as they existed upon construction of the iterator, and
   886      * may (but is not guaranteed to) reflect any modifications
   887      * subsequent to construction.
   888      *
   889      * @return an iterator over the elements in this queue
   890      */
   891     public Iterator<E> iterator() {
   892         return new Itr(toArray());
   893     }
   894 
   895     /**
   896      * Snapshot iterator that works off copy of underlying q array.
   897      */
   898     final class Itr implements Iterator<E> {
   899         final Object[] array; // Array of all elements
   900         int cursor;           // index of next element to return;
   901         int lastRet;          // index of last element, or -1 if no such
   902 
   903         Itr(Object[] array) {
   904             lastRet = -1;
   905             this.array = array;
   906         }
   907 
   908         public boolean hasNext() {
   909             return cursor < array.length;
   910         }
   911 
   912         public E next() {
   913             if (cursor >= array.length)
   914                 throw new NoSuchElementException();
   915             lastRet = cursor;
   916             return (E)array[cursor++];
   917         }
   918 
   919         public void remove() {
   920             if (lastRet < 0)
   921                 throw new IllegalStateException();
   922             removeEQ(array[lastRet]);
   923             lastRet = -1;
   924         }
   925     }
   926 
   927     /**
   928      * Saves the state to a stream (that is, serializes it).  For
   929      * compatibility with previous version of this class,
   930      * elements are first copied to a java.util.PriorityQueue,
   931      * which is then serialized.
   932      */
   933     private void writeObject(java.io.ObjectOutputStream s)
   934         throws java.io.IOException {
   935         lock.lock();
   936         try {
   937             int n = size; // avoid zero capacity argument
   938             q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
   939             q.addAll(this);
   940             s.defaultWriteObject();
   941         } finally {
   942             q = null;
   943             lock.unlock();
   944         }
   945     }
   946 
   947     /**
   948      * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
   949      * (that is, deserializes it).
   950      *
   951      * @param s the stream
   952      */
   953     private void readObject(java.io.ObjectInputStream s)
   954         throws java.io.IOException, ClassNotFoundException {
   955         try {
   956             s.defaultReadObject();
   957             this.queue = new Object[q.size()];
   958             comparator = q.comparator();
   959             addAll(q);
   960         } finally {
   961             q = null;
   962         }
   963     }
   964 
   965     // Unsafe mechanics
   966     private static final sun.misc.Unsafe UNSAFE;
   967     private static final long allocationSpinLockOffset;
   968     static {
   969         try {
   970             UNSAFE = sun.misc.Unsafe.getUnsafe();
   971             Class k = PriorityBlockingQueue.class;
   972             allocationSpinLockOffset = UNSAFE.objectFieldOffset
   973                 (k.getDeclaredField("allocationSpinLock"));
   974         } catch (Exception e) {
   975             throw new Error(e);
   976         }
   977     }
   978 }