rt/emul/compact/src/main/java/java/util/concurrent/PriorityBlockingQueue.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 12:51:03 +0100
changeset 1895 bfaf3300b7ba
parent 1890 212417b74b72
permissions -rw-r--r--
Making java.util.concurrent package compilable except ForkJoinPool
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 
    38 import java.util.concurrent.locks.*;
    39 import java.util.*;
    40 
    41 /**
    42  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
    43  * the same ordering rules as class {@link PriorityQueue} and supplies
    44  * blocking retrieval operations.  While this queue is logically
    45  * unbounded, attempted additions may fail due to resource exhaustion
    46  * (causing {@code OutOfMemoryError}). This class does not permit
    47  * {@code null} elements.  A priority queue relying on {@linkplain
    48  * Comparable natural ordering} also does not permit insertion of
    49  * non-comparable objects (doing so results in
    50  * {@code ClassCastException}).
    51  *
    52  * <p>This class and its iterator implement all of the
    53  * <em>optional</em> methods of the {@link Collection} and {@link
    54  * Iterator} interfaces.  The Iterator provided in method {@link
    55  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
    56  * the PriorityBlockingQueue in any particular order. If you need
    57  * ordered traversal, consider using
    58  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
    59  * can be used to <em>remove</em> some or all elements in priority
    60  * order and place them in another collection.
    61  *
    62  * <p>Operations on this class make no guarantees about the ordering
    63  * of elements with equal priority. If you need to enforce an
    64  * ordering, you can define custom classes or comparators that use a
    65  * secondary key to break ties in primary priority values.  For
    66  * example, here is a class that applies first-in-first-out
    67  * tie-breaking to comparable elements. To use it, you would insert a
    68  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
    69  *
    70  *  <pre> {@code
    71  * class FIFOEntry<E extends Comparable<? super E>>
    72  *     implements Comparable<FIFOEntry<E>> {
    73  *   static final AtomicLong seq = new AtomicLong(0);
    74  *   final long seqNum;
    75  *   final E entry;
    76  *   public FIFOEntry(E entry) {
    77  *     seqNum = seq.getAndIncrement();
    78  *     this.entry = entry;
    79  *   }
    80  *   public E getEntry() { return entry; }
    81  *   public int compareTo(FIFOEntry<E> other) {
    82  *     int res = entry.compareTo(other.entry);
    83  *     if (res == 0 && other.entry != this.entry)
    84  *       res = (seqNum < other.seqNum ? -1 : 1);
    85  *     return res;
    86  *   }
    87  * }}</pre>
    88  *
    89  * <p>This class is a member of the
    90  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
    91  * Java Collections Framework</a>.
    92  *
    93  * @since 1.5
    94  * @author Doug Lea
    95  * @param <E> the type of elements held in this collection
    96  */
    97 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    98     implements BlockingQueue<E>, java.io.Serializable {
    99     private static final long serialVersionUID = 5595510919245408276L;
   100 
   101     /*
   102      * The implementation uses an array-based binary heap, with public
   103      * operations protected with a single lock. However, allocation
   104      * during resizing uses a simple spinlock (used only while not
   105      * holding main lock) in order to allow takes to operate
   106      * concurrently with allocation.  This avoids repeated
   107      * postponement of waiting consumers and consequent element
   108      * build-up. The need to back away from lock during allocation
   109      * makes it impossible to simply wrap delegated
   110      * java.util.PriorityQueue operations within a lock, as was done
   111      * in a previous version of this class. To maintain
   112      * interoperability, a plain PriorityQueue is still used during
     * serialization, which maintains compatibility at the expense of
   114      * transiently doubling overhead.
   115      */
   116 
    /**
     * Default array capacity, used by the no-argument constructor.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     * Only the first {@code size} slots hold live elements.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty; created from {@code lock}.
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation: 0 when free, 1 while a thread is
     * allocating a replacement array in {@code tryGrow}.
     * NOTE(review): in this copy of the class the flag is tested and
     * set with plain volatile accesses, not an atomic compare-and-set.
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */
    private PriorityQueue q;
   172 
   173     /**
   174      * Creates a {@code PriorityBlockingQueue} with the default
   175      * initial capacity (11) that orders its elements according to
   176      * their {@linkplain Comparable natural ordering}.
   177      */
   178     public PriorityBlockingQueue() {
   179         this(DEFAULT_INITIAL_CAPACITY, null);
   180     }
   181 
   182     /**
   183      * Creates a {@code PriorityBlockingQueue} with the specified
   184      * initial capacity that orders its elements according to their
   185      * {@linkplain Comparable natural ordering}.
   186      *
   187      * @param initialCapacity the initial capacity for this priority queue
   188      * @throws IllegalArgumentException if {@code initialCapacity} is less
   189      *         than 1
   190      */
   191     public PriorityBlockingQueue(int initialCapacity) {
   192         this(initialCapacity, null);
   193     }
   194 
   195     /**
   196      * Creates a {@code PriorityBlockingQueue} with the specified initial
   197      * capacity that orders its elements according to the specified
   198      * comparator.
   199      *
   200      * @param initialCapacity the initial capacity for this priority queue
   201      * @param  comparator the comparator that will be used to order this
   202      *         priority queue.  If {@code null}, the {@linkplain Comparable
   203      *         natural ordering} of the elements will be used.
   204      * @throws IllegalArgumentException if {@code initialCapacity} is less
   205      *         than 1
   206      */
   207     public PriorityBlockingQueue(int initialCapacity,
   208                                  Comparator<? super E> comparator) {
   209         if (initialCapacity < 1)
   210             throw new IllegalArgumentException();
   211         this.lock = new ReentrantLock();
   212         this.notEmpty = lock.newCondition();
   213         this.comparator = comparator;
   214         this.queue = new Object[initialCapacity];
   215     }
   216 
    /**
     * Creates a {@code PriorityBlockingQueue} containing the elements
     * in the specified collection.  If the specified collection is a
     * {@link SortedSet} or a {@code PriorityBlockingQueue}, this
     * priority queue adopts that collection's comparator and is
     * therefore ordered according to the same ordering.
     * Otherwise, this priority queue will be ordered according to the
     * {@linkplain Comparable natural ordering} of its elements.
     *
     * <p>NOTE(review): a plain {@link PriorityQueue} is not special-cased
     * by this constructor, so its comparator is not adopted here.
     *
     * @param  c the collection whose elements are to be placed
     *         into this priority queue
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            // Adopt the set's comparator; its array is used as-is, without heapify.
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            // Adopt the other queue's comparator and skip the null scan.
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            // Only an exact PriorityBlockingQueue (not a subclass) is
            // trusted to hand back an array already in heap order.
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        // Explicit null scan only for a single element or when a comparator
        // is set.  NOTE(review): in the remaining case (several elements,
        // natural ordering) nulls are presumably expected to surface as
        // NullPointerException from the comparisons in heapify() -- confirm.
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    }
   266 
    /**
     * Tries to grow array to accommodate at least one more element
     * (but normally expand by about 50%), giving up (allowing retry)
     * on contention (which we expect to be rare). Call only while
     * holding lock.
     *
     * <p>The main lock is released while the new array is allocated and
     * re-acquired before returning, so on normal return the caller holds
     * the lock again but the array may or may not have been replaced;
     * callers must re-read {@code queue} and {@code size}.  If an
     * {@code OutOfMemoryError} is thrown, the main lock has been released
     * and is not re-acquired.
     *
     * @param array the heap array
     * @param oldCap the length of the array
     * @throws OutOfMemoryError if the array cannot grow by even one
     *         element without exceeding {@code MAX_ARRAY_SIZE}
     */
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // Plain check-then-set on the volatile flag (no atomic CAS here).
        if (allocationSpinLock == 0) {
            allocationSpinLock = 1;
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                // Subtraction form also catches newCap having wrapped negative.
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // Allocate only if the queue still uses the array we were given.
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        // Install the new array only if nobody replaced the old one meanwhile;
        // otherwise the allocation is simply dropped.
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
   305 
   306     /**
   307      * Mechanics for poll().  Call only while holding lock.
   308      */
   309     private E extract() {
   310         E result;
   311         int n = size - 1;
   312         if (n < 0)
   313             result = null;
   314         else {
   315             Object[] array = queue;
   316             result = (E) array[0];
   317             E x = (E) array[n];
   318             array[n] = null;
   319             Comparator<? super E> cmp = comparator;
   320             if (cmp == null)
   321                 siftDownComparable(0, x, array, n);
   322             else
   323                 siftDownUsingComparator(0, x, array, n, cmp);
   324             size = n;
   325         }
   326         return result;
   327     }
   328 
   329     /**
   330      * Inserts item x at position k, maintaining heap invariant by
   331      * promoting x up the tree until it is greater than or equal to
   332      * its parent, or is the root.
   333      *
   334      * To simplify and speed up coercions and comparisons. the
   335      * Comparable and Comparator versions are separated into different
   336      * methods that are otherwise identical. (Similarly for siftDown.)
   337      * These methods are static, with heap state as arguments, to
   338      * simplify use in light of possible comparator exceptions.
   339      *
   340      * @param k the position to fill
   341      * @param x the item to insert
   342      * @param array the heap array
   343      * @param n heap size
   344      */
   345     private static <T> void siftUpComparable(int k, T x, Object[] array) {
   346         Comparable<? super T> key = (Comparable<? super T>) x;
   347         while (k > 0) {
   348             int parent = (k - 1) >>> 1;
   349             Object e = array[parent];
   350             if (key.compareTo((T) e) >= 0)
   351                 break;
   352             array[k] = e;
   353             k = parent;
   354         }
   355         array[k] = key;
   356     }
   357 
   358     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
   359                                        Comparator<? super T> cmp) {
   360         while (k > 0) {
   361             int parent = (k - 1) >>> 1;
   362             Object e = array[parent];
   363             if (cmp.compare(x, (T) e) >= 0)
   364                 break;
   365             array[k] = e;
   366             k = parent;
   367         }
   368         array[k] = x;
   369     }
   370 
   371     /**
   372      * Inserts item x at position k, maintaining heap invariant by
   373      * demoting x down the tree repeatedly until it is less than or
   374      * equal to its children or is a leaf.
   375      *
   376      * @param k the position to fill
   377      * @param x the item to insert
   378      * @param array the heap array
   379      * @param n heap size
   380      */
   381     private static <T> void siftDownComparable(int k, T x, Object[] array,
   382                                                int n) {
   383         Comparable<? super T> key = (Comparable<? super T>)x;
   384         int half = n >>> 1;           // loop while a non-leaf
   385         while (k < half) {
   386             int child = (k << 1) + 1; // assume left child is least
   387             Object c = array[child];
   388             int right = child + 1;
   389             if (right < n &&
   390                 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
   391                 c = array[child = right];
   392             if (key.compareTo((T) c) <= 0)
   393                 break;
   394             array[k] = c;
   395             k = child;
   396         }
   397         array[k] = key;
   398     }
   399 
   400     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
   401                                                     int n,
   402                                                     Comparator<? super T> cmp) {
   403         int half = n >>> 1;
   404         while (k < half) {
   405             int child = (k << 1) + 1;
   406             Object c = array[child];
   407             int right = child + 1;
   408             if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
   409                 c = array[child = right];
   410             if (cmp.compare(x, (T) c) <= 0)
   411                 break;
   412             array[k] = c;
   413             k = child;
   414         }
   415         array[k] = x;
   416     }
   417 
   418     /**
   419      * Establishes the heap invariant (described above) in the entire tree,
   420      * assuming nothing about the order of the elements prior to the call.
   421      */
   422     private void heapify() {
   423         Object[] array = queue;
   424         int n = size;
   425         int half = (n >>> 1) - 1;
   426         Comparator<? super E> cmp = comparator;
   427         if (cmp == null) {
   428             for (int i = half; i >= 0; i--)
   429                 siftDownComparable(i, (E) array[i], array, n);
   430         }
   431         else {
   432             for (int i = half; i >= 0; i--)
   433                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
   434         }
   435     }
   436 
   437     /**
   438      * Inserts the specified element into this priority queue.
   439      *
   440      * @param e the element to add
   441      * @return {@code true} (as specified by {@link Collection#add})
   442      * @throws ClassCastException if the specified element cannot be compared
   443      *         with elements currently in the priority queue according to the
   444      *         priority queue's ordering
   445      * @throws NullPointerException if the specified element is null
   446      */
   447     public boolean add(E e) {
   448         return offer(e);
   449     }
   450 
   451     /**
   452      * Inserts the specified element into this priority queue.
   453      * As the queue is unbounded, this method will never return {@code false}.
   454      *
   455      * @param e the element to add
   456      * @return {@code true} (as specified by {@link Queue#offer})
   457      * @throws ClassCastException if the specified element cannot be compared
   458      *         with elements currently in the priority queue according to the
   459      *         priority queue's ordering
   460      * @throws NullPointerException if the specified element is null
   461      */
   462     public boolean offer(E e) {
   463         if (e == null)
   464             throw new NullPointerException();
   465         final ReentrantLock lock = this.lock;
   466         lock.lock();
   467         int n, cap;
   468         Object[] array;
   469         while ((n = size) >= (cap = (array = queue).length))
   470             tryGrow(array, cap);
   471         try {
   472             Comparator<? super E> cmp = comparator;
   473             if (cmp == null)
   474                 siftUpComparable(n, e, array);
   475             else
   476                 siftUpUsingComparator(n, e, array, cmp);
   477             size = n + 1;
   478             notEmpty.signal();
   479         } finally {
   480             lock.unlock();
   481         }
   482         return true;
   483     }
   484 
   485     /**
   486      * Inserts the specified element into this priority queue.
   487      * As the queue is unbounded, this method will never block.
   488      *
   489      * @param e the element to add
   490      * @throws ClassCastException if the specified element cannot be compared
   491      *         with elements currently in the priority queue according to the
   492      *         priority queue's ordering
   493      * @throws NullPointerException if the specified element is null
   494      */
   495     public void put(E e) {
   496         offer(e); // never need to block
   497     }
   498 
   499     /**
   500      * Inserts the specified element into this priority queue.
   501      * As the queue is unbounded, this method will never block or
   502      * return {@code false}.
   503      *
   504      * @param e the element to add
   505      * @param timeout This parameter is ignored as the method never blocks
   506      * @param unit This parameter is ignored as the method never blocks
   507      * @return {@code true} (as specified by
   508      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
   509      * @throws ClassCastException if the specified element cannot be compared
   510      *         with elements currently in the priority queue according to the
   511      *         priority queue's ordering
   512      * @throws NullPointerException if the specified element is null
   513      */
   514     public boolean offer(E e, long timeout, TimeUnit unit) {
   515         return offer(e); // never need to block
   516     }
   517 
   518     public E poll() {
   519         final ReentrantLock lock = this.lock;
   520         lock.lock();
   521         E result;
   522         try {
   523             result = extract();
   524         } finally {
   525             lock.unlock();
   526         }
   527         return result;
   528     }
   529 
   530     public E take() throws InterruptedException {
   531         final ReentrantLock lock = this.lock;
   532         lock.lockInterruptibly();
   533         E result;
   534         try {
   535             while ( (result = extract()) == null)
   536                 notEmpty.await();
   537         } finally {
   538             lock.unlock();
   539         }
   540         return result;
   541     }
   542 
   543     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   544         long nanos = unit.toNanos(timeout);
   545         final ReentrantLock lock = this.lock;
   546         lock.lockInterruptibly();
   547         E result;
   548         try {
   549             while ( (result = extract()) == null && nanos > 0)
   550                 nanos = notEmpty.awaitNanos(nanos);
   551         } finally {
   552             lock.unlock();
   553         }
   554         return result;
   555     }
   556 
   557     public E peek() {
   558         final ReentrantLock lock = this.lock;
   559         lock.lock();
   560         E result;
   561         try {
   562             result = size > 0 ? (E) queue[0] : null;
   563         } finally {
   564             lock.unlock();
   565         }
   566         return result;
   567     }
   568 
   569     /**
   570      * Returns the comparator used to order the elements in this queue,
   571      * or {@code null} if this queue uses the {@linkplain Comparable
   572      * natural ordering} of its elements.
   573      *
   574      * @return the comparator used to order the elements in this queue,
   575      *         or {@code null} if this queue uses the natural
   576      *         ordering of its elements
   577      */
   578     public Comparator<? super E> comparator() {
   579         return comparator;
   580     }
   581 
   582     public int size() {
   583         final ReentrantLock lock = this.lock;
   584         lock.lock();
   585         try {
   586             return size;
   587         } finally {
   588             lock.unlock();
   589         }
   590     }
   591 
   592     /**
   593      * Always returns {@code Integer.MAX_VALUE} because
   594      * a {@code PriorityBlockingQueue} is not capacity constrained.
   595      * @return {@code Integer.MAX_VALUE} always
   596      */
   597     public int remainingCapacity() {
   598         return Integer.MAX_VALUE;
   599     }
   600 
   601     private int indexOf(Object o) {
   602         if (o != null) {
   603             Object[] array = queue;
   604             int n = size;
   605             for (int i = 0; i < n; i++)
   606                 if (o.equals(array[i]))
   607                     return i;
   608         }
   609         return -1;
   610     }
   611 
    /**
     * Removes the ith element from queue.
     *
     * <p>The last element of the heap is moved into the vacated slot and
     * then sifted down; if it did not move down, it is sifted up instead,
     * so the heap invariant holds in either direction.  Does no locking
     * itself.
     *
     * @param i index of the element to remove
     */
    private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
        if (n == i) // removed last element
            array[i] = null;
        else {
            // Take the last element out of the heap and re-insert it at i.
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(i, moved, array, n);
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            // Still at i means it did not sink; it may instead need to
            // rise above its parent.
            if (array[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, array);
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
        }
        size = n;
    }
   637 
   638     /**
   639      * Removes a single instance of the specified element from this queue,
   640      * if it is present.  More formally, removes an element {@code e} such
   641      * that {@code o.equals(e)}, if this queue contains one or more such
   642      * elements.  Returns {@code true} if and only if this queue contained
   643      * the specified element (or equivalently, if this queue changed as a
   644      * result of the call).
   645      *
   646      * @param o element to be removed from this queue, if present
   647      * @return {@code true} if this queue changed as a result of the call
   648      */
   649     public boolean remove(Object o) {
   650         boolean removed = false;
   651         final ReentrantLock lock = this.lock;
   652         lock.lock();
   653         try {
   654             int i = indexOf(o);
   655             if (i != -1) {
   656                 removeAt(i);
   657                 removed = true;
   658             }
   659         } finally {
   660             lock.unlock();
   661         }
   662         return removed;
   663     }
   664 
   665 
   666     /**
   667      * Identity-based version for use in Itr.remove
   668      */
   669     private void removeEQ(Object o) {
   670         final ReentrantLock lock = this.lock;
   671         lock.lock();
   672         try {
   673             Object[] array = queue;
   674             int n = size;
   675             for (int i = 0; i < n; i++) {
   676                 if (o == array[i]) {
   677                     removeAt(i);
   678                     break;
   679                 }
   680             }
   681         } finally {
   682             lock.unlock();
   683         }
   684     }
   685 
   686     /**
   687      * Returns {@code true} if this queue contains the specified element.
   688      * More formally, returns {@code true} if and only if this queue contains
   689      * at least one element {@code e} such that {@code o.equals(e)}.
   690      *
   691      * @param o object to be checked for containment in this queue
   692      * @return {@code true} if this queue contains the specified element
   693      */
   694     public boolean contains(Object o) {
   695         int index;
   696         final ReentrantLock lock = this.lock;
   697         lock.lock();
   698         try {
   699             index = indexOf(o);
   700         } finally {
   701             lock.unlock();
   702         }
   703         return index != -1;
   704     }
   705 
   706     /**
   707      * Returns an array containing all of the elements in this queue.
   708      * The returned array elements are in no particular order.
   709      *
   710      * <p>The returned array will be "safe" in that no references to it are
   711      * maintained by this queue.  (In other words, this method must allocate
   712      * a new array).  The caller is thus free to modify the returned array.
   713      *
   714      * <p>This method acts as bridge between array-based and collection-based
   715      * APIs.
   716      *
   717      * @return an array containing all of the elements in this queue
   718      */
   719     public Object[] toArray() {
   720         final ReentrantLock lock = this.lock;
   721         lock.lock();
   722         try {
   723             return Arrays.copyOf(queue, size);
   724         } finally {
   725             lock.unlock();
   726         }
   727     }
   728 
   729 
   730     public String toString() {
   731         final ReentrantLock lock = this.lock;
   732         lock.lock();
   733         try {
   734             int n = size;
   735             if (n == 0)
   736                 return "[]";
   737             StringBuilder sb = new StringBuilder();
   738             sb.append('[');
   739             for (int i = 0; i < n; ++i) {
   740                 E e = (E)queue[i];
   741                 sb.append(e == this ? "(this Collection)" : e);
   742                 if (i != n - 1)
   743                     sb.append(',').append(' ');
   744             }
   745             return sb.append(']').toString();
   746         } finally {
   747             lock.unlock();
   748         }
   749     }
   750 
   751     /**
   752      * @throws UnsupportedOperationException {@inheritDoc}
   753      * @throws ClassCastException            {@inheritDoc}
   754      * @throws NullPointerException          {@inheritDoc}
   755      * @throws IllegalArgumentException      {@inheritDoc}
   756      */
   757     public int drainTo(Collection<? super E> c) {
   758         if (c == null)
   759             throw new NullPointerException();
   760         if (c == this)
   761             throw new IllegalArgumentException();
   762         final ReentrantLock lock = this.lock;
   763         lock.lock();
   764         try {
   765             int n = 0;
   766             E e;
   767             while ( (e = extract()) != null) {
   768                 c.add(e);
   769                 ++n;
   770             }
   771             return n;
   772         } finally {
   773             lock.unlock();
   774         }
   775     }
   776 
   777     /**
   778      * @throws UnsupportedOperationException {@inheritDoc}
   779      * @throws ClassCastException            {@inheritDoc}
   780      * @throws NullPointerException          {@inheritDoc}
   781      * @throws IllegalArgumentException      {@inheritDoc}
   782      */
   783     public int drainTo(Collection<? super E> c, int maxElements) {
   784         if (c == null)
   785             throw new NullPointerException();
   786         if (c == this)
   787             throw new IllegalArgumentException();
   788         if (maxElements <= 0)
   789             return 0;
   790         final ReentrantLock lock = this.lock;
   791         lock.lock();
   792         try {
   793             int n = 0;
   794             E e;
   795             while (n < maxElements && (e = extract()) != null) {
   796                 c.add(e);
   797                 ++n;
   798             }
   799             return n;
   800         } finally {
   801             lock.unlock();
   802         }
   803     }
   804 
   805     /**
   806      * Atomically removes all of the elements from this queue.
   807      * The queue will be empty after this call returns.
   808      */
   809     public void clear() {
   810         final ReentrantLock lock = this.lock;
   811         lock.lock();
   812         try {
   813             Object[] array = queue;
   814             int n = size;
   815             size = 0;
   816             for (int i = 0; i < n; i++)
   817                 array[i] = null;
   818         } finally {
   819             lock.unlock();
   820         }
   821     }
   822 
   823     /**
   824      * Returns an array containing all of the elements in this queue; the
   825      * runtime type of the returned array is that of the specified array.
   826      * The returned array elements are in no particular order.
   827      * If the queue fits in the specified array, it is returned therein.
   828      * Otherwise, a new array is allocated with the runtime type of the
   829      * specified array and the size of this queue.
   830      *
   831      * <p>If this queue fits in the specified array with room to spare
   832      * (i.e., the array has more elements than this queue), the element in
   833      * the array immediately following the end of the queue is set to
   834      * {@code null}.
   835      *
   836      * <p>Like the {@link #toArray()} method, this method acts as bridge between
   837      * array-based and collection-based APIs.  Further, this method allows
   838      * precise control over the runtime type of the output array, and may,
   839      * under certain circumstances, be used to save allocation costs.
   840      *
   841      * <p>Suppose {@code x} is a queue known to contain only strings.
   842      * The following code can be used to dump the queue into a newly
   843      * allocated array of {@code String}:
   844      *
   845      * <pre>
   846      *     String[] y = x.toArray(new String[0]);</pre>
   847      *
   848      * Note that {@code toArray(new Object[0])} is identical in function to
   849      * {@code toArray()}.
   850      *
   851      * @param a the array into which the elements of the queue are to
   852      *          be stored, if it is big enough; otherwise, a new array of the
   853      *          same runtime type is allocated for this purpose
   854      * @return an array containing all of the elements in this queue
   855      * @throws ArrayStoreException if the runtime type of the specified array
   856      *         is not a supertype of the runtime type of every element in
   857      *         this queue
   858      * @throws NullPointerException if the specified array is null
   859      */
   860     public <T> T[] toArray(T[] a) {
   861         final ReentrantLock lock = this.lock;
   862         lock.lock();
   863         try {
   864             int n = size;
   865             if (a.length < n)
   866                 // Make a new array of a's runtime type, but my contents:
   867                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
   868             System.arraycopy(queue, 0, a, 0, n);
   869             if (a.length > n)
   870                 a[n] = null;
   871             return a;
   872         } finally {
   873             lock.unlock();
   874         }
   875     }
   876 
   877     /**
   878      * Returns an iterator over the elements in this queue. The
   879      * iterator does not return the elements in any particular order.
   880      *
   881      * <p>The returned iterator is a "weakly consistent" iterator that
   882      * will never throw {@link java.util.ConcurrentModificationException
   883      * ConcurrentModificationException}, and guarantees to traverse
   884      * elements as they existed upon construction of the iterator, and
   885      * may (but is not guaranteed to) reflect any modifications
   886      * subsequent to construction.
   887      *
   888      * @return an iterator over the elements in this queue
   889      */
   890     public Iterator<E> iterator() {
   891         return new Itr(toArray());
   892     }
   893 
   894     /**
   895      * Snapshot iterator that works off copy of underlying q array.
   896      */
   897     final class Itr implements Iterator<E> {
   898         final Object[] array; // Array of all elements
   899         int cursor;           // index of next element to return;
   900         int lastRet;          // index of last element, or -1 if no such
   901 
   902         Itr(Object[] array) {
   903             lastRet = -1;
   904             this.array = array;
   905         }
   906 
   907         public boolean hasNext() {
   908             return cursor < array.length;
   909         }
   910 
   911         public E next() {
   912             if (cursor >= array.length)
   913                 throw new NoSuchElementException();
   914             lastRet = cursor;
   915             return (E)array[cursor++];
   916         }
   917 
   918         public void remove() {
   919             if (lastRet < 0)
   920                 throw new IllegalStateException();
   921             removeEQ(array[lastRet]);
   922             lastRet = -1;
   923         }
   924     }
   925 
   926     /**
   927      * Saves the state to a stream (that is, serializes it).  For
   928      * compatibility with previous version of this class,
   929      * elements are first copied to a java.util.PriorityQueue,
   930      * which is then serialized.
   931      */
   932     private void writeObject(java.io.ObjectOutputStream s)
   933         throws java.io.IOException {
   934         lock.lock();
   935         try {
   936             int n = size; // avoid zero capacity argument
   937             q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
   938             q.addAll(this);
   939             s.defaultWriteObject();
   940         } finally {
   941             q = null;
   942             lock.unlock();
   943         }
   944     }
   945 
   946     /**
   947      * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
   948      * (that is, deserializes it).
   949      *
   950      * @param s the stream
   951      */
   952     private void readObject(java.io.ObjectInputStream s)
   953         throws java.io.IOException, ClassNotFoundException {
   954         try {
   955             s.defaultReadObject();
   956             this.queue = new Object[q.size()];
   957             comparator = q.comparator();
   958             addAll(q);
   959         } finally {
   960             q = null;
   961         }
   962     }
   963 }