rt/emul/compact/src/main/java/java/util/concurrent/LinkedBlockingQueue.java
author Jaroslav Tulach <jaroslav.tulach@apidesign.org>
Sat, 19 Mar 2016 10:46:31 +0100
branchjdk7-b147
changeset 1890 212417b74b72
permissions -rw-r--r--
Bringing in all concurrent package from JDK7-b147
     1 /*
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  *
     4  * This code is free software; you can redistribute it and/or modify it
     5  * under the terms of the GNU General Public License version 2 only, as
     6  * published by the Free Software Foundation.  Oracle designates this
     7  * particular file as subject to the "Classpath" exception as provided
     8  * by Oracle in the LICENSE file that accompanied this code.
     9  *
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    13  * version 2 for more details (a copy is included in the LICENSE file that
    14  * accompanied this code).
    15  *
    16  * You should have received a copy of the GNU General Public License version
    17  * 2 along with this work; if not, write to the Free Software Foundation,
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    19  *
    20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
    21  * or visit www.oracle.com if you need additional information or have any
    22  * questions.
    23  */
    24 
    25 /*
    26  * This file is available under and governed by the GNU General Public
    27  * License version 2 only, as published by the Free Software Foundation.
    28  * However, the following notice accompanied the original version of this
    29  * file:
    30  *
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
    32  * Expert Group and released to the public domain, as explained at
    33  * http://creativecommons.org/publicdomain/zero/1.0/
    34  */
    35 
    36 package java.util.concurrent;
    37 
    38 import java.util.concurrent.atomic.AtomicInteger;
    39 import java.util.concurrent.locks.Condition;
    40 import java.util.concurrent.locks.ReentrantLock;
    41 import java.util.AbstractQueue;
    42 import java.util.Collection;
    43 import java.util.Iterator;
    44 import java.util.NoSuchElementException;
    45 
    46 /**
    47  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
    48  * linked nodes.
    49  * This queue orders elements FIFO (first-in-first-out).
    50  * The <em>head</em> of the queue is that element that has been on the
    51  * queue the longest time.
    52  * The <em>tail</em> of the queue is that element that has been on the
    53  * queue the shortest time. New elements
    54  * are inserted at the tail of the queue, and the queue retrieval
    55  * operations obtain elements at the head of the queue.
    56  * Linked queues typically have higher throughput than array-based queues but
    57  * less predictable performance in most concurrent applications.
    58  *
    59  * <p> The optional capacity bound constructor argument serves as a
    60  * way to prevent excessive queue expansion. The capacity, if unspecified,
    61  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
    62  * dynamically created upon each insertion unless this would bring the
    63  * queue above capacity.
    64  *
    65  * <p>This class and its iterator implement all of the
    66  * <em>optional</em> methods of the {@link Collection} and {@link
    67  * Iterator} interfaces.
    68  *
    69  * <p>This class is a member of the
    70  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
    71  * Java Collections Framework</a>.
    72  *
    73  * @since 1.5
    74  * @author Doug Lea
    75  * @param <E> the type of elements held in this collection
    76  *
    77  */
    78 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
    79         implements BlockingQueue<E>, java.io.Serializable {
    80     private static final long serialVersionUID = -6903933977591709194L;
    81 
    82     /*
    83      * A variant of the "two lock queue" algorithm.  The putLock gates
    84      * entry to put (and offer), and has an associated condition for
    85      * waiting puts.  Similarly for the takeLock.  The "count" field
    86      * that they both rely on is maintained as an atomic to avoid
    87      * needing to get both locks in most cases. Also, to minimize need
    88      * for puts to get takeLock and vice-versa, cascading notifies are
    89      * used. When a put notices that it has enabled at least one take,
    90      * it signals taker. That taker in turn signals others if more
    91      * items have been entered since the signal. And symmetrically for
    92      * takes signalling puts. Operations such as remove(Object) and
    93      * iterators acquire both locks.
    94      *
    95      * Visibility between writers and readers is provided as follows:
    96      *
    97      * Whenever an element is enqueued, the putLock is acquired and
    98      * count updated.  A subsequent reader guarantees visibility to the
    99      * enqueued Node by either acquiring the putLock (via fullyLock)
   100      * or by acquiring the takeLock, and then reading n = count.get();
   101      * this gives visibility to the first n items.
   102      *
   103      * To implement weakly consistent iterators, it appears we need to
   104      * keep all Nodes GC-reachable from a predecessor dequeued Node.
   105      * That would cause two problems:
   106      * - allow a rogue Iterator to cause unbounded memory retention
   107      * - cause cross-generational linking of old Nodes to new Nodes if
   108      *   a Node was tenured while live, which generational GCs have a
   109      *   hard time dealing with, causing repeated major collections.
   110      * However, only non-deleted Nodes need to be reachable from
   111      * dequeued Nodes, and reachability does not necessarily have to
   112      * be of the kind understood by the GC.  We use the trick of
   113      * linking a Node that has just been dequeued to itself.  Such a
   114      * self-link implicitly means to advance to head.next.
   115      */
   116 
   117     /**
   118      * Linked list node class
   119      */
   120     static class Node<E> {
   121         E item;
   122 
   123         /**
   124          * One of:
   125          * - the real successor Node
   126          * - this Node, meaning the successor is head.next
   127          * - null, meaning there is no successor (this is the last node)
   128          */
   129         Node<E> next;
   130 
   131         Node(E x) { item = x; }
   132     }
   133 
   134     /** The capacity bound, or Integer.MAX_VALUE if none */
   135     private final int capacity;
   136 
   137     /** Current number of elements */
   138     private final AtomicInteger count = new AtomicInteger(0);
   139 
   140     /**
   141      * Head of linked list.
   142      * Invariant: head.item == null
   143      */
   144     private transient Node<E> head;
   145 
   146     /**
   147      * Tail of linked list.
   148      * Invariant: last.next == null
   149      */
   150     private transient Node<E> last;
   151 
   152     /** Lock held by take, poll, etc */
   153     private final ReentrantLock takeLock = new ReentrantLock();
   154 
   155     /** Wait queue for waiting takes */
   156     private final Condition notEmpty = takeLock.newCondition();
   157 
   158     /** Lock held by put, offer, etc */
   159     private final ReentrantLock putLock = new ReentrantLock();
   160 
   161     /** Wait queue for waiting puts */
   162     private final Condition notFull = putLock.newCondition();
   163 
   164     /**
   165      * Signals a waiting take. Called only from put/offer (which do not
   166      * otherwise ordinarily lock takeLock.)
   167      */
   168     private void signalNotEmpty() {
   169         final ReentrantLock takeLock = this.takeLock;
   170         takeLock.lock();
   171         try {
   172             notEmpty.signal();
   173         } finally {
   174             takeLock.unlock();
   175         }
   176     }
   177 
   178     /**
   179      * Signals a waiting put. Called only from take/poll.
   180      */
   181     private void signalNotFull() {
   182         final ReentrantLock putLock = this.putLock;
   183         putLock.lock();
   184         try {
   185             notFull.signal();
   186         } finally {
   187             putLock.unlock();
   188         }
   189     }
   190 
   191     /**
   192      * Links node at end of queue.
   193      *
   194      * @param node the node
   195      */
   196     private void enqueue(Node<E> node) {
   197         // assert putLock.isHeldByCurrentThread();
   198         // assert last.next == null;
   199         last = last.next = node;
   200     }
   201 
   202     /**
   203      * Removes a node from head of queue.
   204      *
   205      * @return the node
   206      */
   207     private E dequeue() {
   208         // assert takeLock.isHeldByCurrentThread();
   209         // assert head.item == null;
   210         Node<E> h = head;
   211         Node<E> first = h.next;
   212         h.next = h; // help GC
   213         head = first;
   214         E x = first.item;
   215         first.item = null;
   216         return x;
   217     }
   218 
   219     /**
   220      * Lock to prevent both puts and takes.
   221      */
   222     void fullyLock() {
   223         putLock.lock();
   224         takeLock.lock();
   225     }
   226 
   227     /**
   228      * Unlock to allow both puts and takes.
   229      */
   230     void fullyUnlock() {
   231         takeLock.unlock();
   232         putLock.unlock();
   233     }
   234 
   235 //     /**
   236 //      * Tells whether both locks are held by current thread.
   237 //      */
   238 //     boolean isFullyLocked() {
   239 //         return (putLock.isHeldByCurrentThread() &&
   240 //                 takeLock.isHeldByCurrentThread());
   241 //     }
   242 
   243     /**
   244      * Creates a {@code LinkedBlockingQueue} with a capacity of
   245      * {@link Integer#MAX_VALUE}.
   246      */
   247     public LinkedBlockingQueue() {
   248         this(Integer.MAX_VALUE);
   249     }
   250 
   251     /**
   252      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
   253      *
   254      * @param capacity the capacity of this queue
   255      * @throws IllegalArgumentException if {@code capacity} is not greater
   256      *         than zero
   257      */
   258     public LinkedBlockingQueue(int capacity) {
   259         if (capacity <= 0) throw new IllegalArgumentException();
   260         this.capacity = capacity;
   261         last = head = new Node<E>(null);
   262     }
   263 
   264     /**
   265      * Creates a {@code LinkedBlockingQueue} with a capacity of
   266      * {@link Integer#MAX_VALUE}, initially containing the elements of the
   267      * given collection,
   268      * added in traversal order of the collection's iterator.
   269      *
   270      * @param c the collection of elements to initially contain
   271      * @throws NullPointerException if the specified collection or any
   272      *         of its elements are null
   273      */
   274     public LinkedBlockingQueue(Collection<? extends E> c) {
   275         this(Integer.MAX_VALUE);
   276         final ReentrantLock putLock = this.putLock;
   277         putLock.lock(); // Never contended, but necessary for visibility
   278         try {
   279             int n = 0;
   280             for (E e : c) {
   281                 if (e == null)
   282                     throw new NullPointerException();
   283                 if (n == capacity)
   284                     throw new IllegalStateException("Queue full");
   285                 enqueue(new Node<E>(e));
   286                 ++n;
   287             }
   288             count.set(n);
   289         } finally {
   290             putLock.unlock();
   291         }
   292     }
   293 
   294 
   295     // this doc comment is overridden to remove the reference to collections
   296     // greater in size than Integer.MAX_VALUE
   297     /**
   298      * Returns the number of elements in this queue.
   299      *
   300      * @return the number of elements in this queue
   301      */
   302     public int size() {
   303         return count.get();
   304     }
   305 
   306     // this doc comment is a modified copy of the inherited doc comment,
   307     // without the reference to unlimited queues.
   308     /**
   309      * Returns the number of additional elements that this queue can ideally
   310      * (in the absence of memory or resource constraints) accept without
   311      * blocking. This is always equal to the initial capacity of this queue
   312      * less the current {@code size} of this queue.
   313      *
   314      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
   315      * an element will succeed by inspecting {@code remainingCapacity}
   316      * because it may be the case that another thread is about to
   317      * insert or remove an element.
   318      */
   319     public int remainingCapacity() {
   320         return capacity - count.get();
   321     }
   322 
   323     /**
   324      * Inserts the specified element at the tail of this queue, waiting if
   325      * necessary for space to become available.
   326      *
   327      * @throws InterruptedException {@inheritDoc}
   328      * @throws NullPointerException {@inheritDoc}
   329      */
   330     public void put(E e) throws InterruptedException {
   331         if (e == null) throw new NullPointerException();
   332         // Note: convention in all put/take/etc is to preset local var
   333         // holding count negative to indicate failure unless set.
   334         int c = -1;
   335         Node<E> node = new Node(e);
   336         final ReentrantLock putLock = this.putLock;
   337         final AtomicInteger count = this.count;
   338         putLock.lockInterruptibly();
   339         try {
   340             /*
   341              * Note that count is used in wait guard even though it is
   342              * not protected by lock. This works because count can
   343              * only decrease at this point (all other puts are shut
   344              * out by lock), and we (or some other waiting put) are
   345              * signalled if it ever changes from capacity. Similarly
   346              * for all other uses of count in other wait guards.
   347              */
   348             while (count.get() == capacity) {
   349                 notFull.await();
   350             }
   351             enqueue(node);
   352             c = count.getAndIncrement();
   353             if (c + 1 < capacity)
   354                 notFull.signal();
   355         } finally {
   356             putLock.unlock();
   357         }
   358         if (c == 0)
   359             signalNotEmpty();
   360     }
   361 
   362     /**
   363      * Inserts the specified element at the tail of this queue, waiting if
   364      * necessary up to the specified wait time for space to become available.
   365      *
   366      * @return {@code true} if successful, or {@code false} if
   367      *         the specified waiting time elapses before space is available.
   368      * @throws InterruptedException {@inheritDoc}
   369      * @throws NullPointerException {@inheritDoc}
   370      */
   371     public boolean offer(E e, long timeout, TimeUnit unit)
   372         throws InterruptedException {
   373 
   374         if (e == null) throw new NullPointerException();
   375         long nanos = unit.toNanos(timeout);
   376         int c = -1;
   377         final ReentrantLock putLock = this.putLock;
   378         final AtomicInteger count = this.count;
   379         putLock.lockInterruptibly();
   380         try {
   381             while (count.get() == capacity) {
   382                 if (nanos <= 0)
   383                     return false;
   384                 nanos = notFull.awaitNanos(nanos);
   385             }
   386             enqueue(new Node<E>(e));
   387             c = count.getAndIncrement();
   388             if (c + 1 < capacity)
   389                 notFull.signal();
   390         } finally {
   391             putLock.unlock();
   392         }
   393         if (c == 0)
   394             signalNotEmpty();
   395         return true;
   396     }
   397 
   398     /**
   399      * Inserts the specified element at the tail of this queue if it is
   400      * possible to do so immediately without exceeding the queue's capacity,
   401      * returning {@code true} upon success and {@code false} if this queue
   402      * is full.
   403      * When using a capacity-restricted queue, this method is generally
   404      * preferable to method {@link BlockingQueue#add add}, which can fail to
   405      * insert an element only by throwing an exception.
   406      *
   407      * @throws NullPointerException if the specified element is null
   408      */
   409     public boolean offer(E e) {
   410         if (e == null) throw new NullPointerException();
   411         final AtomicInteger count = this.count;
   412         if (count.get() == capacity)
   413             return false;
   414         int c = -1;
   415         Node<E> node = new Node(e);
   416         final ReentrantLock putLock = this.putLock;
   417         putLock.lock();
   418         try {
   419             if (count.get() < capacity) {
   420                 enqueue(node);
   421                 c = count.getAndIncrement();
   422                 if (c + 1 < capacity)
   423                     notFull.signal();
   424             }
   425         } finally {
   426             putLock.unlock();
   427         }
   428         if (c == 0)
   429             signalNotEmpty();
   430         return c >= 0;
   431     }
   432 
   433 
   434     public E take() throws InterruptedException {
   435         E x;
   436         int c = -1;
   437         final AtomicInteger count = this.count;
   438         final ReentrantLock takeLock = this.takeLock;
   439         takeLock.lockInterruptibly();
   440         try {
   441             while (count.get() == 0) {
   442                 notEmpty.await();
   443             }
   444             x = dequeue();
   445             c = count.getAndDecrement();
   446             if (c > 1)
   447                 notEmpty.signal();
   448         } finally {
   449             takeLock.unlock();
   450         }
   451         if (c == capacity)
   452             signalNotFull();
   453         return x;
   454     }
   455 
   456     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   457         E x = null;
   458         int c = -1;
   459         long nanos = unit.toNanos(timeout);
   460         final AtomicInteger count = this.count;
   461         final ReentrantLock takeLock = this.takeLock;
   462         takeLock.lockInterruptibly();
   463         try {
   464             while (count.get() == 0) {
   465                 if (nanos <= 0)
   466                     return null;
   467                 nanos = notEmpty.awaitNanos(nanos);
   468             }
   469             x = dequeue();
   470             c = count.getAndDecrement();
   471             if (c > 1)
   472                 notEmpty.signal();
   473         } finally {
   474             takeLock.unlock();
   475         }
   476         if (c == capacity)
   477             signalNotFull();
   478         return x;
   479     }
   480 
   481     public E poll() {
   482         final AtomicInteger count = this.count;
   483         if (count.get() == 0)
   484             return null;
   485         E x = null;
   486         int c = -1;
   487         final ReentrantLock takeLock = this.takeLock;
   488         takeLock.lock();
   489         try {
   490             if (count.get() > 0) {
   491                 x = dequeue();
   492                 c = count.getAndDecrement();
   493                 if (c > 1)
   494                     notEmpty.signal();
   495             }
   496         } finally {
   497             takeLock.unlock();
   498         }
   499         if (c == capacity)
   500             signalNotFull();
   501         return x;
   502     }
   503 
   504     public E peek() {
   505         if (count.get() == 0)
   506             return null;
   507         final ReentrantLock takeLock = this.takeLock;
   508         takeLock.lock();
   509         try {
   510             Node<E> first = head.next;
   511             if (first == null)
   512                 return null;
   513             else
   514                 return first.item;
   515         } finally {
   516             takeLock.unlock();
   517         }
   518     }
   519 
   520     /**
   521      * Unlinks interior Node p with predecessor trail.
   522      */
   523     void unlink(Node<E> p, Node<E> trail) {
   524         // assert isFullyLocked();
   525         // p.next is not changed, to allow iterators that are
   526         // traversing p to maintain their weak-consistency guarantee.
   527         p.item = null;
   528         trail.next = p.next;
   529         if (last == p)
   530             last = trail;
   531         if (count.getAndDecrement() == capacity)
   532             notFull.signal();
   533     }
   534 
   535     /**
   536      * Removes a single instance of the specified element from this queue,
   537      * if it is present.  More formally, removes an element {@code e} such
   538      * that {@code o.equals(e)}, if this queue contains one or more such
   539      * elements.
   540      * Returns {@code true} if this queue contained the specified element
   541      * (or equivalently, if this queue changed as a result of the call).
   542      *
   543      * @param o element to be removed from this queue, if present
   544      * @return {@code true} if this queue changed as a result of the call
   545      */
   546     public boolean remove(Object o) {
   547         if (o == null) return false;
   548         fullyLock();
   549         try {
   550             for (Node<E> trail = head, p = trail.next;
   551                  p != null;
   552                  trail = p, p = p.next) {
   553                 if (o.equals(p.item)) {
   554                     unlink(p, trail);
   555                     return true;
   556                 }
   557             }
   558             return false;
   559         } finally {
   560             fullyUnlock();
   561         }
   562     }
   563 
   564     /**
   565      * Returns {@code true} if this queue contains the specified element.
   566      * More formally, returns {@code true} if and only if this queue contains
   567      * at least one element {@code e} such that {@code o.equals(e)}.
   568      *
   569      * @param o object to be checked for containment in this queue
   570      * @return {@code true} if this queue contains the specified element
   571      */
   572     public boolean contains(Object o) {
   573         if (o == null) return false;
   574         fullyLock();
   575         try {
   576             for (Node<E> p = head.next; p != null; p = p.next)
   577                 if (o.equals(p.item))
   578                     return true;
   579             return false;
   580         } finally {
   581             fullyUnlock();
   582         }
   583     }
   584 
   585     /**
   586      * Returns an array containing all of the elements in this queue, in
   587      * proper sequence.
   588      *
   589      * <p>The returned array will be "safe" in that no references to it are
   590      * maintained by this queue.  (In other words, this method must allocate
   591      * a new array).  The caller is thus free to modify the returned array.
   592      *
   593      * <p>This method acts as bridge between array-based and collection-based
   594      * APIs.
   595      *
   596      * @return an array containing all of the elements in this queue
   597      */
   598     public Object[] toArray() {
   599         fullyLock();
   600         try {
   601             int size = count.get();
   602             Object[] a = new Object[size];
   603             int k = 0;
   604             for (Node<E> p = head.next; p != null; p = p.next)
   605                 a[k++] = p.item;
   606             return a;
   607         } finally {
   608             fullyUnlock();
   609         }
   610     }
   611 
   612     /**
   613      * Returns an array containing all of the elements in this queue, in
   614      * proper sequence; the runtime type of the returned array is that of
   615      * the specified array.  If the queue fits in the specified array, it
   616      * is returned therein.  Otherwise, a new array is allocated with the
   617      * runtime type of the specified array and the size of this queue.
   618      *
   619      * <p>If this queue fits in the specified array with room to spare
   620      * (i.e., the array has more elements than this queue), the element in
   621      * the array immediately following the end of the queue is set to
   622      * {@code null}.
   623      *
   624      * <p>Like the {@link #toArray()} method, this method acts as bridge between
   625      * array-based and collection-based APIs.  Further, this method allows
   626      * precise control over the runtime type of the output array, and may,
   627      * under certain circumstances, be used to save allocation costs.
   628      *
   629      * <p>Suppose {@code x} is a queue known to contain only strings.
   630      * The following code can be used to dump the queue into a newly
   631      * allocated array of {@code String}:
   632      *
   633      * <pre>
   634      *     String[] y = x.toArray(new String[0]);</pre>
   635      *
   636      * Note that {@code toArray(new Object[0])} is identical in function to
   637      * {@code toArray()}.
   638      *
   639      * @param a the array into which the elements of the queue are to
   640      *          be stored, if it is big enough; otherwise, a new array of the
   641      *          same runtime type is allocated for this purpose
   642      * @return an array containing all of the elements in this queue
   643      * @throws ArrayStoreException if the runtime type of the specified array
   644      *         is not a supertype of the runtime type of every element in
   645      *         this queue
   646      * @throws NullPointerException if the specified array is null
   647      */
   648     @SuppressWarnings("unchecked")
   649     public <T> T[] toArray(T[] a) {
   650         fullyLock();
   651         try {
   652             int size = count.get();
   653             if (a.length < size)
   654                 a = (T[])java.lang.reflect.Array.newInstance
   655                     (a.getClass().getComponentType(), size);
   656 
   657             int k = 0;
   658             for (Node<E> p = head.next; p != null; p = p.next)
   659                 a[k++] = (T)p.item;
   660             if (a.length > k)
   661                 a[k] = null;
   662             return a;
   663         } finally {
   664             fullyUnlock();
   665         }
   666     }
   667 
   668     public String toString() {
   669         fullyLock();
   670         try {
   671             Node<E> p = head.next;
   672             if (p == null)
   673                 return "[]";
   674 
   675             StringBuilder sb = new StringBuilder();
   676             sb.append('[');
   677             for (;;) {
   678                 E e = p.item;
   679                 sb.append(e == this ? "(this Collection)" : e);
   680                 p = p.next;
   681                 if (p == null)
   682                     return sb.append(']').toString();
   683                 sb.append(',').append(' ');
   684             }
   685         } finally {
   686             fullyUnlock();
   687         }
   688     }
   689 
   690     /**
   691      * Atomically removes all of the elements from this queue.
   692      * The queue will be empty after this call returns.
   693      */
   694     public void clear() {
   695         fullyLock();
   696         try {
   697             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
   698                 h.next = h;
   699                 p.item = null;
   700             }
   701             head = last;
   702             // assert head.item == null && head.next == null;
   703             if (count.getAndSet(0) == capacity)
   704                 notFull.signal();
   705         } finally {
   706             fullyUnlock();
   707         }
   708     }
   709 
   710     /**
   711      * @throws UnsupportedOperationException {@inheritDoc}
   712      * @throws ClassCastException            {@inheritDoc}
   713      * @throws NullPointerException          {@inheritDoc}
   714      * @throws IllegalArgumentException      {@inheritDoc}
   715      */
   716     public int drainTo(Collection<? super E> c) {
   717         return drainTo(c, Integer.MAX_VALUE);
   718     }
   719 
   720     /**
   721      * @throws UnsupportedOperationException {@inheritDoc}
   722      * @throws ClassCastException            {@inheritDoc}
   723      * @throws NullPointerException          {@inheritDoc}
   724      * @throws IllegalArgumentException      {@inheritDoc}
   725      */
   726     public int drainTo(Collection<? super E> c, int maxElements) {
   727         if (c == null)
   728             throw new NullPointerException();
   729         if (c == this)
   730             throw new IllegalArgumentException();
   731         boolean signalNotFull = false;
   732         final ReentrantLock takeLock = this.takeLock;
   733         takeLock.lock();
   734         try {
   735             int n = Math.min(maxElements, count.get());
   736             // count.get provides visibility to first n Nodes
   737             Node<E> h = head;
   738             int i = 0;
   739             try {
   740                 while (i < n) {
   741                     Node<E> p = h.next;
   742                     c.add(p.item);
   743                     p.item = null;
   744                     h.next = h;
   745                     h = p;
   746                     ++i;
   747                 }
   748                 return n;
   749             } finally {
   750                 // Restore invariants even if c.add() threw
   751                 if (i > 0) {
   752                     // assert h.item == null;
   753                     head = h;
   754                     signalNotFull = (count.getAndAdd(-i) == capacity);
   755                 }
   756             }
   757         } finally {
   758             takeLock.unlock();
   759             if (signalNotFull)
   760                 signalNotFull();
   761         }
   762     }
   763 
   764     /**
   765      * Returns an iterator over the elements in this queue in proper sequence.
   766      * The elements will be returned in order from first (head) to last (tail).
   767      *
   768      * <p>The returned iterator is a "weakly consistent" iterator that
   769      * will never throw {@link java.util.ConcurrentModificationException
   770      * ConcurrentModificationException}, and guarantees to traverse
   771      * elements as they existed upon construction of the iterator, and
   772      * may (but is not guaranteed to) reflect any modifications
   773      * subsequent to construction.
   774      *
   775      * @return an iterator over the elements in this queue in proper sequence
   776      */
   777     public Iterator<E> iterator() {
   778       return new Itr();
   779     }
   780 
   781     private class Itr implements Iterator<E> {
   782         /*
   783          * Basic weakly-consistent iterator.  At all times hold the next
   784          * item to hand out so that if hasNext() reports true, we will
   785          * still have it to return even if lost race with a take etc.
   786          */
   787         private Node<E> current;
   788         private Node<E> lastRet;
   789         private E currentElement;
   790 
   791         Itr() {
   792             fullyLock();
   793             try {
   794                 current = head.next;
   795                 if (current != null)
   796                     currentElement = current.item;
   797             } finally {
   798                 fullyUnlock();
   799             }
   800         }
   801 
   802         public boolean hasNext() {
   803             return current != null;
   804         }
   805 
   806         /**
   807          * Returns the next live successor of p, or null if no such.
   808          *
   809          * Unlike other traversal methods, iterators need to handle both:
   810          * - dequeued nodes (p.next == p)
   811          * - (possibly multiple) interior removed nodes (p.item == null)
   812          */
   813         private Node<E> nextNode(Node<E> p) {
   814             for (;;) {
   815                 Node<E> s = p.next;
   816                 if (s == p)
   817                     return head.next;
   818                 if (s == null || s.item != null)
   819                     return s;
   820                 p = s;
   821             }
   822         }
   823 
   824         public E next() {
   825             fullyLock();
   826             try {
   827                 if (current == null)
   828                     throw new NoSuchElementException();
   829                 E x = currentElement;
   830                 lastRet = current;
   831                 current = nextNode(current);
   832                 currentElement = (current == null) ? null : current.item;
   833                 return x;
   834             } finally {
   835                 fullyUnlock();
   836             }
   837         }
   838 
   839         public void remove() {
   840             if (lastRet == null)
   841                 throw new IllegalStateException();
   842             fullyLock();
   843             try {
   844                 Node<E> node = lastRet;
   845                 lastRet = null;
   846                 for (Node<E> trail = head, p = trail.next;
   847                      p != null;
   848                      trail = p, p = p.next) {
   849                     if (p == node) {
   850                         unlink(p, trail);
   851                         break;
   852                     }
   853                 }
   854             } finally {
   855                 fullyUnlock();
   856             }
   857         }
   858     }
   859 
   860     /**
   861      * Save the state to a stream (that is, serialize it).
   862      *
   863      * @serialData The capacity is emitted (int), followed by all of
   864      * its elements (each an {@code Object}) in the proper order,
   865      * followed by a null
   866      * @param s the stream
   867      */
   868     private void writeObject(java.io.ObjectOutputStream s)
   869         throws java.io.IOException {
   870 
   871         fullyLock();
   872         try {
   873             // Write out any hidden stuff, plus capacity
   874             s.defaultWriteObject();
   875 
   876             // Write out all elements in the proper order.
   877             for (Node<E> p = head.next; p != null; p = p.next)
   878                 s.writeObject(p.item);
   879 
   880             // Use trailing null as sentinel
   881             s.writeObject(null);
   882         } finally {
   883             fullyUnlock();
   884         }
   885     }
   886 
   887     /**
   888      * Reconstitute this queue instance from a stream (that is,
   889      * deserialize it).
   890      *
   891      * @param s the stream
   892      */
   893     private void readObject(java.io.ObjectInputStream s)
   894         throws java.io.IOException, ClassNotFoundException {
   895         // Read in capacity, and any hidden stuff
   896         s.defaultReadObject();
   897 
   898         count.set(0);
   899         last = head = new Node<E>(null);
   900 
   901         // Read in all elements and place in queue
   902         for (;;) {
   903             @SuppressWarnings("unchecked")
   904             E item = (E)s.readObject();
   905             if (item == null)
   906                 break;
   907             add(item);
   908         }
   909     }
   910 }